Airflow operators
- 일반적을 독립적으로 실행한다.
- 서로 정보를 공유하지 않는다.
- 다양한 작업의 연산자들이 있다.
BashOperator
BashOperator( task_id='bash_example', bash_command='echo "Example!"', dag=ml_dag) BashOperator( task_id='bash_script_example', bash_command='runcleanup.sh', dag=ml_dag)
- Bashoperator는 주어진 명령을 bash에서 실행한다.
- 명령은 임시 디렉토리에서 실행되고 자동으로 정리된다.
- 환경 변수 설정도 가능하다.
- 여러 개의 연산은 위 코드처럼 나열해서 dag만 공통적으로 등록해주면 된다.
예제
from airflow.operators.bash_operator import BashOperator example_task = BashOperator(task_id='bash_ex', bash_command='echo 1', dag=dag) bash_task = BashOperator(task_id='clean_addresses', bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt', dag = dag)
bash_command에 원하는 bash 명령어를 작성하면 된다.
Operator 사용 시 발생하는 문제점
- 항상 명령이 같은 환경, 위치에서 실행된다는 보장이 없다.
- BashOperator에 대한 환경변수 설정이 요구될 수 있다.
- AWS 자격 증명, 데이터베이스 연결 정보 등
- 리소스에 접근 시 사용자 권한 설정이 필요하다.
- 어떤 계정이 접근 가능한지 모르면 루트 명령이 필요하다.
Airflow tasks
- task는 인스턴스화 된 연산자
- 파이썬에서 일반적으로 변수에 할당된다.
- Airflow tools 안에서는 task_id로 참조된다.
- airflow의 task dependencies는 작업 완료 순서를 정의한다.
- 종속성이 정의되지않은 작업은 순서 보장없이 처리된다.
- 작업은 upstream, downstream으로 나뉘고, upstream은 downstream 작업 전에 완료해야 함
- Airflow 1.8 이후 버전에서, 종속성을 나타낼때 비트쉬프트 연산자를 사용한다.
- >> upstream operator
- << downstream operator
task1 = BashOperator(task_id='first_task', bash_command='echo 1', dag=example_dag) task2 = BashOperator(task_id='second_task', bash_command='echo 2', dag=example_dag) task1 >> task2 # 업스트림
여러 개 작업의 종속성을 나타내는 것은 간단히 비트쉬프트연산자 사용하면 된다.
task1 >> task2 >> task3 >> task4
task1 >> task2 << task3 또는 #누가 먼저 수행될지 모름 task1 >> task2 task3 >> task2
PythonOperator
- BashOperator처럼 python 함수를 사용하고 호출하는 연산자
from airflow.operators.python_operator import PythonOperator def printme(): print("This goes in the logs!") python_task = PythonOperator( task_id='simple_print', python_callable=printme, dag=example_dag )
- PythonOperator는 arguments 추가도 지원한다.
op_kwargs
를 통해 호출 함수에 전달할 인수를 넣을 수 있다.- 파라미터 이름을 정확히 key에 명시해야한다.
def sleep(length_of_time): time.sleep(length_of_time) sleep_task = PythonOperator( task_id='sleep', python_callable=sleep, op_kwargs={'length_of_time' : 5}, dag=example_dag )
EmailOperator
- 메일링 서비스가 가능한 연산자.
airflow.operators
안에 있다.
from airflow.operators.email_operator import EmailOperator email_task = EmailOperator( task_id='email_sales_report', to='sales_manager@example.com', subject='Automated Sales Report', html_content='Attched is the latest sales report', files='latest_sales.xlsx', dag=example_dag )
Airflow scheduling
이제까지 airflow task 구현과 종속성 설정에 대해 배웠다.
이제 예약하고 자동 실행되는 스케쥴링에 대해 배워보자.
DAG 실행
- 구체적인 시점에 workflow가 수행될 수 있도록 한다.
- 수동 또는 일정 간격을 나타낸 매개 변수(
schedule_interval
) 값을 통해 실행할 수 있다.
- 각 작업에 대한 상태는 세 가지를 갖는다.
- running
- failed
- success
- Airflow UI 내에서 DAG Runs 페이지에서 상태 확인이 가능하다.
- DAG 스케쥴링
start_date
: DAG 실행을 초기 예약할 수 있는 시간 지정
end_date
: 실행 멈출 시간 지정
max_tries
: DAG 실행이 실패하기 전 재시도 횟수
schedule_interval
: 실행 빈도- schedule_interval은 DAG 실행 예약 빈도이다.
- start_date와 end_date 사이에 발생한다.
- cron 구문 또는 내장된 설정을 통해 몇 가지 방법으로 정의할 수 있다.
- 자주 사용되는 presets
- @hourly - 한 시간에 한 번 실행 —— 0 * * * *
- @daily - 하루에 한 번 실행 ——— 0 0 * * *
- @weekly - 일주일에 한 번 실행 —— 0 0 * * 0
- @monthly - 한 달에 한 번 실행 —— 0 0 0 * 0
- @yearly - 연에 한 번 실행 —— 0 0 0 0 0
- 특수 presets
- None - 수동으로 DAG를 설정함
- @once - 한 번만 예약
- 설정 날짜를 초과해야지만 DAG 수행이 된다.
- start_date에 20210525로 설정되어 있는 경우, 202105026부터 적용
# Update the scheduling arguments as defined default_args = { 'owner': 'Engineering', 'start_date': datetime(2019,11,1), 'email': ['airflowresults@datacamp.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 3, 'retry_delay': timedelta(minutes=20) } #매주 수요일 오후 12시 30분 dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')
Uploaded by Notion2Tistory v1.1.0