이번에는 Bash Operator에서 Xcom 사용하는 방법을 살펴보도록 하겠습니다.
1. Bash 오퍼레이터에서 Xcom 사용하기
먼저, 공식문서에서 Bash Operator에서 쓸 수 있는 템플릿 파라미터가 뭐가 있는지 알아야 합니다.
'env'와 'bash_command' 파라미터에서 템플릿 문법을 적용시킬 수 있는데, 이를 이용하여 push/pull 을 해봅시다.먼저 예시 코드를 보겠습니다.
bash_push = BashOperator(
task_id = 'bash_push',
bash_command = "echo START && "
"echo XCOM PUSHED "
"{{ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
"echo COMPLETE"
)
bash_pull = BashOperator(
task_id = 'bash_pull'
env = {'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
'RETURN_VALUE':"{{ ti.xcom_pull(task_id='bash_push') }}"},
bash_command = "echo $PUSHED_VALUE && echo $RETURN_VALUE ",
do_xcom_push=False
)
각각 push와 pull을 하는 task입니다.
push하는 task에서 bash_command 쪽 파라미터에 템플릿 문법을 쓴 것을 볼 수가 있는데요, Python Operator를 쓸 때는 템플릿 문법을 쓰지 않고, kwargs에서 ti 객체를 꺼내왔었습니다. 마찬가지로, 템플릿 문법에서도 ti 객체를 이용할 수가 있습니다. ti.xcom_push처럼 xcom_push 함수를 그대로 쓰고, key, value값을 Python Operator에서처럼 적절히 넣어주면 됩니다.
그런데, template 문법을 기준으로 위에 echo가 2개 있고, 아래에 1개로 총 3개의 echo 커맨드가 있습니다. bash_command에서는 echo처럼 출력하는 문장들이 모두 리턴값으로 간주가 됩니다. 마치 리턴을 3번한 것 처럼 보일 수 있는 셈이죠. 하지만, 리턴은 오직 1번만 할 수 있으므로 결국 마지막 출력 문장인 "echo COMPLETE"가 리턴값으로 간주가 됩니다.
Python Operator를 쓸 땐, 리턴하고 주는 값이 "return_value"라고 하는 키 값으로 그 value가 저장이 됐습니다. 마찬가지로 bash_command에서는 어떤 문장을 출력을 하면, "retrun_value"라고 하는 키에 마지막 출력문이 자동으로 저장되겠습니다.
pull하는 task에서는 지금 'env'에다가 템플릿 문법을 작성했는데요, 'env'는 key, value 형태로 작성한다고 했습니다. 따라서, "PUSHED_VALUE"라는 key에 value를 넣어주는데, key값이 'bash_pushed'인 value를 가져오겠다는 의미입니다. 즉, 'first_bash_message'라는 값이 "PUSHED_VALUE"에 저장이 되는 셈이겠죠.마찬가지로, "RETURN_VALUE"라는 key값에 value를 넣어주는데, task_ids값이 'bash_push'인 value를 가져오겠다는 의미로, task_id값이 'bash_push'인 task의 "return_value"인 "COMPLETE"를 가져오게 됩니다.마지막으로 do_xcom_push를 False로 줬는데, 이 값은 뭘까요?이 값은 bash_command에서 출력되는 값은 자동으로 return값으로 간주가 되고, Xcom에 push가 되어 저장이 된다고 했는데, 그 동작을 하지 말라는 뜻입니다. 즉, Xcom에 올리지 말라는 의미가 됩니다. 디폴트는 True이기 때문에, bash_command 출력값이 Xcom에 올라가는 것을 원치않는다면, do_xcom_push=False 라는 코드를 적어줘야 합니다.즉, 여기서는 "echo $PUSHED_VALUE && echo $RETURN_VALUE"값이 xcom에 들어가지 않겠죠.
이제, 위의 내용을 기반으로 실습을 한 번 해보겠습니다.dags 폴더에 dags_bash_with_xcom.py라는 파일을 생성하고 아래와 같이 코드를 작성합니다.
from airflow import DAG
import pendulum
import datetime
from airflow.operators.bash import BashOperator
with DAG(
dag_id = "dags_bash_with_xcom",
schedule="10 0 * * *",
start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
catchup=False
) as dag:
bash_push = BashOperator(
task_id ="bash_push",
bash_command="echo START && "
"echo XCOM_PUSHED"
"{{ ti.xcom_push(key='bash_pushed',value='first_bash_message')}} && "
"echo COMPLETE"
)
bash_pull = BashOperator(
task_id='bash_pull',
env={'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
'RETURN_VALUE':"{{ ti.xcom_pull(task_id='bash_push') }}"},
bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE",
do_xcom_push=False
)
bash_push >> bash_pull
이제, airflow에서 어떻게 나왔는지 보도록 하겠습니다.
먼저, push task에서 key, value값이 잘 매칭 됐는지 보면, 아래와 같습니다. return value에는 마지막 출력 값인 "COMPLETE"가 잘 저장됐고, xcom_push로 직접 'bash_pushed'라는 키값에 'first_bash_message'라는 value값이 잘 저장된 것을 확인할 수 있습니다.
다음은, bash_pull xcom을 확인해볼까요?
bash_pull에서는 bash_command의 리턴값이 자동으로 xcom에 저장되지 못하도록 do_xcom_push의 값을 False로 줬기 때문에 아무런 key, value값도 없는 것을 확인할 수 있습니다.
이제, bash_pull의 로그를 확인해보며 마무리하겠습니다. 의도했던 대로, 'first_bash_message'와 'COMPLETE' 둘 다 잘 나온 것을 확인할 수 있었습니다.
이렇게, bash오퍼레이터는 python 오퍼레이터와 달리, 템플릿 문법을 쓸 수 있는 파라미터(env, bash_command)를 이용해서 xcom을 push하고 pull을 했습니다. python 오퍼레이터 같은 경우는 kwargs에서 ti 객체를 꺼낸다음에, ti객체가 가지고 있는 xcom_push와 xcom_pull 커맨드를 직접 사용했지만, bash 오퍼레이터에서는 템플릿 문법을 작성할 수 있으니, 그냥 'ti.xcom_push'나 'ti.xcom_pull' 처럼 ti 객체를 따로 꺼내지 않고 key, value형태로 바로 작성할 수 있습니다. 그리고, bash 오퍼레이터에서는 출력되는 마지막 문장이 'return_value'값으로 간주가 되기 때문에, 여러 번 echo를 찍었을 때는 마지막으로 찍은 echo문장이 리턴값으로 저장되는 것도 잊지 마시길 바랍니다!
이상으로, bash_operator를 사용해서 xcom에 push하고 데이터를 pull 하는 방법을 알아봤습니다.
'컴퓨터 공부 > 💿 Airflow' 카테고리의 다른 글
[Airflow] Python & email 오퍼레이터간 Xcom 사용 (0) | 2023.08.29 |
---|---|
[Airflow] Python & Bash 오퍼레이터 with Xcom (0) | 2023.08.29 |
[Airflow] Python Operator에서 Xcom 사용 (0) | 2023.08.28 |
[Airflow] Python Operator with macros (0) | 2023.08.28 |
[Airflow] Bash Operator with macros (0) | 2023.08.28 |