컴퓨터 공부/💿 Airflow

[Airflow] Python Operator with macros

letzgorats 2023. 8. 28. 15:45

BashOperator에서도 macro변수를 어떻게 쓸 수 있는지 알아본만큼, 이번 시간에는 PythonOperator에서 macro를 사용하는 방법을 알아보겠습니다.

 

1. 먼저 PythonOperator에서는 어떤 파라미터가 Template 변수를 지원할까요?

python operator에서 템플릿을 지원하는 파라미터

공식문서를 살펴보면, 'template_dict', 'op_args', 'op_kwargs' 가 템플릿을 지원합니다.

저번에는 op_kwargs 를 이용해서 template 변수를 써봤으므로, 이번에는 templates_dict 를 써서 macro변수를 써보겠습니다.

 

Python 오퍼레이터를 사용해 macro변수를 써보는 코드를 작성해보겠습니다.

@task(task_id = 'task_using_macros',
      templates_dict = {'start_date':'{{ (data_interval_end 
      + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}',
                        'end_date':'{{ (data_interval_end.replace(day=1) 
      + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}'
          }
      )

def get_datetime_macro(**kwargs) :
    templates_dict = kwargs.get('templates_dict') or {}
    if templates_dict:
        start_date = templates_dict.get('start_date') or 'start_date없음'
        end_date = templates_dict.get('end_date') or 'end_date없음'
        print(start_date)
        print(end_date)

task decorator를 이용해서 선언한 부분이고, 그것을 이용해서 실행하고자 하는 함수로 구성되어 있는 코드입니다.

'templates_dict'에는 우리가 templates 변수를 이용해서 꺼내온 값들을 key, value 형식으로 작성해서 넣어주면 되는데요, 이 templates_dict는 아래 함수의  **kwargs 인자로 전달이 됩니다.

templates_dict 안에는 'start_date' 의 key 값과 end_date' 의 key값이 있습니다. 'start_date'의 value에는 배치일 기준으로 "전 월 1일"을 의미하는 값이 들어가 있고, 'end_date'의 value에는 배치일 기준으로 "해당 월의 1일로 바꾼 후 그 전날"을 의미하는 즉, "전 월 마지막 일"을 뜻하는 값이 들어가 있습니다.

 

get_datetime_macro 함수를 살펴보자면, kwargs의 'templates_dict' 키값을 가져오는데, task decorator에서 선언했던  templates_dict는 kwargs에 통째로 key로 들어갑니다. 그리고 { }로 감싸진 딕셔너리가 value값으로 들어갑니다.

즉, 함수에서 선언한 templates_dict 변수에 전체 딕셔너리가 들어가게 되고, 그 딕셔너리에서 또 'start_date'의 키값과 'end_date'의 키 값을 추출하는 로직입니다.


여기서 한 가지 의문이 들 수 있습니다.

※ Python Operator에서 굳이 macro 변수를 사용할 필요가 있을까요?

만약 파이썬 문법을 이용해서 날짜를 DAG 안에서 직접 연산할 수 있다면 macro변수를 굳이 쓸 필요가 있을지 한 번 직접 연산을 하는 코드를 작성해보겠습니다.

@task(task_id='task_direct_calc')
def get_datetime_calc(**kwargs):
    from dateutil.relativedelta import relativedelta
    data_interval_end = kwargs['data_interval_end']
    
    prev_month_day_first = data_interval_end + relativedelta(months=-1, days=1)
    prev_month_day_last = data_interval_end.replace(day=1) + relativedelta(days=-1)
    
    print(prev_month_day_first.strftime('%Y-%m-%d'))
    print(prev_month_day_last.strftime('%Y-%m-%d'))

kwargs에서 'data_interval_end' 키값을 추출해와 relativedelta 라이브러리를 import 해서 원하는 날짜를 가져오도록 계산했습니다.

 

이 두 방법 중에 어떤 것이 편한지 실제로 dag을 작성해보면서 airflow에서 확인하는 작업을 거쳐보겠습니다.

dags_python_with_macro.py를 dags폴더에 생성해 아래와 같이 작성합니다.

from airflow import DAG
import pendulum
from airflow.decorators import task

with DAG(
    dag_id = "dags_python_with_macro",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False
) as dag:
    
    # macro 변수 사용
    @task(task_id = 'task_using_macros',
          templates_dict = {'start_date':'{{ (data_interval_end + macros.dateutil.relativedelta.relativedelta(months=-1, day=1)) | ds }}',
                        'end_date':'{{ (data_interval_end.replace(day=1) + macros.dateutil.relativedelta.relativedelta(days=-1)) | ds }}'
          }
      )
    
    def get_datetime_macro(**kwargs) :
        templates_dict = kwargs.get('templates_dict') or {}
        if templates_dict:
            start_date = templates_dict.get('start_date') or 'start_date없음'
            end_date = templates_dict.get('end_date') or 'end_date없음'
            print(start_date)
            print(end_date)

    # 직접 연산
    @task(task_id='task_direct_calc')
    def get_datetime_calc(**kwargs):
        from dateutil.relativedelta import relativedelta
        
        data_interval_end = kwargs['data_interval_end']
        prev_month_day_first = data_interval_end + relativedelta(months=-1, days=1)
        prev_month_day_last = data_interval_end.replace(day=1) + relativedelta(days=-1)
        
        print(prev_month_day_first.strftime('%Y-%m-%d'))
        print(prev_month_day_last.strftime('%Y-%m-%d'))


    get_datetime_macro() >> get_datetime_calc()

이 때, 직접 연산을 할 때, 라이브러리를 코드 맨 위에 적지 않고, 함수안에서 import하는 이유는 스케줄러의 부하를 경감해주기 위함입니다.

스케줄러는 주기적으로 우리가 만든 dag을 파싱하는데, DAG이 시작하기 전인 맨 윗부분이나 as dag으로 끝나는 부분(오퍼레이터 선언하기 전 부분)에 우리가 어떤 내용을 작성을 해놓으면 DAG이 실행되지 않아도 그 내용들을 주기적으로 검사를 합니다.

하지만, 오퍼레이터 안에, 혹은 task decorator 안의 코드는 검사를 하지는 않으므로, 스케줄러가 검사를 하는 부분은 최소로 하는 것이 부하경감에 도움이 됩니다. 즉, 오퍼레이터 안에서만 쓰이는 라이브러리는 오퍼레이터 안에다가 쓰는 것이 좋습니다.

(※ 대규모 환경을 경험하다보면, 스케줄러 부하 문제 때문에 골치를 많이 썪는데, 가급적이면 스케줄러 부하를 줄이는 방향으로 dag을 작성하는 것이 중요합니다.) 

 

작성된 코드가 잘 돌아갔는지 airflow에서 확인해보면 아래와 같습니다.

dags_python_with_macro 의 배치기준일

data_interval_end를 살펴보면 배치기준일이 2023-08-28인 것을 알 수 있습니다. 이제, "전 월 1일"과 "전 월 마지막 일"인 "7월 1일"과 "7월 31"이 잘 나왔는지 로그를 확인해보겠습니다.

macro변수를 사용한 로직

위 사진처럼, macro변수를 사용한 로직에서 원하는 값이 잘 출력됐음을 확인할 수 있습니다.

직접 연산을 한 로직

 파이썬 문법을 이용해서 날짜를 DAG 안에서 직접 연산을 한 로직에서도 똑같이 원하는 값이 잘 나왔습니다.

 

즉, 두 방법 잘 작동되니까, 각자 원하는 방법을 선택해서 사용하면 됩니다!

이상으로, python operator에서 macro변수를 사용하는 방법과 직접 날짜연산을 하는 방법에 대해 알아봤습니다.

반응형