주제
배운 내용을 전체적으로 합쳐봤다.
ETL(추출,변환,로드), 스케쥴링 등록까지 해봤다.
앞서 한 내용들
- 강의 데이터와 평가 데이터 추출
- 프로그래밍언어가 저장되어 있는
programming_language
컬럼의 결측값 처리하기
- 강의 당 평균 평점 집계 함수 만들기
user_id
와course_id
결합하여 평가하지 않은 강의와 쌍을 만든다.
- 강의 추천을 위한 계산하기
Postgres에 로드하기
- 추천 작업을 했던 제품에서 테이블을 사용한다.
- 매일 업데이트한다.
로딩 단계
#pandas dataframe에서 sql 테이블에 작성한다.
#테이블 이름, 데이터베이스 엔진, 이미 테이블이 존재하는 경우 처리방법
recommendations.to_sql(
"recommendations",
db_engine,
if_exists="append"
)
def etl(db_engines):
#추출
courses = extract_course_data(db_engines)
rating = extract_course_data(db_engines)
#전처리
courses = transform_fill_programming_language(courses)
#강의 당 평균 점수 구하기
avg_course_rating = transform_avg_rating(rating)
#강의 테이블과 평가 테이블의 id 쌍을 계산한다.
courses_to_recommend = transform_courses_to_recommend(rating, courses)
#추천 결과 얻기
recommendations = transform_recommendations(
avg_course_rating,
courses_to_recommend
)
#Postgres DB에 로드하기
load_to_dwh(recommendations, db_engine)
# DAG만들기
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(dag_id="recommendations",
scheduled_interval="0 0 * * *")
task_recommendations = PythonOperator(
task_id='recommendations_task',
python_callable=etl
)
Uploaded by Notion2Tistory v1.1.0