Airflow를 처음 알게 된 후, 이것 저것 찾아보는 중, 아래와 이미지를 접한 기억이 납니다.

Airflow: a workflow management platform Source Airflow: a workflow management platform

수많은 태스크들이 모여 복잡한 DAG를 이루는 파이프라인이 구축되어 있는데요, 처음에 언뜻 봤을 때는, 정말 그럴 듯하고 멋있어 간지나 보였습니다. 하지만, Airflow를 적극 도입하려고 하는 현재 시점에서 다시 생각해보니, 막상 저런 파이프라인을 전부 다 파악하고 관리할 상상을 하니까 결코 쉽지 않을 것 같다는 인상도 피할 수 없었습니다. 하지만 이런 고민은 Airflow2에서부터 도입된 Task Group의 도움을 받아 개선할 수 있을 것으로 보입니다.

이번 글에서는 Task Group의 개념과 사용 예제를 정리합니다.

Task Group 개념

특히 사용자는 Task Group을 통해 복합하게 연결된 태스크를 복수 개의 그룹으로 묶어 Webserver의 Graph View에서 깔끔하게 시각화 할 수 있으며, 특정 프로세스의 그룹은 재사용할 수도 있습니다.

Subdag와 Task Group의 비교

Airflow 1.x 에서는 유저가 SubDag를 따로 정의하여 이러한 역할을 수행할 수 있게 했습니다. 하지만 Subdag는 하나의 DAG 내부에 또 다른 DAG를 선언하는 형식으로 구성되기 때문에 파라미터와 스케줄이 추가적으로 정의되어야 하고, 만에 하나 엉킨다면 상위 DAG에서 문제를 일으킬 소지가 있는 만큼, 안정적인 방법은 아니었습니다.

반면, Task Group의 경우, 개별 파라미터와 스케줄 등을 정의할 필요 없이 단순히 태스크를 묶어주는 역할만 하기 때문에, 많은 자원을 차지하지 않습니다.

Task Group 정의 방법

Task Group 객체는 다음과 같이 사용할 수 있습니다.

  • Task Group 정의 방법
# Task Group 객체 불러오기
from airflow.utils.task_group import TaskGroup
# 시작 task
task_0 = DummyOperator(task_id='init')

# Task Group 1
with TaskGroup(group_id='group_1') as tg_1:
      task_1 = DummyOperator(task_id='task_1')
      task_2 = DummyOperator(task_id='task_2')
      
      # Task Group 1 내부에서의 의존성
      task_1 >> task_2

# Task Group 2
with TaskGroup(group_id='group_2') as tg_2:
      task_3= DummyOperator(task_id='task_3')
      task_4 = DummyOperator(task_id='task_4')
      
      [task_3, task_4] >> task_5
      
# 종료 task
task6 = DummyOperator(task_id='end')
  • DAG 내부에서 의존성을 정의하는 방식은 기존 task 간의 의존성 정의 방식과 마찬가지로 >> 연산자를 사용하면 됩니다.
# DAG 내 task 의존성 정의
task_0 >> tg_1 >> tg_2 >> task6

Task 테스트 시 주의 사항

보통 task를 테스트하는 명령어는 다음과 같습니다.

airflow tasks test [dag_id] [task_id] [date]

하지만 Task Group에 속하는 task의 경우에는 다음과 같이 group_id를 앞에 붙여줘야 합니다.

airflow tasks test [dag_id] [group_id.task_id] [date]

Task Group을 활용한 예제

Task Group을 활용하여 어떠한 가상의 프로세스 파이프라인에서 특성이나 단계에 따라 task들을 묶어 DAG를 형성해보았습니다. Webserver 상의 Graph View도 그룹에 따라 구분되었으며, 그룹을 펼쳐 볼 수도 있게 되었습니다. 자세한 내용은 아래 덧붙인 예제 코드와 Graph View 상의 DAG 시각화를 비교해보시길 바랍니다.


import airflow
from airflow import DAG
from airflow.utils.task_group import TaskGroup
from airflow.operators.dummy import DummyOperator

default_args = {
   "owner": "temp",
   "depends_on_past" : False,
   "start_date" : None
}

with DAG(
   dag_id = 'dummy_dag',
   default_args = default_args,
   description = "Task Group demo",
   schedule_interval = None,
   start_date = airflow.utils.dates.days_ago(10),
   ) as dag:

   task_0 = DummyOperator(task_id="task_0")

   with TaskGroup(group_id='group_1') as tg_1:
      task_1_1 = DummyOperator(task_id="task_1_1")
      task_1_2 = DummyOperator(task_id="task_1_2")

      with TaskGroup(group_id='group_1_3') as tg_1_3:
         task_1_3_1 = DummyOperator(task_id="task_1_3_1")
         task_1_3_2 = DummyOperator(task_id="task_1_3_2")
         task_1_3_3 = DummyOperator(task_id="task_1_3_3")
         task_1_3_4 = DummyOperator(task_id="task_1_3_4")

         [task_1_3_1, task_1_3_2, task_1_3_3] >> task_1_3_4

      task_1_1 >> task_1_2 >> tg_1_3

   with TaskGroup(group_id='group_2') as tg_2:

      task_2_1 = DummyOperator(task_id="task_2_1")

      with TaskGroup(group_id='group_2_2') as group_2_2:
         task_2_2_1 = DummyOperator(task_id="task_2_2_1")
         task_2_2_2 = DummyOperator(task_id="task_2_2_2")
         
         task_2_2_1 >> task_2_2_2

      with TaskGroup(group_id='group_2_3') as group_2_3:
         task_2_3_1 = DummyOperator(task_id="task_2_3_1")
         task_2_3_2 = DummyOperator(task_id="task_2_3_2")
         task_2_3_3 = DummyOperator(task_id="task_2_3_3")

         [task_2_3_1, task_2_3_2, task_2_3_3]

      with TaskGroup(group_id='group_2_4') as group_2_4:
         task_2_4_1 = DummyOperator(task_id="task_2_4_1")
         task_2_4_2 = DummyOperator(task_id="task_2_4_2")
         task_2_4_3 = DummyOperator(task_id="task_2_4_3")

         [task_2_4_1, task_2_4_2] >> task_2_4_3

      with TaskGroup(group_id='group_2_5') as group_2_5:
         task_2_5_1 = DummyOperator(task_id="task_2_5_1")
         task_2_5_2 = DummyOperator(task_id="task_2_5_2")
         
         [task_2_5_1, task_2_5_2]

      task_2_1 >> group_2_2 >> group_2_3 >> group_2_4 >> group_2_5

   with TaskGroup(group_id='group_3') as tg_3:
      task_3_1 = DummyOperator(task_id='task_3_1')
      task_3_2 = DummyOperator(task_id='task_3_2')
      task_3_3 = DummyOperator(task_id='task_3_3')
      task_3_4 = DummyOperator(task_id='task_3_4')

      [task_3_1, task_3_2, task_3_3, task_3_4]

   with TaskGroup(group_id='group_4') as tg_4:
      task_4_1 = DummyOperator(task_id='task_4_1')
      task_4_2 = DummyOperator(task_id='task_4_2')

      task_4_1 >> task_4_2

task_0 >> [tg_1, tg_2] >> tg_3 >> tg_4

Reference