이제부터는 Task를 다룰 수 있는 고급기능에 대해 알아볼건데요, 그 중에서 먼저 Task를 분기처리할 수 있는 방법들에 대해살펴보도록 하겠습니다.
Task 분기 처리하는 방법에는 크게 3가지가 있는데, 이번 포스팅에서는 BranchPython Operator에 대해 배워보겠습니다.
1. Task 분기 처리 유형
- 먼저, Task 분기처리는 왜 필요할까요?
예를 들어, 상위 task 1개에 하위 task 3개가 있다고 가정해봅시다. 기본적으로, 위 사진처럼 task관계가 있다면, task1이 끝나고, task2-1, task2-2, task2-3 가 모두 다 같이 수행됩니다.
그런데, 가끔은 task1의 수행결과에 따라서 task2-x 중 하나만 수행하도록 구성하고 싶은 경우가 있을 것입니다. 예를 들어, Task1의 결과로 "Good", "Bad", "Pending" 이라는 결과 3개 중 하나가 나오고 그에 따라 task2-1 ~ task2-3 중 하나가 실행되도록 해야 할 경우가 있을겁니다.
이처럼, 선택적으로 하위 task를 선별해서 실행시키고 싶을 때, task 분기처리가 필요한 것입니다.
Task 분기처리 방법은 크게 3가지로 나뉘어지는데, 한 번 살펴보시죠.
- Task 분기처리 방법
- 1) BranchPythonOperator
- : 우리가 일반적으로 오퍼레이터를 이용해서 task를 만들듯이 그냥 작성하면 됩니다.
- 2) task.branch 데커레이터 이용
- : PythonOperator를 이용해서 파이썬 함수를 수행한 것을 task라는 데커레이터를 이용해서 더 쉽게 쓸 수 있던 것처럼 비슷하게 task.branch 라고 하는 데커레이터도 제공해주고 있어서 이를 이용하면 됩니다.
- 3) BaseBranchOperator 상속하여 직접 개발
- : BaseBranchOperator는 직접 가져다가 쓸 수 있는 것이 아니고, 상속해서 우리만의 클래스를 하나 만들고 분기할 수 있는 로직을 직접 개발해서 사용할 수 있습니다.
- 1) BranchPythonOperator
이 중에서 BranchPythonOperator를 실습해보도록 하겠습니다.
2. BranchPython Operator
다음과 같은 함수를 작성해봤습니다.
def select_random():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B','C']:
return ['task_b','task_c']
python_branch_task = BranchPythonOperator(
task_id = 'python_branch_task',
python_callable=select_random
)
python_branch_task >> [task_a,task_b,task_c]
python_branch_task 부분을 보면, PythonOperator로 task를 만들었던 것과 거의 비슷합니다.
여기서, task가 실행하는 함수인 select_random 의 리턴값을 잘 작성해줘야 합니다.
해당 리턴값은 python_branch_task의 후행으로 오는 task의 id값을 리턴값으로 줘야합니다.
만약, 어떤 로직처리에 따라서, 후행 task들 중에서 하나만 실행시켜야 한다면, 리턴값을 후행으로 실행할 task의 id값을 "스트링"형태로 줘야 한다. 만약, 후행 task로 복수의 task를 실행시켜야 하면, 리턴값으로 "[리스트]" 형태로 후행 task의 id값을 담아야 합니다.
위 코드에서 task flow를 봤을 때, 그려질 수 있는 그림은 아래와 같습니다.
그렇다면, 이제 dag을 작성해봅시다.
dags 폴더에 'dags_branch_python_operator.py'파일을 생성해 아래와 같이 코드를 작성합니다.
from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
with DAG(
dag_id = "dags_branch_python_operator",
start_date = datetime(2023,8,1,tz="UTC"),
schedule=None,
catchup=False
) as dag:
def select_random():
import random
item_lst = ['A','B','C']
selected_item = random.choice(item_lst)
if selected_item == 'A':
return 'task_a'
elif selected_item in ['B','C']:
return ['task_b','task_c']
python_branch_task = BranchPythonOperator(
task_id = 'python_branch_task',
python_callable=select_random
)
def common_func(**kwargs):
print(kwargs['selected'])
task_a = PythonOperator(
task_id='task_a',
python_callable=common_func,
op_kwargs={'selected':'A'}
)
task_b = PythonOperator(
task_id='task_b',
python_callable=common_func,
op_kwargs={'selected':'B'}
)
task_c = PythonOperator(
task_id='task_c',
python_callable=common_func,
op_kwargs={'selected':'C'}
)
python_branch_task >> [task_a,task_b,task_c]
airflow에서 실행시키고 확인해보면, 다음과 같은 그래프가 그려졌습니다.
분기처리가 완료가 됐고, task_b와 task_c 가 선택됐고, task_a는 skip 된 것을 알 수 있습니다.
airflow에도 잘 출력된 것을 볼 수 있습니다.
정리하자면, 선행 task와 거기에 물려있는 하위 task들이 여러개 있을 때, 어떤 하위 task를 선택할지 골라주기 위해서 분기 처리하는 로직이 있습니다. airflow에서는 분기처리하는 로직을 크게 3가지 기법으로 나눌 수 있는데, 그 중에서 이번 포스팅에서는 BranchPythonOperator를 이용하는 방법을 배웠습니다.
'컴퓨터 공부 > 💿 Airflow' 카테고리의 다른 글
[Airflow] BaseBranchOperator로 분기처리하기 (0) | 2023.08.30 |
---|---|
[Airflow] Task 분기처리하기 with task.branch (0) | 2023.08.29 |
[Airflow] 전역 공유변수 Variable (0) | 2023.08.29 |
[Airflow] Python & email 오퍼레이터간 Xcom 사용 (0) | 2023.08.29 |
[Airflow] Python & Bash 오퍼레이터 with Xcom (0) | 2023.08.29 |