컴퓨터 공부/💿 Airflow

[Airflow] Trigger Rule

letzgorats 2023. 8. 30. 07:22

이제까지는 상위 task가 하나 있을 때, 상위 task가 하위 task로 분기하는 경우 분기하는 조건에 대해서 알아봤습니다.

이번에는 상위 task 여러개가 하나의 하위 task로 연결이 되는 구조에서 하위 task의 실행조건을 설정하는 방법(Trigger Rule)에 대해 알아보겠습니다.  

Trigger Rule

 

1. Trigger Rule 종류

 

아래와 같은 task들이 있다고 해봅시다.

기본적으로는 task1, task2, task3이 모두 성공적으로 잘 끝나야 task4가 도는 구조입니다.

하지만, 이 rule을 바꾸고 싶을 경우가 있을 것입니다. 예를 들어, task1, task2, task3 중에서 어느 하나라도 정상적으로 끝나면, task4가 수행되도록 하는 rule을 만들 수도 있고, 성공여부에 상관없이 task1, task2, task3이 돌기만 하면, task4를 수행시키도록 하는 rule을 만들 수도 있습니다.

 

그럴 때는, 이 task4에다가 trigger rule을 설정해주면 됩니다.

trigger rule에는 어떤 것이 있을까요?

all_success(기본값) 상위 Task가 모두 성공하면 실행
all_failed 상위 Task가 모두 실패하면 실행
all_done 상위 Task가 모두 수행되면 실행(실패해도 수행된 것에 포함)
all_skipped 상위 Task가 모두 Skipped 상태면 실행
one_failed 상위 Task 중 하나 이상 실패하면 실행(모든 상위 Task 완료를 기다리지 않음)
one_success 상위 Task 중 하나 이상 성공하면 실행(모든 상위 Task 완료를 기다리지 않음)
one_done 상위 Task 중 하나 이상 성공 또는 실패 하면 실행
none_failed 상위 Task 중 실패가 없는 경우 실행(성공 또는 Skipped 상태)
none_failed_min_one_success 상위 Task 중 실패가 없고 성공한 Task가 적어도 1개 이상이면 실행
none_skipped Skip된 상위 Task가 없으면 실행(상위 Task가 성공, 실패여도 무방)
always 언제나 실행

 

2. Trigger Rule 실습

2-1) Trigger Rule 중에 all_done에 대해서 실습해보겠습니다.

코드는 아래와 같습니다.

bash_upstream_1 = BashOperator(
    task_id='bash_upstream_1',
    bash_command='echo upstream1'
)

@task(task_id='python_upstream_1')
def python_upstream_1():
    raise AirflowException('downstream_1 Exception!')
    
@task(task_id='python_upstream_2')
def python_upstream_2():
    print('정상 처리')

@task(task_id='python_downstream_1',trigger_rule='all_done')
def python_downstream_1():
    print('정상 처리')

[bash_upstream_1, python_upstream_1(), python_upstream_2()] >> python_downstream_1()

위 코드에서 task는 총 4개가 정의되어 있습니다.

upstream이라고 하는 task가 상위 task이고, downstream이라고 하는 task가 하위 task라고 이해하면 되겠습니다.

 

여기서, 하위 task인 'python_downstream_1'에서 trigger_rule을 'all_done'이라고 설정했습니다.

task decorator를 썼기 때문에, task_id를 줄 때, trigger_rule도 주면 됩니다.

(만약, PythonOperator를 썼다면, 내부에 trigger_rule을 설정하면 되고  BashOperator라면, BashOperator의 괄호를 닫고, 그 이후에 trigger_rule='~' 을 설정해주면 됩니다.)

 

여기서 task_id가 'python_upstream_1'인 상위 task에 Exception을 띄워줬는데, 이는 airflowException을 만나면 실패처리가 되는 로직입니다. 즉, 이 task가 실패가 난다면, 후행 task인 'python_downstream_1'가 돌지 안 돌지 확인해보기 위한 코드라고 할 수 있겠죠.

 

dags폴더에 dags_python_with_trigger_rule_eg1.py 라는 파일을 만들고, 아래와 같은 코드를 작성해봅시다.

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.exceptions import AirflowException
import pendulum

with DAG(
    dag_id='dags_python_with_trigger_rule_eg1',
    start_date=pendulum.datetime(2023,8,1,tz="UTC"),
    schedule=None,
    catchup=False
) as dag:
    
    bash_upstream_1 = BashOperator(
    task_id='bash_upstream_1',
    bash_command='echo upstream1'
    )

    @task(task_id='python_upstream_1')
    def python_upstream_1():
        raise AirflowException('downstream_1 Exception!')
        
    @task(task_id='python_upstream_2')
    def python_upstream_2():
        print('정상 처리')

    @task(task_id='python_downstream_1',trigger_rule='all_done')
    def python_downstream_1():
        print('정상 처리')

    [bash_upstream_1, python_upstream_1(), python_upstream_2()] >> python_downstream_1()

airflow에서 실행시켜보면 예상한대로 나오는지 확인해봅시다.

task 실행 관계

예상한대로, 'python_upstream_1'이 고의적으로 실패가 되었는데도 불구하고, 'python_downstream_1'이 잘 실행이 된 것을 볼 수가 있습니다.

두 번째 task가 실패했어도 all_done이기 때문에 후행 task가 잘 동작한 것이죠.


2-2) Trigger Rule 중에 none_skipped 에 대해서 실습해보겠습니다.

코드는 아래와 같습니다.

@task.branch(task_id='branching')
def random_branch():
    import random
    item_lst = ['A','B','C']
    selected_item = random.choice(item_lst)
    if selected_item == 'A':
        return 'task_a'
    elif selected_item == 'B':
        return 'task_b'
    elif selected_item == 'C':
        return 'task_c'

task_a = BashOperator(
    task_id = 'task_a',
    bash_command='echo upstream1'
)

@task(task_id='task_b')
def task_b():
    print('정상처리')

@task(task_id='task_c')
def task_c():
    print('정상처리')

@task(task_id='task_d',trigger_rule='non_skipped')
def task_d():
    print('정상처리')

random_branch() >> [task_a, task_b(), task_c()] >> task_d()

위 코드에서 task는 총 5개가 정의되어 있습니다.

순서대로 task flow를 그려보면 이런 그래프가 그려질 것입니다.

task_id가 'branching'인 1번 task는 task를 분기하는 방법중에서 @task Branch decorator를 사용한 방법입니다. 해당 함수는 random에 따라 2번, 3번, 4번 task 중에 하나가 도는 로직입니다. 예를 들어, 3번 task가 선택됐다면, 2번 task와 4번 task는 skip처리가 될 것입니다. 그럼 이 때, 'none_skipped'라는 trigger_rule을 가진 5번 task는 돌지 않아야 하므로, 해당 처리가 어떻게 되는지 확인해보는 코드라고 할 수 있겠죠.

 

dags폴더에 dags_python_with_trigger_rule_eg2.py 파일을 생성해 아래와 같이 코드를 짜봅시다.

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.exceptions import AirflowException
import pendulum

with DAG(
    dag_id='dags_python_with_trigger_rule_eg2',
    start_date=pendulum.datetime(2023,8,1,tz="UTC"),
    schedule=None,
    catchup=False
) as dag:
    
    @task.branch(task_id='branching')
    def random_branch():
        import random
        item_lst = ['A','B','C']
        selected_item = random.choice(item_lst)
        if selected_item == 'A':
            return 'task_a'
        elif selected_item == 'B':
            return 'task_b'
        elif selected_item == 'C':
            return 'task_c'

    task_a = BashOperator(
        task_id = 'task_a',
        bash_command='echo upstream1'
    )

    @task(task_id='task_b')
    def task_b():
        print('정상처리')

    @task(task_id='task_c')
    def task_c():
        print('정상처리')

    @task(task_id='task_d',trigger_rule='none_skipped')
    def task_d():
        print('정상처리')

    random_branch() >> [task_a, task_b(), task_c()] >> task_d()

airflow를 통해서 의도한 대로 동작하는지 확인해봅시다.

task 실행관계

branching task를 통해 task_c가 선택됐고, 그에 따라 후행 task인 task_d도 skip된 것을 확인할 수 있습니다.

task_d는 trigger_rule이 none_skipped 이기에 선행 task들이 skip되지 않아야 실행되는 task이기 때문입니다.


이상으로, Trigger Rule의 종류와 어떻게 사용하는지에 대해 배워봤습니다.

 

 

반응형