컴퓨터 공부/💿 Airflow

[Airflow] Task Group

letzgorats 2023. 8. 30. 08:29

이번 포스팅에서는 Task들을 모아서 관리할 수 있는 Task Group에 대해 알아보겠습니다. 하나의 dag에 task가 많다면, 관련있는 task끼리 그룹화하여 관리하도록 지원해주는 기능입니다.

 

1. Task Group 의 개념

 

DAGs — Airflow Documentation

 

airflow.apache.org

각 task가 그룹화된 섹션을 열어보면, inner_section이 또 있습니다. 즉, task 그룹 안에 또 다른 task 그룹도 계층적으로 담을 수 있다는 의미입니다.

  • Task Group 안에 Task Group을 중첩하여 구성 가능합니다.

※ 그렇다면, Task Group 이 편한 것은 알겠는데, 꼭 써야 할까요? → 물론, 필수는 아닙니다! 다만, 관련있는 task들끼리 UI적으로 편하게 볼 수 있고 관리하기 쉽게 도와주는 기능입니다.

 

2. Task Group 실습 ( task_group 데커레이터 이용)

 

코드를 살펴보겠습니다.

from airflow.decorators import task_group
with DAG(...
) as dag:
    @task_group(group_id='first_group')
    def group_1():
        ''' task_group 데커레이터를 이용한 첫 번째 그룹입니다. '''
        
        @task(task_id='inner_function1')
        def inner_func1(**kwargs):
            print('첫 번째 TaskGroup 내 첫 번째 task입니다.')
        
        inner_function2 = PythonOperator(
            task_id='inner_function2',
            python_callable=inner_func,
            op_kwargs={'msg':'첫 번째 TaskGroup내 두 번째 task입니다.'}
        )
        inner_func1() >> inner_function2

데커레이터를 이용하기 위해서 먼저, decorators 라이브러리를 불러오고 task_group 을 불러와줍니다.

task_id 대신 task_group을 통해 group_id를 지정해주고, wrapping할 함수를 만들어줍니다.

이 때,  데커레이터를 쓸 때 ''' ''' 부분으로 주석처리한 부분을 docstring이라고 하는데, 이는 해당 함수를 설명해주는 기법이라고 이해하시면 됩니다. 이는 나중에 airflow UI 화면에서 봤을 때, 해당 설명을 확인해 볼 수 있습니다.

 

이제, 그 내부에 task들을 지정해주면 되는데, 위 코드에서는 Python decorator를 이용한 python operator로 만든 task와 와 그냥 PythonOperator로 만든 task, 두 개가 있습니다. 마지막으로, task flow를 정해주며 마무리합니다.


2. Task Group 실습 (클래스 이용)

from airflow.utils.task_group import TaskGroup
    with TaskGroup(group_id='second_group', tooltip='두 번째 그룹입니다.') as group_2:
    
    @task(task_id='inner_function1')
    def inner_func1(**kwargs):
        print('두 번째 TaskGroup 내 첫 번째 task입니다.')
        
    inner_function2 = PythonOperator(
        task_id='inner_function2',
        python_callable=inner_func,
        op_kwargs={'msg':'두 번째 TaskGroup내 두 번째 task입니다.'}
    )
    inner_func1() >> inner_function2

아까는 decorator를 사용해서 group을 지정했다면, 이번에는 airflow.utils.task_group에서 TaskGroup을 import해와서 그룹을 만듭니다.

TaskGroup을 만들 때는, dag을 만들 때처럼 with문으로 만들면 되는데, 이번에는 'tooltip'이라는 파라미터가 아까 decorator에서의 docstring을 대체하는 인자입니다.

나머지 내용은 아까 데커레이터를 이용한 방법과 동일합니다.


바로 실습을 해보죠.

dags폴더에 dags_python_with_task_group.py 파일을 생성해 아래와 같이 작성합니다.

from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from airflow.decorators import task
from airflow.decorators import task_group
from airflow.utils.task_group import TaskGroup



with DAG(
    dag_id="dags_python_with_task_group",
    start_date=pendulum.datetime(2023,8,1,tz="UTC"),
    catchup=False
) as dag:
    
    # Task Group with decorator
    def inner_func(**kwargs):
        msg = kwargs.get('msg') or ''
        print(msg)
    
    @task_group(group_id='first_group')
    def group_1():
        '''task_group 데커레이터를 이용한 첫 번째 그룹입니다.'''
        
        @task(task_id='inner_function1')
        def inner_func1(**kwargs):
            print('첫 번째 TaskGroup 내 첫 번째 task입니다.')
        
        inner_function2 = PythonOperator(
            task_id='inner_function2',
            python_callable=inner_func,
            op_kwargs={'msg':'첫 번째 TaskGroup내 두 번째 task입니다.'}
        )
        
        inner_func1() >> inner_function2
    
    # Task Group with class,TaskGroup
    with TaskGroup(group_id='second_group', tooltip='두 번째 그룹입니다.') as group_2:
        '''여기에 적은 docstring은 표시되지 않습니다.tooltip 값이 표시됩니다.'''
        
        @task(task_id='inner_function1')
        def inner_func1(**kwargs):
            print('두 번째 TaskGroup 내 첫 번째 task입니다.')
            
        inner_function2 = PythonOperator(
            task_id='inner_function2',
            python_callable=inner_func,
            op_kwargs={'msg':'두 번째 TaskGroup내 두 번째 task입니다.'}
        )
        inner_func1() >> inner_function2

    group_1() >> group_2

원래 하나의 dag에서 task_id 값이 같으면 오류가 나는데, 그룹이 다르기 때문에, task_id값이 똑같은 "inner_function1"과 "inner_function2"여도 됩니다.

진짜 문제가 없이 잘 돌아가는지 airflow에서 확인해봅시다.

 

또한, 파이썬에서는 'docstring'이라고 함수에 대한 설명을 제공하는 기법이 있는데, airflow UI 화면에서는 'tooltip'이라고 하는 이름으로 task_group에 관한 설명을 제공해주는 역할을 해주고 있습니다. 데커레이터를 쓰지 않고 클래스를 이용해 task group을 직접 만든 경우에 docstring은 나올까요 안 나올까요? 이것도 나중에 airflow에서 확인해보도록 하죠.

 

다음으로 눈 여겨 봐야 할 부분은 task flow를 정하는 부분인 group_1() >> group_2 부분입니다. group_1()은 task decorator가 감싸고 있는 함수이고, group_2는 클래스를 이용해서 만든 그룹입니다.

즉, Task group도 flow를 지정할 수 있다는 것을 의미합니다.

예상되는 task flow

아마 위 그림과 같은 flow를 형성할 것입니다.

 

이제 본격적으로, airflow에서 실행을 해봐서 모든 예상이 제대로 나오는지 살펴보겠습니다.

airflow에서의 graph

예상되로 flow가 잘 흘러가는 것을 확인할 수 있습니다. task_id도 중복이 되지만, 그룹이 다르기 때문에 에러가 나지 않고 잘 올라간 것을 볼 수 있죠.

tooltip도 확인해보면, first_group에는 docstring으로 쓴 부분이 보이는데, second_group에서는 docstring이 아닌 tooltip에 준 내용이 보입니다.

실행을 시키면 모든 task가 순서대로 잘 수행됐습니다.

task가 잘 수행됨


요약을 해보자면, 아래와 같습니다.

  • Task Group 작성 방법은 2가지가 존재합니다. → (데커레이터 & 클래스)
  • Task Group 안에 Task Group을 중첩하여 정의할 수 있습니다. (group -> inner_group 해당실습은 위에서 안해본 사항입니다.)
  • Task Group 간에도 Flow 정의가 가능합니다.
  • Group이 다르면 task_id가 같아도 무방합니다.
  • Tooltip 파라미터를 이용해 UI화면에서 Task group에 대한 설명 제공이 가능합니다. (데커레이터 활용시 docstring으로도 가능합니다.)

이상으로 Task Group에 대해 알아봤습니다!

 

반응형