앎을 경계하기

[가짜연구소3기] Data Engineer

[가짜연구소 3기] 데이터 엔지니어링 56 - Implementing Airflow DAGs

양갱맨 2021. 10. 18. 09:43

Airflow operators

  • 일반적을 독립적으로 실행한다.
  • 서로 정보를 공유하지 않는다.
  • 다양한 작업의 연산자들이 있다.

BashOperator

BashOperator( 			task_id='bash_example', 			bash_command='echo "Example!"', 			dag=ml_dag)  BashOperator( 			task_id='bash_script_example', 			bash_command='runcleanup.sh', 			dag=ml_dag)
  • Bashoperator는 주어진 명령을 bash에서 실행한다.
  • 명령은 임시 디렉토리에서 실행되고 자동으로 정리된다.
  • 환경 변수 설정도 가능하다.
  • 여러 개의 연산은 위 코드처럼 나열해서 dag만 공통적으로 등록해주면 된다.

예제

from airflow.operators.bash_operator import BashOperator example_task = BashOperator(task_id='bash_ex', 														bash_command='echo 1', 														dag=dag)  bash_task = BashOperator(task_id='clean_addresses', 	bash_command='cat addresses.txt | awk "NF==10" > cleaned.txt', 	dag = dag)

bash_command에 원하는 bash 명령어를 작성하면 된다.

Operator 사용 시 발생하는 문제점

  • 항상 명령이 같은 환경, 위치에서 실행된다는 보장이 없다.
  • BashOperator에 대한 환경변수 설정이 요구될 수 있다.
    • AWS 자격 증명, 데이터베이스 연결 정보 등
  • 리소스에 접근 시 사용자 권한 설정이 필요하다.
    • 어떤 계정이 접근 가능한지 모르면 루트 명령이 필요하다.

Airflow tasks

  • task는 인스턴스화 된 연산자
  • 파이썬에서 일반적으로 변수에 할당된다.
  • Airflow tools 안에서는 task_id로 참조된다.
  • airflow의 task dependencies는 작업 완료 순서를 정의한다.
  • 종속성이 정의되지않은 작업은 순서 보장없이 처리된다.
  • 작업은 upstream, downstream으로 나뉘고, upstream은 downstream 작업 전에 완료해야 함
  • Airflow 1.8 이후 버전에서, 종속성을 나타낼때 비트쉬프트 연산자를 사용한다.
    • >> upstream operator
    • << downstream operator
task1 = BashOperator(task_id='first_task', 											bash_command='echo 1', 											dag=example_dag)  task2 = BashOperator(task_id='second_task', 											bash_command='echo 2', 											dag=example_dag)  task1 >> task2 # 업스트림 

여러 개 작업의 종속성을 나타내는 것은 간단히 비트쉬프트연산자 사용하면 된다.

task1 >> task2 >> task3 >> task4
task1 >> task2 << task3  또는  #누가 먼저 수행될지 모름 task1 >> task2 task3 >> task2

PythonOperator

  • BashOperator처럼 python 함수를 사용하고 호출하는 연산자
from airflow.operators.python_operator import PythonOperator def printme(): 	print("This goes in the logs!")  python_task = PythonOperator( 	task_id='simple_print', 	python_callable=printme, 	dag=example_dag )
  • PythonOperator는 arguments 추가도 지원한다.
  • op_kwargs를 통해 호출 함수에 전달할 인수를 넣을 수 있다.
    • 파라미터 이름을 정확히 key에 명시해야한다.
def sleep(length_of_time): 	time.sleep(length_of_time)  sleep_task = PythonOperator( 		task_id='sleep', 		python_callable=sleep, 		op_kwargs={'length_of_time' : 5}, 		dag=example_dag )

EmailOperator

  • 메일링 서비스가 가능한 연산자.
  • airflow.operators 안에 있다.
from airflow.operators.email_operator import EmailOperator  email_task = EmailOperator( 	task_id='email_sales_report', 	to='sales_manager@example.com', 	subject='Automated Sales Report', 	html_content='Attched is the latest sales report', 	files='latest_sales.xlsx', 	dag=example_dag )

Airflow scheduling

이제까지 airflow task 구현과 종속성 설정에 대해 배웠다.

이제 예약하고 자동 실행되는 스케쥴링에 대해 배워보자.

DAG 실행

  • 구체적인 시점에 workflow가 수행될 수 있도록 한다.
  • 수동 또는 일정 간격을 나타낸 매개 변수(schedule_interval) 값을 통해 실행할 수 있다.
  • 각 작업에 대한 상태는 세 가지를 갖는다.
    • running
    • failed
    • success
  • Airflow UI 내에서 DAG Runs 페이지에서 상태 확인이 가능하다.
  • DAG 스케쥴링
    • start_date : DAG 실행을 초기 예약할 수 있는 시간 지정
    • end_date : 실행 멈출 시간 지정
    • max_tries : DAG 실행이 실패하기 전 재시도 횟수
    • schedule_interval : 실행 빈도
      • schedule_interval은 DAG 실행 예약 빈도이다.
      • start_date와 end_date 사이에 발생한다.
      • cron 구문 또는 내장된 설정을 통해 몇 가지 방법으로 정의할 수 있다.
      • 자주 사용되는 presets
        • @hourly - 한 시간에 한 번 실행 —— 0 * * * *
        • @daily - 하루에 한 번 실행 ——— 0 0 * * *
        • @weekly - 일주일에 한 번 실행 —— 0 0 * * 0
        • @monthly - 한 달에 한 번 실행 —— 0 0 0 * 0
        • @yearly - 연에 한 번 실행 —— 0 0 0 0 0
      • 특수 presets
        • None - 수동으로 DAG를 설정함
        • @once - 한 번만 예약
    • 설정 날짜를 초과해야지만 DAG 수행이 된다.
      • start_date에 20210525로 설정되어 있는 경우, 202105026부터 적용
    # Update the scheduling arguments as defined default_args = {   'owner': 'Engineering',   'start_date': datetime(2019,11,1),   'email': ['airflowresults@datacamp.com'],   'email_on_failure': False,   'email_on_retry': False,   'retries': 3,   'retry_delay': timedelta(minutes=20) }  #매주 수요일 오후 12시 30분 dag = DAG('update_dataflows', default_args=default_args, schedule_interval='30 12 * * 3')