컴퓨터 공부/💿 Airflow

[Airflow] Task 연결하기

letzgorats 2023. 8. 18. 11:10

Task 연결 방법으로는 두 가지가 있습니다.

  • 1) ">>", "<<" 사용하기 ( 에어플로우 공식추천방식)
  • 2) 함수 사용하기 (set_downstream 이나 set_upstream 함수)
    • (ex) task1.set_downstream(task2,task3) == task1 >> [task2, task3] 

복잡한 Task는 어떻게 연결할까요?

Task연결방법

Task 가 복잡하게 연결될 때, 한줄씩 계속 입력하기보다는

">>"나"<<" 와 리스트 "[ ]"  를 적절히 사용하면서 보다 간결하게 표현할 수 있습니다.

Task연결 방법

DAG이 위 그림처럼 여러 task들이 얽혀 있을 때, code로 작성하면 위 그림과 같습니다.

실습을 해보죠.

from airflow import DAG
import pendulum
import datetime
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id = "dags_conn_test",
    schedule=None,
    start_date=pendulum.datetime(2023,8,1,tz="UTC"),
    catchup=False
) as dag:
    
    t1 = EmptyOperator(
        task_id= "t1"
    )

    t2 = EmptyOperator(
        task_id= "t2"
    )

    t3 = EmptyOperator(
        task_id= "t3"
    )

    t4 = EmptyOperator(
        task_id= "t4"
    )

    t5 = EmptyOperator(
        task_id= "t5"
    )

    t6 = EmptyOperator(
        task_id= "t6"
    )

    t7 = EmptyOperator(
        task_id= "t7"
    )

    t8 = EmptyOperator(
        task_id= "t8"
    )

    t1 >> [t2,t3] >> t4
    t5 >> t4
    [t4,t7] >> t6 >> t8

코드를 그대로 작성하고, airflow에 해당 dag의 그래프를 시각적으로 확인해보면 아래와 같았습니다!

(저는 경로상 오류가 떠서 다시한번 dag파일의 경로를 설정해줬습니다. 내가 현재 작성한 dag파일을 도커 경로에 그대로 올리려면, 저번 포스팅에 올린것과 같이 docker-compose.yaml 에서의 volume부분을 건들여야 합니다. 작성하고 있는 곳을 ":"를 기준으로 써주면 되는데,  

저는 위와 같이 경로를 써주는데, AIRFLOW_HOME 환경 변수를 현재 DAG 파일을 작성하고 있는 위치로 변경했습니다.

pwd를 통해 현재 경로를 파악하고, 

터미널에서 현재 세션에서AIRFLOW_HOME을 설정하려면: export AIRFLOW_HOME=$(pwd) 를 해주면 됩니다.

영구적 변경을 원한다면: 사용하는 셸의 설정 파일 (예: .bashrc, .bash_profile, .zshrc 등)에 위의 export 명령어를 추가합니다. 이렇게 하면 새 터미널 세션을 시작할 때마다 AIRFLOW_HOME이 자동으로 설정됩니다.

예를 들어, .bashrc 파일에 설정을 추가하려면:

echo "export AIRFLOW_HOME=$(pwd)" >> ~/.bashrc
source ~/.bashrc

와 같이 해주면 됩니다. 저는 일단 안전하게, 현재 세션에서만 했습니다.

 

이렇게 설정을 마치고, 해당 DAG파일의 그래프를 살펴봤는데, 코드에 작성했던 것과 같이 task의 선행-후행 관계가 잘 정립되어 있는 것을 확인할 수 있었습니다.

코드를 통해서 간결하게 task 선-후 행 관계를 작성해볼 수 있었던 시간이었습니다.

반응형