앎을 경계하기

[가짜연구소3기] Data Engineer

[가짜연구소 3기] 데이터 엔지니어링 57 - Maintaining and monitoring Airflow workflows

양갱맨 2021. 10. 18. 09:44

Airflow Sensors

  • 특정 조건이 참이 되길 기다리는 연산자
  • 파일 생성, 데이터베이스에 레코드 업로드, 웹 요청의 특정 응답
  • 일반 작업과 마찬가지로 task에 할당된다.
    • 비트 시프트 종속성 사용 가능
  • airflow.sensors.base_sensor_operator 클래스에서 파생
  • 센서 인자들
    • mode - 센서 상태 확인
      • mode='poke' - 완료시까지 계속 확인
      • mode='reschedule' - 다른 슬롯을 사용할 수 있을때까지 기다림
    • poke_interval - poke모드에서 airflow에 조건을 확인하는 빈도를 알려줌
    • timeout - 작업 실패로 표시하기 전에 대기하는 시간

File sensor

  • 파일 시스템의 특정 위치에 파일이 있는지 확인함
  • 주어진 디렉토리 내 모든 파일이 확인 가능하다.
from airflow.contrib.sensors.file_sensor import FileSensor  file_sensor_task = FileSensor(task_id='file_sense', 	filepath='salesdata.csv', 	poke_interval=300, 	dag=sales_report_dag)  init_sales_cleanup >> file_sensor_task >> generate_report

다른 센서들

  • ExternalTaskSensor - DAG에 있는 작업이 완료될 때까지 기다림
  • HttpSensor - 웹 URL을 요청하고 확인할 콘텐츠를 정의할 수 있다
  • SqlSensor - SQL 쿼리를 실행하고 콘텐츠를 확인한다.
  • airflow.sensors , airflow.contrib.sensors 라이브러리 내에서 다른 센서들을 사용할 수 있다.

센서를 언제 사용할까

  • 언제 참이 될지 불확실한 경우
  • 전체 DAG를 즉시 실패하지 않으려는 경우
  • DAG에 주기 추가 없이 반복적으로 검사하려는 경우

Airflow Executors

  • workflow 내 정의되어 있는 tasks를 실제로 실행하는 요소
  • 각 executors들은 task를 실행하는 각각의 기능과 동작을 가지고 있다.
    • 로컬에서 각각 실행시키거나 클러스터 내 모든 시스템간 개별 작업 분할 가능(작업자 슬롯 수)
  • executors 예
    • SequentialExecutor
    • LocalExecutor
    • CeleryExecutor

SequentialExecutor

  • 기본 executor
  • 한 번에 하나의 task 실행 - 여러 워크플로우 실행 시 예상 시간보다 오래 걸림
  • 디버깅이 쉬움
  • 기능적인 반면에 실제 프로덕션에선 추천되지 않음.

LocalExecutor

  • 싱글 시스템에서 실행됨
  • 각 작업을 로컬 시스템의 프로세스로 취급하여 여러 작업 실행이 가능함
  • parallelism을 사용자가 정의할 수 있다.

CeleryExecutor

  • 여러 시스템이 기본 클러스터로 통신할 수 있게 파이썬으로 작성된 일반 대기열 시스템
  • 여러 Airflow 시스템을 특정 workflow에 대한 작업자로 구성할 수 있다.
  • 잘 활용하기 위해 기능을 사용하려면 설정이나 구성이 훨씬 어려워진다.
  • 여러 workflows를 구성하기 위한 방법들이 있다.

사용 중인 Executor 확인

  • airflow.cfg 내에 저장되어 있는 executor = 부분을 확인한다.
    • cat airflow/airflow.cfg | grep "executor = " executor = SequentialExecutor
  • airflow list_dags 를 통해 실행 프로그램 확인이 가능하다.

Airflow에서 Debugging, troubleshooting

작업하는 동안 발생할 수 있는 문제들

  1. 일정에 따라 실행되지 않는 DAG
    1. 스케줄러가 실행되지 않는 경우 - airflow scheduler로 해결 가능
    1. schedule_interval시간이 지나지 않은 경우 - 요구 사항에 맞게 날짜 수정
    1. 작업을 수행할 공간 여유가 없는 경우
      1. 여러 작업 수행이 가능한 executor type으로 변경
      1. 시스템 또는 리소스 추가
      1. DAG 일정 변경
  1. 시스템에 로드되지 않는 dag
    1. Web UI나 터미널 명령으로 확인할 수 없다.
    1. Python 파일이 있어야할 폴더 안에 있는지 확인하기
    1. airflow.cfg 에 DAG 폴더 설정확인하기
  1. 구문 오류
    1. DAG 파일이 나타나지 않는 경우
    1. DAG에서 에러 찾는 게 어려울 때도 있다.
    1. python/Airflow 용 편집기 설정이 되지 않은 경우
      • airflow list_dags
      • python3 <dagfile.py>

SLAs and reporting in Airflow

  • SLA : Service Leavel Agreement
  • 가동 시간 또는 가용성 보장이 아닌 task 또는 DAG를 실행하기 위해 필요한 시간
  • SLA miss는 task 또는 DAG가 SLA 예상 타이밍을 충족하지 못하는 모든 상황
  • SLA 누락 시, 시스템 구성에 따라 메일 경고 전송, 로그 메모 작성
  • Web UI에서 SLA miss를 확인할 수 있다.

SLA 정의하기

  1. task의 sla 인자를 사용한다.
task1 = BashOperator(task_id='sla_task', 										bash_command='runcode.sh', 										sla=timedelta(seconds=30), 										dag=dag)
  1. default-args 딕셔너리 사용
default-args={ 	'sla': timedelta(minutes=20), 	'start_date': datetime(2020,2,20) } dag = DAG('sla_dag', default_args=default_args)

timedeltadatetime 라이브러리 안에 있다.

datetime.timedelta 모듈은 두 날짜의 차이 기간을 나타낼 때 사용하는 모듈

일반적인 리포팅

  • Airflow에는 기본적으로 상태(성공,실패,오류)에 대한 메시지 전송 옵션이 있다.
  • default_args 딕셔너리에서 설정 가능함
    default_args={ 	'email' = ['airflowalerts@datacamp.com'], 	'email_on_failure' : True, 	'email_on_retry' : False, 	'email_on_success' : True, 	... }

이메일 구성 설정 방법 등 자세한 설명은 Airflow doc을 봐야할 것 같다.