컴퓨터 공부/💿 Airflow

[Airflow] Bash 오퍼레이터 with Template

letzgorats 2023. 8. 26. 11:06

Bash Opertor를 쓰면서 Jinja 템플릿을 어떻게 하면 적용할 수 있는지 살펴봅시다.

 

1. Bash 오퍼레이터

  • Bash 오퍼레이터는 어떤 파라미터에 Template를 쓸 수 있을까요?
  • 공식문서를 살펴보면 아래와 같습니다.
    • bash_command (str)  (templated)
    • env (dict[str,str] | None)
    • append_env (bool)
    • output_encoding (str)
    • skip_exit_code (int)
    • cwd (str | None)
  • bash_command 와 env 두 개의 파라미터가 template을 쓸 수 있습니다.

바로 한 번 실습을 해보도록 하겠습니다.    dags_bash_with_template.py 라는 파일을 dags폴더에 생성해 아래와 같이 코드를 짰습니다.

from airflow import DAG
import pendulum
import datetime
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_with_template",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False
) as dag:
    
    bash_t1 = BashOperator(
        task_id = 'bash_t1',
        bash_command= 'echo "data_interval_end: {{ data_interval_end }}" '
    )

    bash_t2 = BashOperator(
        task_id = 'bash_t2',
        env = {
            'START_DATE':'{{data_interval_start | ds }}',
            'END_DATE':'{{data_interval_end | ds }}'
        },
        bash_command='echo $START_DATE && echo $END_DATE'
    )

    bash_t1 >> bash_t2

BashOperator 클래스를 사용해서, echo문을 짜봤습니다.bash_t1 task의 bash_command를 사용해서 template을 설정했는데, echo문의 {{ }} 에 data_interval_end 를 인자로 줬습니다.bash_t2 task에서는 env를 사용했는데, env에서는 딕셔너리 형태로 key값과 value값을 줘야 합니다.data_interval_start 값을 대쉬형태가 포함되어 있는 스트링값으로 주기 위해 " | ds " 로 하고 value값으로 설정했습니다.이렇게 하면, dag이 실행될 때, 특정 날짜값으로 template이 설정된 부분이 치환되어서 출력 될 것입니다.

(※ bash_command에서 '&&'는 앞선 커맨드가 성공하면, 뒤의 명령도 실행한다는 쉘 스크립트 문법입니다.)

 

그런 후, bash_t1 >> bash_t2 순으로 task를 실행해봅시다. 

airflow를 확인해본다면, 아래와 같습니다.

bash_t1
bash_t2

이렇게, 원하는 값이 치환돼서 잘 나온 것을 확인할 수 있습니다.

 

이상으로, bash_operator를 확인하여 템플릿을 활용해봤습니다.

 

반응형