템플릿 사용하기
- 템플릿을 사용하면 DAG 실행 중 정보를 대체할 수 있다.
- 템플릿 정보를 가지고 있는 DAG가 실행되면 정보가 해석되어 DAG 실행에 포함된다.
- task 정의 시, 추가적인 유연성을 제공한다.
- jinja 템플릿 언어를 사용하여 작성할 수 있다.
- Jinja는 Python 프로그래밍 언어용 웹 템플릿 엔진
템플릿에 대한 이해 전, 템플릿을 사용하지 않은 배쉬 연산자를 사용하는 예제를 보자.
from airflow.operators.bash_operator import BashOperator t1 = BashOperator( task_id = 'first_task', bash_command = 'echo "Reading file1.txt"', dag = dag ) t2 = BashOperator( task_id = 'second_task', bash_command = 'echo "Reading file2.txt"', dag = dag )
위 같은 작업을 수행할 때, 만약 파일이 100개라면 t100까지 만들어야할까..?
템플릿을 사용하면 다음처럼 코드를 바꿀 수 있다.
templated_command = """ echo "Reading {{ params.filename }}" """ t1 = BashOperator( task_id = 'template_task', bash_command = templated_command, params={'filename':'file1.txt'} dag = dag ) t2 = BashOperator( task_id = 'template_task', bash_command = templated_command, params={'filename':'file2.txt'} dag = dag )
..? 커맨드를 직접 명시하는 방법에서 재활용 가능하도록 하는 것만 바뀜..
이 상태에서는 템플릿을 쓰는게 더 불편해 보임
템플릿을 사용할 때의 장점
jinja를 사용하면 반복문을 사용하는 템플릿을 구성할 수 있다. (web 개발에서 javascript 라이브러리 사용하는 거랑 비슷한듯..?)
templated_command=""" {% for filename in params.filenames %} echo "Reading {{ filename }} {% endfor %} """ t1 = BashOperator( task_id = 'template_task', bash_command = templated_command, params = {'filenames'=['file1.txt','file2.txt']}, dag = example_dag )
이렇게 되면 템플릿을 사용하는 것이 매우 효율적인 방법이 된다.
템플릿 변수
- Airflow에서는 빌트인 런타임 변수를 제공함
- DAG 실행, 개별 작업 및 시스템 구성에 대한 여러 정보를 제공
- 예시
- 날짜
{{ ds }}
: 실행 날짜 YYYY-MM-DD
{{ ds_nodash}}
: 실행 날짜 YYYYMMDD
{{ prev_ds}}
: 이전에 실행된 날짜 YYYY-MM-DD
{{ prev_ds_nodash}}
: 이전에 실행된 날짜 YYYYMMDD
- 객체
{{ dag }}
: DAG 객체
{{ conf }}
: Airflow config 객체
- Airflow 변수 외 매크로 변수
{{ macros.datetime }}
: datetime.datetime
{{ macros.timedelta }}
: datetime.timedelta
{{ macros.uuid }}
: pythonuuid
(16바이트 고유 숫자값)
{{ macros.ds_add('2020-4-15', 5) }}
: 첫번째 인수날짜의 두번째 인수 후 또는 전날 값 반환
- 날짜
브랜치
- Airflow branch는 조건에 따라 작업을 선택적으로 실행, 스킵하는 조건부 논리 기능이 제공된다.
BranchPythonOperator
- 예제
- 이미 DAG 정의가 모두 완성된 후라고 가정
from airflow.operators.python_operator import BranchPythonOperator def branch_test(**kwargs): if int(kwargs['ds_nodash'])%2==0: return 'even_day_task' else: return 'odd_day_task' branch_task = BranchPythonOperator( task_id='branch_task', dag=dag, provide_context=True, python_callable=branch_test ) #start_task는 한 번만 해주면 됨 start_task >> branch_task >> even_day_task >> even_day_task2 branch_task >> odd_day_task >> odd_day_task2
- 이미 DAG 정의가 모두 완성된 후라고 가정
프로덕션 파이프라인 구축
DAG, Task 실행시키기
- 특정 작업 수행
airflow run <dag id> <task id> <date>
- 전체 DAG 수행
airflow trigger_dag -e <date> <dag id>
Operator 사용
- BashOperator
bash_command
에 실행 커맨드 작성
- PythonOperator
python_callable
에 실행 파이썬 함수 작성
- BranchPythonOperator
python_callable
(**kwargs를 받을 수 있는 함수)과provide_context=True
를 사용해야하고,**kwargs
를 통해 인자 받음
- FileSensor
filepath
에 파일 경로를 작성하고mode
,poke_interval
속성이 필요할 수 있다.
Template 사용
어떤 객체의 어떤 필드에서 템플릿을 적용할 수 있는지 다 기억할 수 없기 때문에 파이썬에서 확인하고 사용하면 된다.
연습문제
from airflow.models import DAG from airflow.contrib.sensors.file_sensor import FileSensor from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.python_operator import BranchPythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.email_operator import EmailOperator from dags.process import process_data from datetime import datetime, timedelta # Update the default arguments and apply them to the DAG. default_args = { 'start_date': datetime(2019,1,1), 'sla': timedelta(minutes=90) } dag = DAG(dag_id='etl_update', default_args=default_args) #sensor,operator sensor = FileSensor(task_id='sense_file', filepath='/home/repl/workspace/startprocess.txt', poke_interval=45, dag=dag) bash_task = BashOperator(task_id='cleanup_tempfiles', bash_command='rm -f /home/repl/*.tmp', dag=dag) python_task = PythonOperator(task_id='run_processing', python_callable=process_data, provide_context=True, dag=dag) #template email_subject=""" Email report for {{ params.department }} on {{ ds_nodash }} """ email_report_task = EmailOperator(task_id='email_report_task', to='sales@mycompany.com', subject=email_subject, html_content='', params={'department': 'Data subscription services'}, dag=dag) no_email_task = DummyOperator(task_id='no_email_task', dag=dag) #branching def check_weekend(**kwargs): dt = datetime.strptime(kwargs['execution_date'],"%Y-%m-%d") # If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun. if (dt.weekday() < 5): return 'email_report_task' else: return 'no_email_task' branch_task = BranchPythonOperator(task_id='check_if_weekend', python_callable=check_weekend, provide_context=True, dag=dag) #dependencies sensor >> bash_task >> python_task python_task >> branch_task >> [email_report_task, no_email_task]
Uploaded by Notion2Tistory v1.1.0