앎을 경계하기

[가짜연구소3기] Data Engineer

[가짜연구소 3기] 데이터 엔지니어링 58 - Building production pipelines in Airflow

양갱맨 2021. 10. 18. 09:44

템플릿 사용하기

  • 템플릿을 사용하면 DAG 실행 중 정보를 대체할 수 있다.
  • 템플릿 정보를 가지고 있는 DAG가 실행되면 정보가 해석되어 DAG 실행에 포함된다.
  • task 정의 시, 추가적인 유연성을 제공한다.
  • jinja 템플릿 언어를 사용하여 작성할 수 있다.
    • Jinja는 Python 프로그래밍 언어용 웹 템플릿 엔진

템플릿에 대한 이해 전, 템플릿을 사용하지 않은 배쉬 연산자를 사용하는 예제를 보자.

from airflow.operators.bash_operator import BashOperator  t1 = BashOperator( 	task_id = 'first_task', 	bash_command = 'echo "Reading file1.txt"', 	dag = dag )  t2 = BashOperator( 	task_id = 'second_task', 	bash_command = 'echo "Reading file2.txt"', 	dag = dag )

위 같은 작업을 수행할 때, 만약 파일이 100개라면 t100까지 만들어야할까..?

템플릿을 사용하면 다음처럼 코드를 바꿀 수 있다.

templated_command = """ 	echo "Reading {{ params.filename }}" """  t1 = BashOperator( 	task_id = 'template_task', 	bash_command = templated_command, 	params={'filename':'file1.txt'} 	dag = dag ) t2 = BashOperator( 	task_id = 'template_task', 	bash_command = templated_command, 	params={'filename':'file2.txt'} 	dag = dag )

..? 커맨드를 직접 명시하는 방법에서 재활용 가능하도록 하는 것만 바뀜..

이 상태에서는 템플릿을 쓰는게 더 불편해 보임

템플릿을 사용할 때의 장점

jinja를 사용하면 반복문을 사용하는 템플릿을 구성할 수 있다. (web 개발에서 javascript 라이브러리 사용하는 거랑 비슷한듯..?)

templated_command=""" {% for filename in params.filenames %} 	echo "Reading {{ filename }} {% endfor %} """  t1 = BashOperator( 	task_id = 'template_task', 	bash_command = templated_command, 	params = {'filenames'=['file1.txt','file2.txt']}, 	dag = example_dag )

이렇게 되면 템플릿을 사용하는 것이 매우 효율적인 방법이 된다.

템플릿 변수

  • Airflow에서는 빌트인 런타임 변수를 제공함
  • DAG 실행, 개별 작업 및 시스템 구성에 대한 여러 정보를 제공
  • 예시
    • 날짜
      • {{ ds }} : 실행 날짜 YYYY-MM-DD
      • {{ ds_nodash}} : 실행 날짜 YYYYMMDD
      • {{ prev_ds}} : 이전에 실행된 날짜 YYYY-MM-DD
      • {{ prev_ds_nodash}} : 이전에 실행된 날짜 YYYYMMDD
    • 객체
      • {{ dag }} : DAG 객체
      • {{ conf }} : Airflow config 객체
    • Airflow 변수 외 매크로 변수
      • {{ macros.datetime }} : datetime.datetime
      • {{ macros.timedelta }} : datetime.timedelta
      • {{ macros.uuid }} : python uuid (16바이트 고유 숫자값)
      • {{ macros.ds_add('2020-4-15', 5) }} : 첫번째 인수날짜의 두번째 인수 후 또는 전날 값 반환
    더 필요하면 Airflow docs 참고하기
  •  

브랜치

  • Airflow branch는 조건에 따라 작업을 선택적으로 실행, 스킵하는 조건부 논리 기능이 제공된다.
  • BranchPythonOperator
  • 예제
    • 이미 DAG 정의가 모두 완성된 후라고 가정
      from airflow.operators.python_operator import BranchPythonOperator  def branch_test(**kwargs): 	if int(kwargs['ds_nodash'])%2==0: 		return 'even_day_task' 	else: 		return 'odd_day_task'  branch_task = BranchPythonOperator( 	task_id='branch_task', 	dag=dag, 	provide_context=True, 	python_callable=branch_test )  #start_task는 한 번만 해주면 됨 start_task >> branch_task >> even_day_task >> even_day_task2 branch_task >> odd_day_task >> odd_day_task2

프로덕션 파이프라인 구축

DAG, Task 실행시키기

  • 특정 작업 수행
    • airflow run <dag id> <task id> <date>
  • 전체 DAG 수행
    • airflow trigger_dag -e <date> <dag id>

Operator 사용

  • BashOperator
    • bash_command 에 실행 커맨드 작성
  • PythonOperator
    • python_callable 에 실행 파이썬 함수 작성
  • BranchPythonOperator
    • python_callable (**kwargs를 받을 수 있는 함수)과 provide_context=True를 사용해야하고, **kwargs를 통해 인자 받음
  • FileSensor
    • filepath 에 파일 경로를 작성하고 mode, poke_interval 속성이 필요할 수 있다.

Template 사용

어떤 객체의 어떤 필드에서 템플릿을 적용할 수 있는지 다 기억할 수 없기 때문에 파이썬에서 확인하고 사용하면 된다.

 

 

연습문제

from airflow.models import DAG from airflow.contrib.sensors.file_sensor import FileSensor from airflow.operators.bash_operator import BashOperator from airflow.operators.python_operator import PythonOperator from airflow.operators.python_operator import BranchPythonOperator from airflow.operators.dummy_operator import DummyOperator from airflow.operators.email_operator import EmailOperator from dags.process import process_data from datetime import datetime, timedelta  # Update the default arguments and apply them to the DAG.  default_args = {   'start_date': datetime(2019,1,1),   'sla': timedelta(minutes=90) }      dag = DAG(dag_id='etl_update', default_args=default_args)  #sensor,operator sensor = FileSensor(task_id='sense_file',                      filepath='/home/repl/workspace/startprocess.txt',                     poke_interval=45,                     dag=dag)  bash_task = BashOperator(task_id='cleanup_tempfiles',                           bash_command='rm -f /home/repl/*.tmp',                          dag=dag)  python_task = PythonOperator(task_id='run_processing',                               python_callable=process_data,                              provide_context=True,                              dag=dag)  #template email_subject="""   Email report for {{ params.department }} on {{ ds_nodash }} """   email_report_task = EmailOperator(task_id='email_report_task',                                   to='sales@mycompany.com',                                   subject=email_subject,                                   html_content='',                                   params={'department': 'Data subscription services'},                                   dag=dag)   no_email_task = DummyOperator(task_id='no_email_task', dag=dag)   #branching def check_weekend(**kwargs):     dt = datetime.strptime(kwargs['execution_date'],"%Y-%m-%d")     # If dt.weekday() is 0-4, it's Monday - Friday. If 5 or 6, it's Sat / Sun.     if (dt.weekday() < 5):         return 'email_report_task'     else:         return 'no_email_task'           branch_task = BranchPythonOperator(task_id='check_if_weekend',                                    python_callable=check_weekend,                                    provide_context=True,                                    dag=dag)  #dependencies   sensor >> bash_task >> python_task  python_task >> branch_task >> [email_report_task, no_email_task]
 

Notion2Tistory

 

boltlessengineer.github.io