python 오퍼레이타에서 xcom을 사용하는 방법과 bash 오퍼레이터에서 xcom을 사용하는 방법을 배웠으니, 이제는 두 오퍼레이터를 혼합해서 Xcom 데이터를 구하는 방법을 알아보겠습니다.
1. Python → Bash 오퍼레이터 Xcom 전달
코드를 먼저 살펴보겠습니다.
@task(task_id='python_push')
def python_push_xcom():
result_dict = {'status':'MyLove','data':['Allu','Arsenal','Myself'],'options_cnt':100}
return result_dict
bash_pull = BashOperator(
task_id = 'bash_pull',
env = {
'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
'OPTIONS_CNT':'{{ti.xcom_pull(task_ids="python_push")["oprionts_cnt"]}}'
},
bash_command = 'echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
)
python_push_xcom() >> bash_pull
PythonOperator에서 함수의 리턴 값은 그대로 xcom의 "result_value" key값의 value로 저장됩니다.
BashOperator에서 템플릿 문법을 통해 ti.xcom_pull을 통해 task_ids를 명시해주고 원하는 키값에 해당하는 value값을 가져오는 코드입니다.
bash_command에서 STATUS에는 'MyLove'라는 값이 나오겠고, DATA에는 ['Allu','Arsenal','Myself'], OPTION_CNT에는 100이라는 값이 꺼내져올 것입니다.
2. Bash → Python 오퍼레이터 Xcom 전달
코드를 살펴보겠습니다.
bash_push = BashOperator(
task_id = 'bash_push',
bash_command = 'echo PUSH_START '
'{{ti.xcom_push(key="bash_pushed",value=200)}} && '
'echo PUSH_COMPLETE'
)
@task(task_id='python_pull')
def python_pull_xcom(**kwargs):
ti = kwargs['ti']
status_value = ti.xcom_pull(key='bash_pushed')
return_value = ti.xcom_pull(task_ids='bash_push')
print('status_value:' + str(status_value))
print('return_value:' + return_value)
bash_push >> python_pull_xcom()
이번에는 bash operator로 xcom에 push를 했습니다. key값이 "bash_pushed"인 value에는 200이라는 숫자를 push했고, 'bash_push' 자체의 task의 "return_value"는 PUSH_COMPLETE 라는 스트링을 저장했습니다.
pull 하는 과정에서는 python operator를 사용했는데요, kwargs에 있는 'ti'객체를 가져와서 xcom_pull 함수를 통해 데이터를 꺼내왔습니다.
'status_value'에는 200이 담기겠고, 'return value'에는 PUSH_COMPLETE가 담길 것입니다.
실습을 바로 해볼까요?
dags폴더에 dags_bash_python_with_xcom.py 파일을 생성해서 아래와 같이 코드를 작성했습니다.
from airflow import DAG
import pendulum
from airflow.decorators import task
from airflow.operators.bash import BashOperator
with DAG(
dag_id = "dags_bash_python_with_xcom",
schedule="30 9 * * *",
start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
catchup=False
) as dag:
@task(task_id='python_push')
def python_push_xcom():
result_dict = {'status':'MyLove','data':['Allu','Arsenal','Myself'],'options_cnt':100}
return result_dict
bash_pull = BashOperator(
task_id = 'bash_pull',
env = {
'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
'OPTIONS_CNT':'{{ti.xcom_pull(task_ids="python_push")["oprionts_cnt"]}}'
},
bash_command = 'echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
)
python_push_xcom() >> bash_pull # python operator에서 push -> bash operator에서 pull
bash_push = BashOperator(
task_id = 'bash_push',
bash_command = 'echo PUSH_START '
'{{ti.xcom_push(key="bash_pushed",value=200)}} && '
'echo PUSH_COMPLETE'
)
@task(task_id='python_pull')
def python_pull_xcom(**kwargs):
ti = kwargs['ti']
status_value = ti.xcom_pull(key='bash_pushed')
return_value = ti.xcom_pull(task_ids='bash_push')
print('status_value:' + str(status_value))
print('return_value:' + return_value)
bash_push >> python_pull_xcom() # bash operator에서 push -> python operator에서 pull
제대로 값들이 저장되고 꺼내져왔는지 airflow를 통해 확인해보도록 하겠습니다.
[python_push → bash_pull] 와 [bash_push → python_push] 가 잘 된 것을 볼 수 있습니다.
각 task의 Xcom과 결과를 살펴보자면, 아래와 같습니다.
결과가 모두 잘 출력됐습니다.
이상으로 Python오퍼레이터와 Bash 오퍼레이터 간의 Xcom 값을 pull & push 하는 방법에 대해 알아봤습니다.
'컴퓨터 공부 > 💿 Airflow' 카테고리의 다른 글
[Airflow] 전역 공유변수 Variable (0) | 2023.08.29 |
---|---|
[Airflow] Python & email 오퍼레이터간 Xcom 사용 (0) | 2023.08.29 |
[Airflow] Bash Operator에서 Xcom 사용 (0) | 2023.08.29 |
[Airflow] Python Operator에서 Xcom 사용 (0) | 2023.08.28 |
[Airflow] Python Operator with macros (0) | 2023.08.28 |