앞서서는 특정 dag에 있는 task끼리만 데이터를 공유할 수 있는 방법이었다면, 모든 dag에서 데이터를 접근하는 방식에 대해 알아볼까합니다. 그런 용도로 airflow에서 기능을 제공하는 것이 Variable 입니다.
1. 전역변수 Variable 이해
- Xcom: 특정 DAG, 특정 schedule에 수행되는 Task 간에만 공유
- 모든 DAG이 공유할 수 있는 전역 변수는 없을까요? → 바로 Variable이 있습니다!
※ Variable 등록하기
: airflow 서비스를 띄우고, Admin 탭에 들어가서 Variables 메뉴를 누르고 "+" 버튼을 누르면 됩니다.
: 실제 Variable의 Key, Value 값은 메타 DB에 저장됩니다. (variable 테이블)
Variable도 Key, Value 형태여서 smaple_key와 sample_value로 Variable를 등록해봤습니다.
그렇다면, 이제 등록한 Variable을 꺼내보는 실습을 진행해봅시다.
2. Variable 사용하기
1안) Variable 라이브러리 이용, 파이썬 문법을 이용해 미리 가져오기
from airflow.operators.bash import BashOperator
from airflow.models import Variable
var_value = Variable.get("sample_key")
bash_var_1 = BashOperator(
task_id = "bash_var_1",
bash_command = f"echo variable:{var_value}"
)
파이썬 문법을 이용해서 BashOperator 바깥부분에서 해당 value값을 가져옵니다.
airflow.models에 Variable 라이브러리가 있는데, 이 라이브러리를 사용해 "sample_key"라고 등록한 키값에 접근해 원하는 Value값을 var_value에 할당하는 로직입니다.
2안) Jinja 템플릿 이용, 오퍼레이터 내부에서 가져오기
from airflow.operators.bash import BashOperator
bash_var_2 = BashOperator(
task_id = "bash_var_2",
bash_command = f"echo variable:{{var.value.sample_key}}"
)
해당 코드에서는 Variable 라이브러리를 사용하지 않고, Jinja 템플릿을 이용해서 BashOperator 내부에서 value를 가져옵니다. 'var.value'는 공통이고, 그 중에서, 꺼내고 싶은 Key 값을 적어주면 됩니다. 여기서는 "sample_key"라는 값을 적어주면 되겠죠?
이 두 방법 모두 가능하지만, 권고하는 방법은 2안) 입니다.
1안)에서는 스케줄러의 주기적 DAG 파싱시 Variable.get 개수만큼 DB연결을 일으켜 불필요한 부하가 발생하기 때문이죠.
스케줄러의 과부하 원인 중 하나일 수 있습니다. 그렇기 때문에, 스케줄러의 부하를 줄일 수 있는 "오퍼레이터 내부"에서 Jinja템플릿을 사용하는 방법이 더 효율적이라고 할 수 있습니다.
※ 그런데, 이 전역변수는 언제, 어떻게 쓰면 좋을까요?
: 협업 환경에서 표준화된 dag을 만들기 위해 주로 사용합니다.
: 주로 상수(CONST)로 지정해서 사용할 변수들을 셋팅합니다.
예를 들면,
: e.g) bash_sh_dir = /opt/airflow/plugins/shell
: e.g) base_file_dir = /opt/airflow/plugins/files
처럼 기본적인 경로들을 상수처럼 지정해서 쓸 수 있는 셈이죠.
아니면,
: e.g) email, Alert 메시지를 받을 담당자의 email 주소 정보
처럼 담당자의 이메일 정보나 알림 메시지를 Variable에 등록해놓고 관리를 할 수 있습니다.
그럼, 바로 실습을 해봅시다.
dags폴더에 dags_bash_with_variable.py를 생성해 아래와 같이 코드를 작성합니다.
from airflow import DAG
import pendulum
from airflow.operators.bash import BashOperator
from airflow.models import Variable
with DAG(
dag_id = "dags_bash_with_variable",
schedule = "10 9 * * *",
start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
catchup=False
) as dag:
var_value = Variable.get("sample_key")
# 1안) Variable 라이브러리 이용
bash_var_1 = BashOperator(
task_id="bash_var_1",
bash_command=f"echo variable{var_value}"
)
# 2안) Jinja template 사용
bash_var_2 = BashOperator(
task_id="bash_var_2",
bash_command="echo variable:{{var.value.sample_key}}"
)
1안에서는 Variable이라는 라이브러리를 가지고 와서 직접 파이썬 문법으로 sample_key에 있는 value를 꺼내서 담았고, bash_command에 전달해준 것입니다.
2안에서는 template변수를 이용해서 직접 variable을 꺼내는 방식인데, 보면 f 문자열 포맷팅이 빠진 것을 확인할 수 있습니다. 디폴트는 var.value이기에, var_value중에 sample_key를 key로 가지는 value를 뽑아온다는 뜻입니다.
이제, 잘 나왔는지 airflow를 통해 확인해봅시다.
1안 방법을 적용한 task 'bash_var_1'을 실행시킨 결과입니다. var_value에 "sample_value"가 잘 할당되어서 bash_command에서도 잘 출력된 것을 확인할 수 있습니다. 실제로 Xcom을 확인해보면, 아래와 같습니다.
그렇다면, 2안을 적용한 task인 'bash_var_2'도 봐보겠습니다.
마찬가지로, Jinja 템플릿을 활용해 Xcom에 있는 원하는 키값에 접근해 그에 맞는 value값인 sample_value가 잘 나온 것을 확인 할 수 있습니다.
이처럼, 결과는 같지만, Variable변수에 접근하는 방식이 다른 두 가지 방법을 살펴봤습니다.
그 중에서도, 스케줄러의 부하를 줄이기 위해 jinja템플릿을 사용하는 2안을 사용하는 것에 익숙해집시다!
'컴퓨터 공부 > 💿 Airflow' 카테고리의 다른 글
[Airflow] Task 분기처리하기 with task.branch (0) | 2023.08.29 |
---|---|
[Airflow] BranchPython 오퍼레이터로 분기처리하기 (0) | 2023.08.29 |
[Airflow] Python & email 오퍼레이터간 Xcom 사용 (0) | 2023.08.29 |
[Airflow] Python & Bash 오퍼레이터 with Xcom (0) | 2023.08.29 |
[Airflow] Bash Operator에서 Xcom 사용 (0) | 2023.08.29 |