컴퓨터 공부/💿 Airflow

[Airflow] Python Operator에서 Jinja 템플릿 사용하기

letzgorats 2023. 8. 28. 04:51

저번에는 Bash Operator를 Jinja 템플릿을 사용했다면, 이번에는 Python Operator에서 사용해봅시다.

 

1. Python 오퍼레이터에서 with Template

  • Python 오퍼레이터는 어떤 파라미터에 Template을 쓸 수 있을까요?
  • 공식문서를 살펴보면 아래와 같습니다.
    • python_callable
    • op_kwargs
    • op_args
    • template_dict
    • template_exts
    • show_return_value_in_logs
  • op_kwargs 와 op_args, template_dict 세 개의 파라미터가 template을 쓸 수 있습니다.

바로 한 번 실습을 해보도록 하겠습니다. 이 중에서, op_kwargs를 이용해서 jinja 템플릿을 써보도록 하겠습니다.

(※ 파이썬 오퍼레이터는 **kwargs에 Template 변수들을 자동으로 제공해주고 있습니다.)

--> 이 말인 즉슨, **kwargs 를 이용해서 그냥 jinja 템플릿에서 원하는 값을 꺼내서 쓰면 된다는 소리입니다.

코드로 살펴보죠.

 

dags_python_template.py 라는 파일을 dags폴더에 생성해 아래와 같이 코드를 짰습니다.

from airflow import DAG
import datetime
import pendulum
from airflow.operators.python import PythonOperator
from airflow.decorators import task

with DAG(
    dag_id = "dags_python_template",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2023, 8, 10, tz="UTC"),
    catchup=False
) as dag:
    
    # 1번 방식
    def python_function(start_date, end_date, **kwargs):
        print(start_date)
        print(end_date)

    python_t1 = PythonOperator(
        task_id = "python_t1",
        python_callable=python_function,
        op_kwargs={'start_date':'{{data_interval_start | ds}}','end_date':'{{data_interval_end | ds}}'}
    )

    # 2번 방식

    @task(task_id = 'python_t2')
    def python_function2(**kwargs):
        print(kwargs)
        print('ds:' + kwargs['ds'])
        print('ts:' + kwargs['ts'])
        print('data_interval_start:' + str(kwargs['data_interval_start']))
        print('data_interval_end:' + str(kwargs['data_interval_end']))
        print('task_instance:' + str(kwargs['ti']))

    python_t1 >> python_function2()

1번 방식처럼, op_kwargs 변수를 사용해서 'start_date' key에 {{data_interval_start}} 를 value로 주고, 'end_date' key에 {{data_interval_end}}를 value로 줌으로써 실제 치환된 값을 출력할 수 있습니다. 즉, jinja 템플릿의 템플릿 변수를 이용한 방법이죠.

 

2번 방식처럼, 데코레이터를 사용해 함수를 실행하는데, 이 때 파라미터로 **kwargs만 줍니다. op_kwargs 에 이미 우리가 쓸 수 있는 jinja템플릿 변수들이 다 있기 때문에, 거기서 그냥 꺼내쓰는 방법입니다.

 

이제, airflow에서 확인을 해볼까요?

1번 방식

1번 방식에서는, start_date 라는 키 값에 ds형태의 data_interval_start를 value값으로 줘서 '2023-08-26' 이 출력됐고, start_end 라는 키 값에 ds형태의 data_interval_end를 value값으로 줘서 '2023-08-27' 이 출력된 것을 알 수 있습니다.

 

2번 방식

2번 방식에서도, 이미 op_kwargs 에 저장된 jinja템플릿 변수들이 각각에 맞게 잘 출력된 것을 알 수 있습니다.

 

이렇게, 두 방식을 통해서 우리가 모두 동일하게 템플릿 변수들을 꺼내서 쓸 수 있다는 것을 배웠습니다.

파이썬 오퍼레이터를 쓸 때, Jinja 템플릿을 이용하는 방법 중 기호에 맞게 가져다 쓰면 됩니다.

이상으로, python operator를 이용해 Jinja 템플릿을 사용하는 방법에 대해 알아봤습니다!

 

 

 

반응형