Macro 변수는 Jinja 템플릿 내에서 날짜 연산을 가능하게끔 해주는 기능으로서 파이썬의 datetime이나 dateutil 같은 라이브러리를 이용해서 날짜연산을 할 수 있도록 지원을 해주고 있습니다.
1. 먼저 이런 Macro변수가 왜 필요한지 살펴봅시다.
아래상황을 가정해봅시다.
DAG 스케줄은 매일 말일에 도는 스케줄인데(ex 0 0 L * *),
BETWEEN 값을 전 월 마지막일부터 어제 날짜까지 주고 싶은데 어떻게 할까요?
sql = f``` SELECT NAME, ADDRESS FROM TBL_REG WHERE REG_DATE BETWEEN ?? AND ?? '''
와 같은 sql 이 있다고 합시다.
여기서 예를 들어,
배치일이 1월 31일이면, 12월 31일부터 1월 30일까지
배치일이 2월 28일이면, 1월 31일부터 2월 27일까지
BETWEEN이 설정됐으면 합니다.
DAG 스케줄이 월 단위이니까 Template 변수에서 data_interval_start 값은 저번 배치일 즉, 전 월의 말일이니까 시작일은 해결될 것 같은데, 끝 부분은 어떻게 만들까요?
data_interval_end 에서 하루를 뺀 값이 필요한데, 이런 날짜 계산을 할 때 Macro 변수가 필요한 것입니다.
Macro 변수에 대해 조금 더 알아봅시다. 공식 홈페이지에 가면, Macro 변수가 어떤 것이 있는지, 어떻게 써야 하는지 잘 나와 있습니다. Template 변수 기반 다양한 날짜 연산이 가능하도록 연산 모듈을 제공하고 있습니다.
상위 네 가지의 Macro 변수가 많이 사용되는데요, 이 중에서도 Macro를 잘 쓰려면 파이썬 datetime 및 dateutil 라이브러리에 익숙해져야 합니다.
2. 파이썬의 datetime, dateutil 라이브러리
주피터 노트북으로 코드를 아래와 같이 작성해보겠습니다.
from datetime import datetime
from dateutil import relativedelta
now = datetime(year=2023, month=3, day=30)
print('현재시간:'+str(now))
print('------------월 연산------------')
print(now+relativedelta.relativedelta(month=1)) # 1월로 변경
print(now.replace(month=1)) # 1월로 변경
print(now+relativedelta.relativedelta(months=-1)) # 1월로 빼기
print('------------일 연산------------')
print(now+relativedelta.relativedelta(day=1)) # 1일로 변경
print(now.replace(day=1)) # 1일로 변경
print(now+relativedelta.relativedelta(days=-1)) # 1일 빼기
print('------------연산 여러 개-----------')
print(now + relativedelta.relativedelta(months=-1) + relativedelta.relativedelta(days=-1)) # 1개월, 1일 빼기
now + relativedelta.relativedelta(month='바꾸고 싶은 월')
now.replace(month='대체하고 싶은 월')
now + relativedelta.relativedelta(months='빼고 싶은 개월 수')
relativedelta.relativedelta(day='바꾸고 싶은 일')
now.replace(day='대체하고 싶은 일')
relativedelta.relativedelta(days='빼고 싶은 일 수')
를 적절히 활용하면, 여러 날짜 연산도 할 수 있습니다.
3. 이제 Bash 오퍼레이터를 사용해서 macro 를 실습해봅시다.
t1 = BashOperator(
task_id = 't1',
env = {'START_DATE':''},
)
의 부분에 template 변수에 macro를 활용해봅시다.
예시 1)
매월 말일 수행되는 Dag에서
변수 START_DATE : 전월 말일,
변수 END_DATE: 어제로 env 셋팅하기
※ 변수는 YYYY-MM-DD 형식으로 나오도록 하기
dags_bash_with_macro_eg1.py 라는 파일을 dags폴더에 생성한 후 코드를 다음과 같이 작성했습니다.
from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
dag_id = "dags_bash_with_macro_eg1",
schedule="10 0 L * *",
start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
catchup=False
) as dag:
# START_DATE : 전월 말일, END_DATE : 1일 전
bash_task_t1 = BashOperator(
task_id = "bash_task_t1",
# UTC기준이 아닌 한국 시간기준이라면 data_interval_start.in_timezone("Asia/Seoul") 을 넣어야 합니다.
env={'START_DATE':'{{ data_interval_start | ds}}',
'END_DATE':'{{ (data_interval_end - macros.dateutil.relativedelta.relativedelta(days=1)) | ds}}'
},
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
여기서, 위에 있는 macros 라이브러리 사진을 보면, macros.dateutil 을 지원하는데, 그 중에서 relativedelta를 import 해와야 하므로, macros.dateutil.relativedelta에서 또 relativedelta를 써야 하므로 최종적으로 macros.dateutil.relativedelta.relativedelta()를 써줍니다.
예시 2)
매월 둘째주 토요일에 수행되는 Dag에서
변수 START_DATE: 2주 전 월요일
변수 END_DATE: 2주 전 토요일로 env 셋팅하기
※ 변수는 YYYY-MM-DD 형식으로 나오도록 하기
dags_bash_with_macro_eg2.py 라는 파일을 dags폴더에 생성한 후 코드를 다음과 같이 작성했습니다.
from airflow import DAG
import datetime
import pendulum
from airflow.operators.bash import BashOperator
with DAG(
dag_id = "dags_bash_with_macro_eg2",
schedule="10 0 * * 6#2",
start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
catchup=False
) as dag:
# START_DATE : 2주전 월요일, END_DATE : 2주전 토요일
bash_task_t2 = BashOperator(
task_id = "bash_task_t2",
# UTC기준이 아닌 한국 시간기준이라면 data_interval_start.in_timezone("Asia/Seoul") 을 넣어야 합니다.
env={'START_DATE':'{{ (data_interval_end - macros.dateutil.relativedelta.relativedelta(days=19)) | ds}}',
'END_DATE':'{{ (data_interval_end - macros.dateutil.relativedelta.relativedelta(days=14)) | ds}}'
},
bash_command='echo "START_DATE: $START_DATE" && echo "END_DATE: $END_DATE"'
)
2주 전 토요일은, 최근 배치일 기준을 14일을 뺀 날입니다. 또, 2주전 월요일은 19일을 뺀 날이겠죠.
그럼 이제 airflow에서 제대로 값이 나왔는지 확인해보도록 합시다.
먼저, bash_task_t1 을 살펴보면, 배치가 돈 기준 일은 data_interval_end로 7월 31일자 기준으로 돌았습니다.
출력 결과를 확인해본다면, 아래와 같습니다.
START_DATE에 전월 말일인 6월 30일이 제대로 찍혔고, END_DATE에는 하루 전인 7월 30일이 찍혔습니다. ds 형태로 잘 나온 것도 확인할 수 있습니다.
그렇다면, bash_task_t2 를 살펴볼까요? 배치기준일은 data_interval_end가 8월 12일자 기준으로 돌았습니다.
출력 결과를 확인해본다면, 아래와 같습니다.
배치일자 기준으로 START_DATE에 2주전 월요일인 7월 24일이 제대로 찍혔고, END_DATE에는 2주전 토요일인 7월 29일이 찍혔습니다. ds 형태로 잘 나온 것도 확인할 수 있습니다.
이상으로, Macros에 대해 알아봤습니다. Macros 가 필요한 이유는 우리가 템플릿 엔진에서 날짜를 꺼내 써야 하는데, 그 날짜에 연산이 필요한 경우가 있습니다. 예를 들면, data_interval_end에서 하루전, 하루 이후, 1주일전, 한달 전 등과 같이 날짜 연산을 해야할 때, Macros 를 주로 씁니다.
이 때, macros를 쓰는 방식은 macros.datetime 이나 macros.dateutil 과 같은 명령어를 입력해서 쓰면 되는데, 결국 파이썬 라이브러리를 쓰는 것과 똑같은 것이기에 잘 활용하면 되겠습니다.
'컴퓨터 공부 > 💿 Airflow' 카테고리의 다른 글
[Airflow] Python Operator에서 Xcom 사용 (0) | 2023.08.28 |
---|---|
[Airflow] Python Operator with macros (0) | 2023.08.28 |
[Airflow] Python Operator에서 Jinja 템플릿 사용하기 (0) | 2023.08.28 |
[Airflow] Airflow의 날짜 개념 (0) | 2023.08.27 |
[Airflow] Bash 오퍼레이터 with Template (0) | 2023.08.26 |