ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • 3. pipeline을 만들어보자 (+execution_date)
    Learn/Airflow 2021. 11. 11. 23:26

    설치까지 했으면 이제 파이프라인을 만들어볼 차례다. 

    흘려보낼 데이터가 없어서 실습이 고민되지만 일단 한 단계씩 따라가보자. 

     

    파이프라인을 만드는 작업은 5단계로 나눠볼 수 있다. 

     

    Step 1. Importing modules

    필요한 라이브러리를 import하자. 

    from datetime import timedelta
    
    import airflow
    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

     

    Step 2. Default Arguments

    dictionary형태로 기본 arg를 작성하자. 

    여기서 주의해야할건 시간이다. airflow의 시간 개념을 알고가야한다. 

    아래에서 start_date는 실행되는 날짜가 아니다. 시간은 아래에서 따로 정리하자. 

     

    depends_on_past가 True로 설정될 경우 이전 task가 실패하면 이번엔 돌지 않는다. 

    당연히 False면 상관없이 다시 돈다. 

    default_args = {
    	'owner': 'airflow',
    	'start_date': airflow.utils.dates.days_ago(2)
    	# 'end_date': datetime(2018, 12, 30)
    	'depends_on_past': False,
    	'email': ['airflow@example.com'],
    	'email_on_failure': False,
    	'email_on_retry': False,
    	# If a task fails, retry it once after waiting
    	# at least 5 minutes
    	'retries': 1,
    	'retry_delay': timedelta(minutes=5),
    }

     

    Step 3. Instantiate a DAG

    DAG의 이름, description, interval을 설정해준다. 

    interval 작성 규칙은 crontab 쓸때와 같으니 생각안나면 구글링

    dag = DAG(
    	'tutorial',
        default_args = default_args,
        description = 'A simple tutorial DAG',
        # Continue to run DAG once per day
        schedule_interval = timedelta(days=1),
    )

     

    Step 4. Tasks

    실행될 task들을 작성해준다. 

    template를 강의에서 "진짜 템플릿"이라고 하길래 당황했는데 Jinja Template이라는 것이었다. 

    (외국인이 영어로 강의하는데 진짜 템플릿일리가..오징어게임 국뽕에 설마하는 상상을 해버렸다..)

    뭐 자세히 적을건 아니고 variable을 받을 수 있는 형태 정도로 생각하면 될 것 같다. 

    # t1, t2 and t3 are examples of tasks created by instantiating operators
    t1 = BashOperator(
    	task_id = 'print_date',
        bash_command = 'date',
        dag = dag,
    )
    
    t2 = BashOperator(
    	task_id = 'sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        dag = dag,
    )
    
    templated_command = """
    {% for i in range(5) %}
    	echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
    """
    
    t3 = BashOperator(
    	task_id = 'templated',
        depends_on_past = False,
        bash_command = templated_command,
        params={'my_param': 'Parameter I passed in'},
        dag = dag,
    )

     

    Step 5. Setting up Dependencies

    위에서 아래로 흐른다고 생각하면 쉽다. 

    t1의 아래로 t2를 설정한거니 t1이 잘 끝나야 t2가 실행된다. 

    마찬가지로 t3의 위로 t1을 설정한거니 t1이 잘 끝나야 t3가 돌 수 있다. 

    # This means that t2 will depend on t1
    # running successfully to run.
    t1.set_downstream(t2)
    
    # similar to above where t3 will depend on t1
    t3.set_upstream(t1)

    아래처럼 작성할 수도 있는데 처음 봤을때 python의 operator override를 생각못해서 삽질을 했었다. 

    비트연산자가 아니고 upstream/downstream을 표현하는 것이다. (airflow에서 override됨)

    방향은 직관적이다. 

    # The bit shift operator can also be
    # used to chain operations:
    t1 >> t2
    
    # And the upstream dependency with the 
    # bit shift operator:
    t2 << t1

    이렇게도 가능하다. 

    # A list of tasks can also be set as
    # dependencies. These operations
    # all have the same effect:
    t1.set_downstream([t2, t3])
    t1 >> [t2, t3]
    [t2, t3] << t1

    이렇게 쭉 합쳐서 작성하고 airflow를 실행시키면 web에서 DAG를 확인할 수 있다. 

     

     

     

    그리고 마지막으로 start_date, execution_date관련 정리하고가자. 

    start_date는 DAG의 실행시간이 아니다. 그게 맞다면 고정값일리가 없지. 

    start_date는 고정값으로 스케줄이 처음으로 시작되는 날짜를 뜻하고 interval만큼 넘어가면서 실행된다. 

    즉 start_date가 현재보다 미래면 당연히 실행이 안된다. 

     

    execution_date는 실제 실행 시간이 아니라 작업할 데이터 범위의 가장 빠른 시점이라고 보면 된다. 

    말이 좀 어려운데.. 아래 링크에 잘 나와있고 핵심 한줄은 아래와 같다. 

     

    Airflow sets execution_date based on the left bound of the schedule period it is covering, not based on when it fires (which would be the right bound of the period).

    https://stackoverflow.com/questions/39612488/airflow-trigger-dag-execution-date-is-the-next-day-why#answer-39620901

     

    airflow trigger_dag execution_date is the next day, why?

    Recently I have tested airflow so much that have one problem with execution_date when running airflow trigger_dag <my-dag>. I have learned that execution_date is not what we think at first t...

    stackoverflow.com

     

    1시간마다 돌기로 설정했는데 지금 마침 딱 10시가 되었다고 생각해보자. 

    그럼 지금 시점에 쌓인 데이터들을 처리하게 될텐데 이때 가장 빠른 데이터는 9시일 것이고 이게 execution_date이다. 

     

    자세한건..쓰다보면 익숙해지겠지?

    'Learn > Airflow' 카테고리의 다른 글

    [Airflow] sub dag  (0) 2022.04.07
    5. Airflow variables  (0) 2021.11.19
    4. Airflow concept  (0) 2021.11.14
    2. 설치 및 셋업 (WSL2 / Docker / Airflow)  (0) 2021.11.09
    1. Airflow 소개  (0) 2021.11.08

    댓글

Designed by Tistory.