앎을 경계하기

[가짜연구소3기] Data Engineer

[가짜연구소 3기] 데이터 엔지니어링 - 24 Scheduling daily jobs

양갱맨 2021. 8. 25. 18:30

주제

배운 내용을 전체적으로 합쳐봤다.

ETL(추출,변환,로드), 스케쥴링 등록까지 해봤다.


앞서 한 내용들

  • 강의 데이터와 평가 데이터 추출
  • 프로그래밍언어가 저장되어 있는 programming_language 컬럼의 결측값 처리하기
  • 강의 당 평균 평점 집계 함수 만들기
  • user_idcourse_id 결합하여 평가하지 않은 강의와 쌍을 만든다.
  • 강의 추천을 위한 계산하기

Postgres에 로드하기

  • 추천 작업을 했던 제품에서 테이블을 사용한다.
  • 매일 업데이트한다.

로딩 단계

#pandas dataframe에서 sql 테이블에 작성한다.
#테이블 이름, 데이터베이스 엔진, 이미 테이블이 존재하는 경우 처리방법
recommendations.to_sql(
	"recommendations",
	db_engine,
	if_exists="append"
)
def etl(db_engines):
	#추출
	courses = extract_course_data(db_engines)
	rating = extract_course_data(db_engines)

	#전처리
	courses = transform_fill_programming_language(courses)
	#강의 당 평균 점수 구하기
	avg_course_rating = transform_avg_rating(rating)
	#강의 테이블과 평가 테이블의 id 쌍을 계산한다.
	courses_to_recommend = transform_courses_to_recommend(rating, courses)

	#추천 결과 얻기
	recommendations = transform_recommendations(
												avg_course_rating,
												courses_to_recommend
										)

	#Postgres DB에 로드하기
	load_to_dwh(recommendations, db_engine)

# DAG만들기
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

dag = DAG(dag_id="recommendations",
					scheduled_interval="0 0 * * *")
task_recommendations = PythonOperator(
		task_id='recommendations_task',
		python_callable=etl
)