ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • [Airflow] sub dag
    Learn/Airflow 2022. 4. 7. 23:09

    sub dag

    dag를 만들어 쓰다보니 잘쓰고 싶은 욕심이 들게된다. 

    그러다보니 점점 task를 잘게 쪼개고 병렬화도 많이 하게 된다. 

     

    나는 Graph형태로 자주 보는편인데 문제는 너무 복잡해진다. 

     

    이 때 sub dag를 쓰면 좋다. 

    메인 dag에서는 sub dag로 묶어서 보이니 간결해진다. 

     

    써보자

    Sub Dag를 만드는건 기존의 Dag를 만드는 것과 다르지 않다. 

    Dag를 만들고 메인 Dag에서 SubDagOperator를 쓰면 Sub Dag가 된다. 

     

    추후에 빨리 참고하기위해 코드를 남긴다만, 사용법은 너무도 다양하다. 

     

    아래처럼 작성하면 prepare_train이 끝나면 5개의 train이 병렬로 실행된다. 

    (지금보면 별것 아니지만 Airflow 사용이 미숙해서 병렬로 실행하는 것도 한참 삽질했다...)

     

    보면 알겠지만 이름만 sub_dag라는걸 사용했지 생긴건 평범한 dag작성과 다르지 않다. 

    주의할 점은 sub dag의 dag_id는 반드시 {parent_dag_id}.{child_dag_id} 로 생성해야한다. 

    그렇지 않으면 airflow에서 에러를 발생시킨다. 

    그냥 인자로 넘겨주어 생성하면 이런 일이 안생긴다. 

    # train.py
    from airflow.models import DAG
    from airflow.operators.python import PythonOperator
    
    
    class Train:
        def __init__(self, dag: DAG):
            self.dag_id = dag.dag_id
            self.bagging_size = 5
            
        def train_sub_dag(self, sub_dag_id):
            sub_dag = DAG(
                dag_id=f'{self.dag_id}.{task_id}',
                default_args=self.dag.default_args,
                schedule_interval=sefl.dag.schedule_interval,
            )
            
            prepare_train = PythonOperator(
                task_id='prepare_train',
                python_callable=self.prepare_train_task,
                op_kwargs={'val1': val1},
                dag=sub_dag,
            )
            
            for i in range(self.bagging_size):
                prepare_train >> self.make_train_task(i, sub_dag)
            
            return sub_dag
            
        def make_train_task(self, sub_dag: DAG, index: int, sub_dag: DAG):
            return PythonOperator(
                task_id=f'train_model_1',
                python_callable=self.train,
                op_kwargs={'index': index},
                dag=sub_dag,
            )
            
        ....

     

    앞에서 언급했듯 메인 dag에서 위의 dag를 sub dag라고 해줘야한다. 

    아래와 같이 SubDagOperator에 위에서 작성한 dag를 넘겨주면 subdag로 인식된다. 

    # dag.py
    
    import airflow import DAG
    from airflow.operators.subdag import SubDagOperator
    from my_project.train import Train
    
    
    ...
    
    dag = DAG(
        DAG_NAME,
        default_args=default_args,
        schedule_interval=interval
    )
    
    train_instance = Train(dag)
    train_task_id = 'train'
    train = SubDagOperator(
        task_id=train_task_id,
        subdag=train_instance.train_sub_dag(train_task_id),
        dag=dag,
    )
    
    ...
    
    task1 >> task2 >> train >> task3

     

    집에 Airflow 환경이 없어서 스크린샷을 못찍는다만

    airflow web에서 sub dag는 마치 task같이 보이고 배경색만 다르다. 

    누르면 zoom sub dag를 해서 sub dag의 구성을 볼 수 있다. 

     

    궁금한 점

    아직은 많이 안써본지라 몇 가지 궁금한 점이 있다. 

    추후에 알아내면 업데이트. 

     

    1. 메인 dag에서 sub dag의 task들까지 한번에 볼 수 있는게 있는 방법이 있을까? 

    하나 하나 눌러서 zoom을 눌러야되는데 창이 많아져서 불편하고

    안그래도 airflow web이 느린지라 답답하다

     

    2. sub dag의 schedule_interval은 어떻게 쓰는게 best일까?

    sub dag도 dag다보니 schedule_interval이 있다. 

    dag안에서 다른 task와 의존성이 있는데 무슨 의미가 있을까?

     

    일단 메인 dag의 일정을 넘겨주어 그대로 썼다만 좋은 사용법 이있는지 궁금증이 든다. 

    'Learn > Airflow' 카테고리의 다른 글

    5. Airflow variables  (0) 2021.11.19
    4. Airflow concept  (0) 2021.11.14
    3. pipeline을 만들어보자 (+execution_date)  (0) 2021.11.11
    2. 설치 및 셋업 (WSL2 / Docker / Airflow)  (0) 2021.11.09
    1. Airflow 소개  (0) 2021.11.08

    댓글

Designed by Tistory.