컴퓨터 공부/💿 Airflow

[Airflow] Task 분기처리하기 with task.branch

letzgorats 2023. 8. 29. 17:16

이번 포스팅에서는 데커레이터인 task.branch 를 이용해서 task 분기처리를 해보려고 합니다.

코드를 바로 살펴보겠습니다.

from airflow.decorators import task

@task.branch(task_id='python_branch_task')
def select_random():
    import random
    item_lst = ['A','B','C']
    selected_item = random.choice(item_lst)
    if selected_item == 'A':
        return 'task a'
    elif selected_item in ['B','C']:
        return ['task_b','task_c']
        
select_random() >> [task_a,task_b,task_c]

BranchPythonOperator 가 아닌 airflow.decorators에서 task operator를 가져왔습니다.

@task.branch를 사용해서 task_id를 지정해줍니다.

그리고, 함수 select_random()을 직접 실행시켜줌으로써 객체를 얻습니다.

 

바로 실습을 적용해보겠습니다.

dags폴더에 dags_python_with_branch_decorator.py파일을 생성해 다음과 같이 코드를 작성합니다.

from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.decorators import task

with DAG(
    dag_id = "dags_python_with_branch_decorator",
    start_date=datetime(2023,8,1),
    schedule=None,
    catchup=False
) as dag:
    @task.branch(task_id="python_branch_task")
    def select_random():
        import random
        item_lst = ['A','B','C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return 'task_a'
        elif selected_item in ['B','C']:
            return ['task_b','task_c']
        
    def common_func(**kwargs):
        print(kwargs['selected'])
    
    task_a = PythonOperator(
        task_id='task_a',
        python_callable=common_func,
        op_kwargs={'selected':'A'}
    )
    task_b = PythonOperator(
        task_id='task_b',
        python_callable=common_func,
        op_kwargs={'selected':'B'}
    )
    task_c = PythonOperator(
        task_id='task_c',
        python_callable=common_func,
        op_kwargs={'selected':'C'}
    )

    select_random() >> [task_a,task_b,task_c]

BranchPythonOperator 를 사용해서 분기처리하는 것과 거의 비슷하지만, @task.branch를 사용하는 것과 함수를 직접 호출하는 것이 다릅니다.

airflow에서 한 번 확인해보겠습니다. 실행을 시켜서 task가 random으로 선택되게 하겠습니다.

분기된 task

위 사진에서 select_random() 함수를 실행함과 동시에 분기처리된 task가 task_a가 선택됐음을 알 수 있습니다.

 

python_branch_task

return_value가 task a 가 선택된 것을 알 수 있고, 나머지는 Skip됐습니다. 이를 Xcom탭에서 확인해보면, 아래와 같습니다.

python_branch_task xcom

return value에는 task_a 가 찍혔고, skipmixin_key에는 {'followed':['task_a']}가 찍힌 것을 볼 수 있습니다.

 

이렇게, 분기처리 할 수 있는 세 가지 방법 중에 두 번째에 해당하는 @task.branch인 데커레이터를 이용하는 방법에 대해 알아봤습니다. 데커레이터를 선언하고, id값을 지정해준다음, 그 decorator가 wrapping하고 싶은 함수를 작성합니다. 또, task flow를 정의할 때는 데커레이터가 감싸고 있는 함수를 실행시켜주기만 하더라도 알아서 branch_python_operator로 만들었던 객체와 동일한 객체를 얻을 수 있습니다.

반응형