컴퓨터 공부/💿 Airflow

[Airflow] Bash Operator with macros

letzgorats 2023. 8. 28. 09:46

Macro 변수는 Jinja 템플릿 내에서 날짜 연산을 가능하게끔 해주는 기능으로서 파이썬의 datetime이나 dateutil 같은 라이브러리를 이용해서 날짜연산을 할 수 있도록 지원을 해주고 있습니다.

 

1. 먼저 이런 Macro변수가 왜 필요한지 살펴봅시다.

아래상황을 가정해봅시다.

DAG 스케줄은 매일 말일에 도는 스케줄인데(ex 0 0 L * *),
BETWEEN 값을 전 월 마지막일부터 어제 날짜까지 주고 싶은데 어떻게 할까요?

sql = f```
SELECT NAME, ADDRESS
FROM TBL_REG
WHERE REG_DATE BETWEEN ?? AND ?? '''

와 같은 sql 이 있다고 합시다.

여기서 예를 들어,
배치일이 1월 31일이면, 12월 31일부터 1월 30일까지
배치일이 2월 28일이면, 1월 31일부터 2월 27일까지
BETWEEN이 설정됐으면 합니다.

DAG 스케줄이 월 단위이니까 Template 변수에서 data_interval_start 값은 저번 배치일 즉, 전 월의 말일이니까 시작일은 해결될 것 같은데, 끝 부분은 어떻게 만들까요?
data_interval_end 에서 하루를 뺀 값이 필요한데, 이런 날짜 계산을 할 때 Macro 변수가 필요한 것입니다. 

Macro 변수에 대해 조금 더 알아봅시다. 공식 홈페이지에 가면, Macro 변수가 어떤 것이 있는지, 어떻게 써야 하는지 잘 나와 있습니다. Template 변수 기반 다양한 날짜 연산이 가능하도록 연산 모듈을 제공하고 있습니다.

macro 변수

상위 네 가지의 Macro 변수가 많이 사용되는데요, 이 중에서도 Macro를 잘 쓰려면 파이썬 datetime 및 dateutil 라이브러리에 익숙해져야 합니다.

 

2. 파이썬의 datetime, dateutil 라이브러리

주피터 노트북으로 코드를 아래와 같이 작성해보겠습니다.

from datetime import datetime
from dateutil import relativedelta

now = datetime(year=2023, month=3, day=30)
print('현재시간:'+str(now))

print('------------월 연산------------')
print(now+relativedelta.relativedelta(month=1))   # 1월로 변경
print(now.replace(month=1))                         # 1월로 변경
print(now+relativedelta.relativedelta(months=-1)) # 1월로 빼기

print('------------일 연산------------')
print(now+relativedelta.relativedelta(day=1))   # 1일로 변경
print(now.replace(day=1))                         # 1일로 변경
print(now+relativedelta.relativedelta(days=-1)) # 1일 빼기

print('------------연산 여러 개-----------')
print(now + relativedelta.relativedelta(months=-1) + relativedelta.relativedelta(days=-1))  # 1개월, 1일 빼기

now + relativedelta.relativedelta(month='바꾸고 싶은 월')

now.replace(month='대체하고 싶은 월')

now + relativedelta.relativedelta(months='빼고 싶은 개월 수')

 

relativedelta.relativedelta(day='바꾸고 싶은 일')

now.replace(day='대체하고 싶은 일')

relativedelta.relativedelta(days='빼고 싶은 일 수')

 

를 적절히 활용하면, 여러 날짜 연산도 할 수 있습니다.

날짜계산 결과

 

 

3. 이제 Bash 오퍼레이터를 사용해서 macro 를 실습해봅시다.

 

t1 = BashOperator(
         task_id = 't1',
         env = {'START_DATE':''},
     )

의 부분에 template 변수에 macro를 활용해봅시다.

 

예시 1)

  매월 말일 수행되는 Dag에서
  변수 START_DATE : 전월 말일,
  변수 END_DATE: 어제로 env 셋팅하기

※ 변수는 YYYY-MM-DD 형식으로 나오도록 하기

dags_bash_with_macro_eg1.py 라는 파일을 dags폴더에 생성한 후 코드를 다음과 같이 작성했습니다.

from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id = "dags_bash_with_macro_eg1",
    schedule="10 0 L * *",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False
) as dag:
    
    # START_DATE : 전월 말일, END_DATE : 1일 전
    bash_task_t1 = BashOperator(
        task_id = "bash_task_t1",
        # UTC기준이 아닌 한국 시간기준이라면 data_interval_start.in_timezone("Asia/Seoul") 을 넣어야 합니다.
        env={'START_DATE':'{{ data_interval_start | ds}}',
             'END_DATE':'{{ (data_interval_end - macros.dateutil.relativedelta.relativedelta(days=1)) | ds}}'
             },
             bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )

여기서, 위에 있는 macros 라이브러리 사진을 보면, macros.dateutil 을 지원하는데, 그 중에서 relativedelta를 import 해와야 하므로, macros.dateutil.relativedelta에서 또 relativedelta를 써야 하므로 최종적으로 macros.dateutil.relativedelta.relativedelta()를 써줍니다.

 

예시 2)

  매월 둘째주 토요일에 수행되는 Dag에서
  변수 START_DATE: 2주 전 월요일
  변수 END_DATE: 2주 전 토요일로 env 셋팅하기

※ 변수는 YYYY-MM-DD 형식으로 나오도록 하기

dags_bash_with_macro_eg2.py 라는 파일을 dags폴더에 생성한 후 코드를 다음과 같이 작성했습니다.

from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator

with DAG(
    dag_id = "dags_bash_with_macro_eg2",
    schedule="10 0 * * 6#2",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False
) as dag:
    
    # START_DATE : 2주전 월요일, END_DATE : 2주전 토요일
    bash_task_t2 = BashOperator(
        task_id = "bash_task_t2",
        # UTC기준이 아닌 한국 시간기준이라면 data_interval_start.in_timezone("Asia/Seoul") 을 넣어야 합니다.
        env={'START_DATE':'{{ (data_interval_end - macros.dateutil.relativedelta.relativedelta(days=19)) | ds}}',
             'END_DATE':'{{ (data_interval_end - macros.dateutil.relativedelta.relativedelta(days=14)) | ds}}'
             },
             bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
    )

2주 전 토요일은, 최근 배치일 기준을 14일을 뺀 날입니다. 또, 2주전 월요일은 19일을 뺀 날이겠죠.

그럼 이제 airflow에서 제대로 값이 나왔는지 확인해보도록 합시다. 

 

먼저, bash_task_t1 을 살펴보면, 배치가 돈 기준 일은 data_interval_end로 7월 31일자 기준으로 돌았습니다.

bash_task_t1

출력 결과를 확인해본다면, 아래와 같습니다.

bash_task_t1

START_DATE에 전월 말일인 6월 30일이 제대로 찍혔고, END_DATE에는 하루 전인 7월 30일이 찍혔습니다. ds 형태로 잘 나온 것도 확인할 수 있습니다.


그렇다면, bash_task_t2 를 살펴볼까요? 배치기준일은 data_interval_end가 8월 12일자 기준으로 돌았습니다.

bash_task_t2

출력 결과를 확인해본다면, 아래와 같습니다.

배치일자 기준으로 START_DATE에 2주전 월요일인 7월 24일이 제대로 찍혔고, END_DATE에는 2주전 토요일인 7월 29일이 찍혔습니다. ds 형태로 잘 나온 것도 확인할 수 있습니다.

 

이상으로, Macros에 대해 알아봤습니다. Macros 가 필요한 이유는 우리가 템플릿 엔진에서 날짜를 꺼내 써야 하는데, 그 날짜에 연산이 필요한 경우가 있습니다. 예를 들면, data_interval_end에서 하루전, 하루 이후, 1주일전, 한달 전 등과 같이 날짜 연산을 해야할 때, Macros 를 주로 씁니다.

 

이 때, macros를 쓰는 방식은 macros.datetime 이나 macros.dateutil 과 같은 명령어를 입력해서 쓰면 되는데, 결국 파이썬 라이브러리를 쓰는 것과 똑같은 것이기에 잘 활용하면 되겠습니다.

 

반응형