이번 포스팅에서는 데커레이터인 task.branch 를 이용해서 task 분기처리를 해보려고 합니다.
코드를 바로 살펴보겠습니다.
from airflow.decorators import task
@task.branch(task_id='python_branch_task')
def select_random():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task a'
elif selected_item in ['B','C']:
return ['task_b','task_c']
select_random() >> [task_a,task_b,task_c]
BranchPythonOperator 가 아닌 airflow.decorators에서 task operator를 가져왔습니다.
@task.branch를 사용해서 task_id를 지정해줍니다.
그리고, 함수 select_random()을 직접 실행시켜줌으로써 객체를 얻습니다.
바로 실습을 적용해보겠습니다.
dags폴더에 dags_python_with_branch_decorator.py파일을 생성해 다음과 같이 코드를 작성합니다.
from airflow import DAG
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow.decorators import task
with DAG(
dag_id = "dags_python_with_branch_decorator",
start_date=datetime(2023,8,1),
schedule=None,
catchup=False
) as dag:
@task.branch(task_id="python_branch_task")
def select_random():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B','C']:
return ['task_b','task_c']
def common_func(**kwargs):
print(kwargs['selected'])
task_a = PythonOperator(
task_id='task_a',
python_callable=common_func,
op_kwargs={'selected':'A'}
)
task_b = PythonOperator(
task_id='task_b',
python_callable=common_func,
op_kwargs={'selected':'B'}
)
task_c = PythonOperator(
task_id='task_c',
python_callable=common_func,
op_kwargs={'selected':'C'}
)
select_random() >> [task_a,task_b,task_c]
BranchPythonOperator 를 사용해서 분기처리하는 것과 거의 비슷하지만, @task.branch를 사용하는 것과 함수를 직접 호출하는 것이 다릅니다.
airflow에서 한 번 확인해보겠습니다. 실행을 시켜서 task가 random으로 선택되게 하겠습니다.
위 사진에서 select_random() 함수를 실행함과 동시에 분기처리된 task가 task_a가 선택됐음을 알 수 있습니다.
return_value가 task a 가 선택된 것을 알 수 있고, 나머지는 Skip됐습니다. 이를 Xcom탭에서 확인해보면, 아래와 같습니다.
return value에는 task_a 가 찍혔고, skipmixin_key에는 {'followed':['task_a']}가 찍힌 것을 볼 수 있습니다.
이렇게, 분기처리 할 수 있는 세 가지 방법 중에 두 번째에 해당하는 @task.branch인 데커레이터를 이용하는 방법에 대해 알아봤습니다. 데커레이터를 선언하고, id값을 지정해준다음, 그 decorator가 wrapping하고 싶은 함수를 작성합니다. 또, task flow를 정의할 때는 데커레이터가 감싸고 있는 함수를 실행시켜주기만 하더라도 알아서 branch_python_operator로 만들었던 객체와 동일한 객체를 얻을 수 있습니다.
'컴퓨터 공부 > 💿 Airflow' 카테고리의 다른 글
[Airflow] Trigger Rule (0) | 2023.08.30 |
---|---|
[Airflow] BaseBranchOperator로 분기처리하기 (0) | 2023.08.30 |
[Airflow] BranchPython 오퍼레이터로 분기처리하기 (0) | 2023.08.29 |
[Airflow] 전역 공유변수 Variable (0) | 2023.08.29 |
[Airflow] Python & email 오퍼레이터간 Xcom 사용 (0) | 2023.08.29 |