컴퓨터 공부/💿 Airflow

[Airflow] Trigger Run 오퍼레이터

letzgorats 2023. 8. 31. 08:47

이번에는 다른 DAG을 수행시킬 수 있는 'Trigger Run 오퍼레이터' 라는 것에 대해 알아보도록 하겠습니다.

 

Airflow에서는 DAG간의 의존관계는 선-후행 관계입니다. 이를 설정할 수 있는 방법은 크게 2가지가 있는데요, 그 2개 중에 하나인, "TriggerDagRun" 오퍼레이터를 이번 포스팅에서 배워보겠습니다.

 

1. DAG 간 의존관계 설정

  • DAG 의존관계 설정 방법
    • (1) TriggerDagRun 오퍼레이터

TriggerDagRun 오퍼레이터

오퍼레이터를 이용해서 Task를 만드는 것처럼 TriggerDagRun오퍼레이터로 Task를 만듭니다. task를 만들면서 파라미터를 줄 때, 어떤 DAG을 Trigger할 지 그 DAG의 id를 넣게 되어있습니다.

위 그림을 기준으로, task1이 선행 task고, task2, task3, task4 가 후행 task인데, task2의 dag_id가 B라 하고, task3의 dag_id가 C라고 하고, task4의 dag_id가 D라고 할 때, task2,task3,task4가 돌면서 DAG B,C,D가 돌게 되는 구조입니다.

  • (2) ExternalTask 센서

ExternalTask 센서

원리는 이렇습니다. 센서를 통해서 Task를 만들게 됩니다. 위 그림에서 Sensor1 이라는 식으로 Sensor라고 표기를 했지만, 실은 다 Task입니다. Task를 만들 때도, TriggerDagRun 오퍼레이터와 마찬가지로, 감지해야 할 Dag id를 넣게 되어 있습니다. 즉, ExternalTask 센서는 어떤 Dag에 어떤 Task가 완료되었는지를 감지하는 센서인 셈입니다. (dag_id만 넣어도 되고, task_id까지 넣어도 되는데, 우선 실습에서는 dag_id만 넣도록 하겠습니다.)

각 Sensor를 만들면서, 어떤 dag의 완료여부를 기다릴지 정하는 식으로 task를 만들고 task 후행에다가 돌리고 싶은 다른 task를 이어주면 됩니다.

ExternalTask 센서 같은 경우는 여러개의 dag이 완료가 되면 그것을 인지하고 해당 작업을 시작하겠다는 용도로 많이 쓰입니다.


이렇게, DAG간의 의존관계를 설정하는 2가지의 방법을 살펴봤습니다. 둘의 차이점을 표로 확인하시죠.

비교 TriggerDagRun 오퍼레이터 ExternalTask 센서
방식 실행할 다른 DAG의 ID를 지정하여 수행 본 Task가 수행되기 전 다른 DAG의 완료를 기다린 후 수행
권고 사용시점 Trigger 되는 DAG의 선행 DAG이 하나만 있을 경우 Trigger 되는 DAG의 선행 DAG이 2개 이상인 경우

 

그 중에서 이번 포스팅에서는 TriggerRun 오퍼레이터 방법에 대해 실습을 진행해보겠습니다.

from airfloww.operators.trigger_dagrun import TriggerDagRunOperator
with DAG(
   ...
) as dag:
    start_task = BashOperator(
        task_id = 'start_task',
        bash_command = 'echo "start!"',    
    )
    
    trigger_dag_task = TriggerDagRunOperator(
        task_id = 'trigger_dag_task',
        trigger_dag_id = 'dags_python_operator',
        trigger_run_id = None,
        execution_date = '{{ data_interval_start }}',
        reset_dag_run = True,
        wait_for_completion = False,
        poke_interval = 60,
        allowed_states = ['success'],
        failed_states=None
    )
    
    start_task >> trigger_dag_task

먼저 operators.trigger_dagrun 라이브러리에서 TriggerDagRunOperator를 import 해옵니다.

가지고 온 class 파일을 이용해서 객체화를 해주면 되는데, Trigger된 오퍼레이터를 airflow 공식 가이드에서 찾아보시면 알겠지만, 많은 파라미터가 존재합니다.

trigger_dagrun 오퍼레이터의 파라미터

여기서 "trigger_dag_id" 라는 파라미터가 있는데, 이 파라미터는 어떤 dag을 trigger할 것인지 명시해주는 파라미터입니다. 필수값이라고 할 수 있죠. 즉, 코드를 기준으로 봤을 때, 'dags_python_operator'라는 dag을 실행시키겠다는 의미가 됩니다.

 

나머지는 다 옵션 파라미터인데, 그 중에서 "trigger_run_id" 라는 파라미터가 무엇인지 한 번 이해해봅시다.


※ run_id란 무엇일까요?

  • DAG 의 수행 방식과 시간을 유일하게 식별해주는 키입니다.
  • 같은 시간이라 해도 수행 방식(Schedule, manual, Backfill) 에 따라 키가 달라집니다.
    • (Schdule을 정하는 방식, airflow에서 직접 수동으로 trigger을 시키는 방식인 manual방식, 과거 날짜에 대해서 수행할 수 있는 Backfil방식) 에 따라 run_id 값이 달라집니다.
  • 스케줄에 의해 실행된 경우 scheduled__{{data_interval_start}} 값을 가집니다.

schduled 방식일 때, run_id
manual방식일 때, run_id

참고로, maual 방식으로 dag을 돌렸을 때, run_id에 manual__{{data_interval_start}} 에서 'data_interval_start'값은 수작업으로 수행시킨 시간이 아니라, 수작업으로 수행시키되, 수작업으로 수행시킨 "스케줄의 구간" 중에서 'data_interval_start'값이 나온다고 보면 됩니다.

 


run_id값을 알아봤으니, 이어서 코드를 보겠습니다.

 

→ 'trigger_run_id' 값은 우리가 'trigger_dag_id'에서 지정한 dag을 어떤 run_id값으로 실행시킬 것인지 정하는 파라미터입니다. 해당 파라미터는 템플릿 문법을 적용시킬 수 있기 때문에, 'schedule__((~timestamp~}}', 'manual__{~timestamp

~}}' 등으로 템플릿 변수를 이용해서 작성하면 됩니다.

 

→ 'execution_date'는 여기에다가 값을 주고 trigger를 하게 되면, trigger가 될 때, manual 방식으로 trigger가 된 것으로 간주를 합니다. 즉, 해당 dag에서 run_id값을 확인해보면, "manual__{{execution_date에서 정한 값}}" 으로 나오는 것을 확인할 수 있습니다.

 

→ 'reset_dag_run'은 사용자가 trigger하려고 하는 이 dag에 보니까 이미 run_id로 수행된 이력이 있을 때, 그럼에도 이 dag을 수행 시킬 것인지 True, False로 주는 파라미터입니다.

 

다음 4개의 파라미터는 그림을 그려가면서 설명해보겠습니다.

t1 -> t2 -> t3

그림처럼 3개의 task가 있다고 가정해봅시다. 그 중에서 't2' task가 코드에 있는 TriggerDagRunOperator로 만들어졌다고 하고, 't2' task가 실행시키려고 하는 dag을 'C'라고 하는 dag이라고 보겠습니다. 그러면 't3' task를 't2'다음으로 돌릴건데, "C" dag이 trigger가 완료가 돼서 success가 되고, 't2' task도 success 로 마킹이 된 후에야 't3'를 돌리고 싶다면 어떻게 해야 할까요?

 

→ 그럴 때, 'wait_for_completion' 값을 True로 주면 됩니다. 그러면, Trigger는 "C"라는 DAG이 완료가 된 시점에서야 "t2"도 "success"로 마킹이 됩니다. 만약 이 값을 False로 주게 되면, Trigger는 "C"라는 DAG이 성공되는지 안 되는지 보지도 않고 바로 "success"로 빠져나옵니다. 

즉, 만약에 't3' task와 "C"라는 DAG 간에 무언가 유종관계를 주고 싶다면, 'wait_for_completion'값을 True로 줘서 "C"라는 DAG이 완료가 되어야 "t2" task도 완료가 되게끔 할 수 있습니다.

 

→ 'poke_interval' 값은 "C"라는 DAG이 완료가 되었는지 안 되었는지를 계속 봐야하기 때문에 들여다 보는 주기를 말합니다. 즉, 모니터링 주기라고 보면 됩니다.(초)

 

→ 'allowed_states'는 't2' task를 "success"로 마킹하기 위해서 "C"라는 DAG이 어떤 상태로 끝나야 하는지 정해주는 값입니다. 기본적으로는 "success"로 끝나야 't2'도 "success"로 마킹이 된다는 뜻인데요, 만약 "C"라는 DAG을 trigger하는데, "C"가 "success"뿐만 아니라 "fail"로 끝나도 그 결과에 상관없이 't2' task를 "success"로 마킹하고 싶다면, allowed_states 에 "fail"도 추가해주면 됩니다.

 

→ 'failed_states' 는 'allowd_states'의 반대인데, 't2'라는 task가 'fail'로 마킹되기 위해서 "C"라고 하는 DAG이 어떤 상태로 끝나야 하는지를 명시해주면 됩니다. 만약 'fail'이라고 값을 주면, "C"라고 하는 DAG이 "fail"로 끝나면, 't2' task도 "fail"로 마킹이 되는 셈이죠.


이제 실제로 dags폴더에 dags_trigger_dag_run_operator.py 라는 파일을 생성해 아래와 같이 코드를 작성해봅시다.

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
import pendulum

with DAG(
    dag_id ='dags_trigger_dag_run_operator',
    start_date=pendulum.datetime(2023,8,1,tz="UTC"),
    schedule='30 9 * * *',
    catchup=False
) as dag:
    
    start_task = BashOperator(
        task_id = 'start_task',
        bash_command='echo "start!"',
    )

    trigger_dag_task = TriggerDagRunOperator(
        task_id = 'trigger_dag_task',
        trigger_dag_id = 'dags_python_operator',
        trigger_run_id = None,
        execution_date = '{{ data_interval_start }}',
        reset_dag_run = True,
        wait_for_completion = False,
        poke_interval = 60,
        allowed_states = ['success'],
        failed_states=None
    )
    
    start_task >> trigger_dag_task

run_id 값을 따로 안주고, execution_date 값을 줄건데, 해당 dag의 스케줄이 매일 9시 30분에 실행되는 dag이므로 '9시 30' 분의 값이 들어갈 것입니다. 그리고, 트리거링 된 DAG이 완료될 때 까지 기다리지 않기 위해 wait_for_completion 값을 False로 줬습니다.

 

여기서, dag_id 에 준 'dags_python_operator'는 예전에 실습해봤던 파일로 아래와 같습니다.

import datetime
import pendulum

from airflow import DAG
from airflow.operators.python import PythonOperator
import random

with DAG(
    dag_id="dags_python_operator",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False,
) as dag:
    
    def select_dog():
        dogs = ['Bulldog','Pomeranian','Poodle','Chihuahua','Dashhund']
        rand_int = random.randint(0,4)
        print(dogs[rand_int])

    py_t1 = PythonOperator(
        task_id = 'py_t1',
        python_callable=select_dog
    )

    py_t1

이제, 본격적으로 airflow에서 실행을 해보도록하겠습니다.

그래프

그래프는 다음과 같은 관계를 형성합니다.

 

triggering 될 DAG인 'dags_python_operator'는 매일 6시 30분에 도는 데일리 dag입니다. 이제 trigger를 걸어서 trigger로 수행이 됐을 때, 어떻게 수행이 되는지 보도록 하겠습니다.

trigger_dag_task가 잘 trigger되었습니다.
dag_python_operator가 잘 동작했습니다.

trigger_dag_task 를 trigger했는데, dag_python_operator에 새로운 실행결과가 생겼습니다. 해당 실행 결과에서, run_id는 'manual형태로 생겼는데요, 'trigger_run' 값은 주지 않았지만, execution_date값을 {{dat_interval_start}} 로 줬기 때문에, trigger된 'dag_python_operator' DAG의 run_id 값이 "manual__2023-08-29T09:30" 이라고 나온 것입니다.

dags_trigger_dag_run_operator 의 data_interval_start 값

실제로, 'dags_trigger_dag_run_operator' 의 'data_interval_start' 값은 "2023-08-29T09:30 UTC"입니다.


이상으로, Trigger Dag Run Operator에 대해서 알아봤습니다. 오늘 포스팅은 꼭 기억에 넣도록 합시다!

반응형

'컴퓨터 공부 > 💿 Airflow' 카테고리의 다른 글

[Airflow] 서울시 공공데이터 API 키 발급받기  (0) 2023.09.30
[Airflow] 지원되는 오퍼레이터 보기  (0) 2023.08.31
[Airflow] Edge Label  (0) 2023.08.30
[Airflow] Task Group  (2) 2023.08.30
[Airflow] Trigger Rule  (0) 2023.08.30