-
3. pipeline을 만들어보자 (+execution_date)Learn/Airflow 2021. 11. 11. 23:26
설치까지 했으면 이제 파이프라인을 만들어볼 차례다.
흘려보낼 데이터가 없어서 실습이 고민되지만 일단 한 단계씩 따라가보자.
파이프라인을 만드는 작업은 5단계로 나눠볼 수 있다.
Step 1. Importing modules
필요한 라이브러리를 import하자.
from datetime import timedelta import airflow from airflow import DAG from airflow.operators.bash_operator import BashOperator
Step 2. Default Arguments
dictionary형태로 기본 arg를 작성하자.
여기서 주의해야할건 시간이다. airflow의 시간 개념을 알고가야한다.
아래에서 start_date는 실행되는 날짜가 아니다. 시간은 아래에서 따로 정리하자.
depends_on_past가 True로 설정될 경우 이전 task가 실패하면 이번엔 돌지 않는다.
당연히 False면 상관없이 다시 돈다.
default_args = { 'owner': 'airflow', 'start_date': airflow.utils.dates.days_ago(2) # 'end_date': datetime(2018, 12, 30) 'depends_on_past': False, 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, # If a task fails, retry it once after waiting # at least 5 minutes 'retries': 1, 'retry_delay': timedelta(minutes=5), }
Step 3. Instantiate a DAG
DAG의 이름, description, interval을 설정해준다.
interval 작성 규칙은 crontab 쓸때와 같으니 생각안나면 구글링
dag = DAG( 'tutorial', default_args = default_args, description = 'A simple tutorial DAG', # Continue to run DAG once per day schedule_interval = timedelta(days=1), )
Step 4. Tasks
실행될 task들을 작성해준다.
template를 강의에서 "진짜 템플릿"이라고 하길래 당황했는데 Jinja Template이라는 것이었다.
(외국인이 영어로 강의하는데 진짜 템플릿일리가..오징어게임 국뽕에 설마하는 상상을 해버렸다..)
뭐 자세히 적을건 아니고 variable을 받을 수 있는 형태 정도로 생각하면 될 것 같다.
# t1, t2 and t3 are examples of tasks created by instantiating operators t1 = BashOperator( task_id = 'print_date', bash_command = 'date', dag = dag, ) t2 = BashOperator( task_id = 'sleep', depends_on_past=False, bash_command='sleep 5', dag = dag, ) templated_command = """ {% for i in range(5) %} echo "{{ ds }}" echo "{{ macros.ds_add(ds, 7)}}" echo "{{ params.my_param }}" {% endfor %} """ t3 = BashOperator( task_id = 'templated', depends_on_past = False, bash_command = templated_command, params={'my_param': 'Parameter I passed in'}, dag = dag, )
Step 5. Setting up Dependencies
위에서 아래로 흐른다고 생각하면 쉽다.
t1의 아래로 t2를 설정한거니 t1이 잘 끝나야 t2가 실행된다.
마찬가지로 t3의 위로 t1을 설정한거니 t1이 잘 끝나야 t3가 돌 수 있다.
# This means that t2 will depend on t1 # running successfully to run. t1.set_downstream(t2) # similar to above where t3 will depend on t1 t3.set_upstream(t1)
아래처럼 작성할 수도 있는데 처음 봤을때 python의 operator override를 생각못해서 삽질을 했었다.
비트연산자가 아니고 upstream/downstream을 표현하는 것이다. (airflow에서 override됨)
방향은 직관적이다.
# The bit shift operator can also be # used to chain operations: t1 >> t2 # And the upstream dependency with the # bit shift operator: t2 << t1
이렇게도 가능하다.
# A list of tasks can also be set as # dependencies. These operations # all have the same effect: t1.set_downstream([t2, t3]) t1 >> [t2, t3] [t2, t3] << t1
이렇게 쭉 합쳐서 작성하고 airflow를 실행시키면 web에서 DAG를 확인할 수 있다.
그리고 마지막으로 start_date, execution_date관련 정리하고가자.
start_date는 DAG의 실행시간이 아니다. 그게 맞다면 고정값일리가 없지.
start_date는 고정값으로 스케줄이 처음으로 시작되는 날짜를 뜻하고 interval만큼 넘어가면서 실행된다.
즉 start_date가 현재보다 미래면 당연히 실행이 안된다.
execution_date는 실제 실행 시간이 아니라 작업할 데이터 범위의 가장 빠른 시점이라고 보면 된다.
말이 좀 어려운데.. 아래 링크에 잘 나와있고 핵심 한줄은 아래와 같다.
Airflow sets execution_date based on the left bound of the schedule period it is covering, not based on when it fires (which would be the right bound of the period).
airflow trigger_dag execution_date is the next day, why?
Recently I have tested airflow so much that have one problem with execution_date when running airflow trigger_dag <my-dag>. I have learned that execution_date is not what we think at first t...
stackoverflow.com
1시간마다 돌기로 설정했는데 지금 마침 딱 10시가 되었다고 생각해보자.
그럼 지금 시점에 쌓인 데이터들을 처리하게 될텐데 이때 가장 빠른 데이터는 9시일 것이고 이게 execution_date이다.
자세한건..쓰다보면 익숙해지겠지?
'Learn > Airflow' 카테고리의 다른 글
[Airflow] sub dag (0) 2022.04.07 5. Airflow variables (0) 2021.11.19 4. Airflow concept (0) 2021.11.14 2. 설치 및 셋업 (WSL2 / Docker / Airflow) (0) 2021.11.09 1. Airflow 소개 (0) 2021.11.08