Airflow Sensors
- 특정 조건이 참이 되길 기다리는 연산자
- 파일 생성, 데이터베이스에 레코드 업로드, 웹 요청의 특정 응답
- 일반 작업과 마찬가지로 task에 할당된다.
- 비트 시프트 종속성 사용 가능
airflow.sensors.base_sensor_operator
클래스에서 파생
- 센서 인자들
mode
- 센서 상태 확인mode='poke'
- 완료시까지 계속 확인
mode='reschedule'
- 다른 슬롯을 사용할 수 있을때까지 기다림
poke_interval
- poke모드에서 airflow에 조건을 확인하는 빈도를 알려줌
timeout
- 작업 실패로 표시하기 전에 대기하는 시간
File sensor
- 파일 시스템의 특정 위치에 파일이 있는지 확인함
- 주어진 디렉토리 내 모든 파일이 확인 가능하다.
from airflow.contrib.sensors.file_sensor import FileSensor file_sensor_task = FileSensor(task_id='file_sense', filepath='salesdata.csv', poke_interval=300, dag=sales_report_dag) init_sales_cleanup >> file_sensor_task >> generate_report
다른 센서들
ExternalTaskSensor
- DAG에 있는 작업이 완료될 때까지 기다림
HttpSensor
- 웹 URL을 요청하고 확인할 콘텐츠를 정의할 수 있다
SqlSensor
- SQL 쿼리를 실행하고 콘텐츠를 확인한다.
airflow.sensors
,airflow.contrib.sensors
라이브러리 내에서 다른 센서들을 사용할 수 있다.
센서를 언제 사용할까
- 언제 참이 될지 불확실한 경우
- 전체 DAG를 즉시 실패하지 않으려는 경우
- DAG에 주기 추가 없이 반복적으로 검사하려는 경우
Airflow Executors
- workflow 내 정의되어 있는 tasks를 실제로 실행하는 요소
- 각 executors들은 task를 실행하는 각각의 기능과 동작을 가지고 있다.
- 로컬에서 각각 실행시키거나 클러스터 내 모든 시스템간 개별 작업 분할 가능(작업자 슬롯 수)
- executors 예
SequentialExecutor
LocalExecutor
CeleryExecutor
SequentialExecutor
- 기본 executor
- 한 번에 하나의 task 실행 - 여러 워크플로우 실행 시 예상 시간보다 오래 걸림
- 디버깅이 쉬움
- 기능적인 반면에 실제 프로덕션에선 추천되지 않음.
LocalExecutor
- 싱글 시스템에서 실행됨
- 각 작업을 로컬 시스템의 프로세스로 취급하여 여러 작업 실행이 가능함
- parallelism을 사용자가 정의할 수 있다.
CeleryExecutor
- 여러 시스템이 기본 클러스터로 통신할 수 있게 파이썬으로 작성된 일반 대기열 시스템
- 여러 Airflow 시스템을 특정 workflow에 대한 작업자로 구성할 수 있다.
- 잘 활용하기 위해 기능을 사용하려면 설정이나 구성이 훨씬 어려워진다.
- 여러 workflows를 구성하기 위한 방법들이 있다.
사용 중인 Executor 확인
airflow.cfg
내에 저장되어 있는executor =
부분을 확인한다.cat airflow/airflow.cfg | grep "executor = " executor = SequentialExecutor
airflow list_dags
를 통해 실행 프로그램 확인이 가능하다.
Airflow에서 Debugging, troubleshooting
작업하는 동안 발생할 수 있는 문제들
- 일정에 따라 실행되지 않는 DAG
- 스케줄러가 실행되지 않는 경우 -
airflow scheduler
로 해결 가능
schedule_interval
시간이 지나지 않은 경우 - 요구 사항에 맞게 날짜 수정
- 작업을 수행할 공간 여유가 없는 경우
- 여러 작업 수행이 가능한 executor type으로 변경
- 시스템 또는 리소스 추가
- DAG 일정 변경
- 스케줄러가 실행되지 않는 경우 -
- 시스템에 로드되지 않는 dag
- Web UI나 터미널 명령으로 확인할 수 없다.
- Python 파일이 있어야할 폴더 안에 있는지 확인하기
airflow.cfg
에 DAG 폴더 설정확인하기
- 구문 오류
- DAG 파일이 나타나지 않는 경우
- DAG에서 에러 찾는 게 어려울 때도 있다.
- python/Airflow 용 편집기 설정이 되지 않은 경우
airflow list_dags
python3 <dagfile.py>
SLAs and reporting in Airflow
- SLA : Service Leavel Agreement
- 가동 시간 또는 가용성 보장이 아닌 task 또는 DAG를 실행하기 위해 필요한 시간
- SLA miss는 task 또는 DAG가 SLA 예상 타이밍을 충족하지 못하는 모든 상황
- SLA 누락 시, 시스템 구성에 따라 메일 경고 전송, 로그 메모 작성
- Web UI에서 SLA miss를 확인할 수 있다.
SLA 정의하기
- task의
sla
인자를 사용한다.
task1 = BashOperator(task_id='sla_task', bash_command='runcode.sh', sla=timedelta(seconds=30), dag=dag)
default-args
딕셔너리 사용
default-args={ 'sla': timedelta(minutes=20), 'start_date': datetime(2020,2,20) } dag = DAG('sla_dag', default_args=default_args)
timedelta
는 datetime
라이브러리 안에 있다.
datetime.timedelta
모듈은 두 날짜의 차이 기간을 나타낼 때 사용하는 모듈
일반적인 리포팅
- Airflow에는 기본적으로 상태(성공,실패,오류)에 대한 메시지 전송 옵션이 있다.
default_args
딕셔너리에서 설정 가능함default_args={ 'email' = ['airflowalerts@datacamp.com'], 'email_on_failure' : True, 'email_on_retry' : False, 'email_on_success' : True, ... }
이메일 구성 설정 방법 등 자세한 설명은 Airflow doc을 봐야할 것 같다.
Uploaded by Notion2Tistory v1.1.0