컴퓨터 공부/💿 Airflow

[Airflow] Bash Operator에서 Xcom 사용

letzgorats 2023. 8. 29. 05:05

이번에는 Bash Operator에서 Xcom 사용하는 방법을 살펴보도록 하겠습니다.

 

1. Bash 오퍼레이터에서 Xcom 사용하기

먼저, 공식문서에서 Bash Operator에서 쓸 수 있는 템플릿 파라미터가 뭐가 있는지 알아야 합니다.

bash_operator에서의 템플릿 파라미터
'env'와 'bash_command' 파라미터에서 템플릿 문법을 적용시킬 수 있는데, 이를 이용하여 push/pull 을 해봅시다.먼저 예시 코드를 보겠습니다.

bash_push = BashOperator(
    task_id = 'bash_push',
    bash_command = "echo START && "
                   "echo XCOM PUSHED "
                   "{{ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && "
                   "echo COMPLETE"
)

bash_pull = BashOperator(
    task_id = 'bash_pull'
    env = {'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
           'RETURN_VALUE':"{{ ti.xcom_pull(task_id='bash_push') }}"},
    bash_command = "echo $PUSHED_VALUE && echo $RETURN_VALUE ",
    do_xcom_push=False
)

각각 push와 pull을 하는 task입니다.

 

push하는 task에서 bash_command 쪽 파라미터에 템플릿 문법을 쓴 것을 볼 수가 있는데요, Python Operator를 쓸 때는 템플릿 문법을 쓰지 않고, kwargs에서 ti 객체를 꺼내왔었습니다. 마찬가지로, 템플릿 문법에서도 ti 객체를 이용할 수가 있습니다. ti.xcom_push처럼 xcom_push 함수를 그대로 쓰고, key, value값을 Python Operator에서처럼 적절히 넣어주면 됩니다. 

그런데, template 문법을 기준으로 위에 echo가 2개 있고, 아래에 1개로 총 3개의 echo 커맨드가 있습니다. bash_command에서는 echo처럼 출력하는 문장들이 모두 리턴값으로 간주가 됩니다. 마치 리턴을 3번한 것 처럼 보일 수 있는 셈이죠. 하지만, 리턴은 오직 1번만 할 수 있으므로 결국 마지막 출력 문장인 "echo COMPLETE"가 리턴값으로 간주가 됩니다.

Python Operator를 쓸 땐, 리턴하고 주는 값이 "return_value"라고 하는 키 값으로 그 value가 저장이 됐습니다. 마찬가지로 bash_command에서는 어떤 문장을 출력을 하면, "retrun_value"라고 하는 키에 마지막 출력문이 자동으로 저장되겠습니다.

 

pull하는 task에서는 지금 'env'에다가 템플릿 문법을 작성했는데요, 'env'는 key, value 형태로 작성한다고 했습니다. 따라서, "PUSHED_VALUE"라는 key에 value를 넣어주는데, key값이 'bash_pushed'인 value를 가져오겠다는 의미입니다. 즉, 'first_bash_message'라는 값이 "PUSHED_VALUE"에 저장이 되는 셈이겠죠.마찬가지로, "RETURN_VALUE"라는 key값에 value를 넣어주는데, task_ids값이 'bash_push'인 value를 가져오겠다는 의미로, task_id값이 'bash_push'인 task의 "return_value"인 "COMPLETE"를 가져오게 됩니다.마지막으로 do_xcom_push를 False로 줬는데, 이 값은 뭘까요?이 값은 bash_command에서 출력되는 값은 자동으로 return값으로 간주가 되고, Xcom에 push가 되어 저장이 된다고 했는데, 그 동작을 하지 말라는 뜻입니다. 즉, Xcom에 올리지 말라는 의미가 됩니다. 디폴트는 True이기 때문에, bash_command 출력값이 Xcom에 올라가는 것을 원치않는다면, do_xcom_push=False 라는 코드를 적어줘야 합니다.즉, 여기서는 "echo $PUSHED_VALUE && echo $RETURN_VALUE"값이 xcom에 들어가지 않겠죠.


이제, 위의 내용을 기반으로 실습을 한 번 해보겠습니다.dags 폴더에 dags_bash_with_xcom.py라는 파일을 생성하고 아래와 같이 코드를 작성합니다.

from airflow import DAG
import pendulum
import datetime
from airflow.operators.bash import BashOperator

with DAG(
    dag_id = "dags_bash_with_xcom",
    schedule="10 0 * * *",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False
) as dag:
    
    bash_push = BashOperator(
        task_id ="bash_push",
        bash_command="echo START && "
                     "echo XCOM_PUSHED"
                     "{{ ti.xcom_push(key='bash_pushed',value='first_bash_message')}} && "
                     "echo COMPLETE"
    )

    bash_pull = BashOperator(
        task_id='bash_pull',
        env={'PUSHED_VALUE':"{{ ti.xcom_pull(key='bash_pushed') }}",
             'RETURN_VALUE':"{{ ti.xcom_pull(task_id='bash_push') }}"},
        bash_command="echo $PUSHED_VALUE && echo $RETURN_VALUE",
        do_xcom_push=False
    )

    bash_push >> bash_pull

이제, airflow에서 어떻게 나왔는지 보도록 하겠습니다.

 

먼저, push task에서 key, value값이 잘 매칭 됐는지 보면, 아래와 같습니다.

bash_push에서의 key, value값들
return value에는 마지막 출력 값인 "COMPLETE"가 잘 저장됐고, xcom_push로 직접 'bash_pushed'라는 키값에 'first_bash_message'라는 value값이 잘 저장된 것을 확인할 수 있습니다.

 

다음은, bash_pull xcom을 확인해볼까요?

bash_pull xcom
bash_pull에서는 bash_command의 리턴값이 자동으로 xcom에 저장되지 못하도록 do_xcom_push의 값을 False로 줬기 때문에 아무런 key, value값도 없는 것을 확인할 수 있습니다.

 

이제, bash_pull의 로그를 확인해보며 마무리하겠습니다.

bash_pull 의 출력
의도했던 대로, 'first_bash_message'와 'COMPLETE' 둘 다 잘 나온 것을 확인할 수 있었습니다.

 

이렇게, bash오퍼레이터는 python 오퍼레이터와 달리, 템플릿 문법을 쓸 수 있는 파라미터(env, bash_command)를 이용해서 xcom을 push하고 pull을 했습니다. python 오퍼레이터 같은 경우는 kwargs에서 ti 객체를 꺼낸다음에, ti객체가 가지고 있는 xcom_push와 xcom_pull 커맨드를 직접 사용했지만, bash 오퍼레이터에서는 템플릿 문법을 작성할 수 있으니, 그냥 'ti.xcom_push'나 'ti.xcom_pull' 처럼 ti 객체를 따로 꺼내지 않고 key, value형태로 바로 작성할 수 있습니다. 그리고, bash 오퍼레이터에서는 출력되는 마지막 문장이 'return_value'값으로 간주가 되기 때문에, 여러 번 echo를 찍었을 때는 마지막으로 찍은 echo문장이 리턴값으로 저장되는 것도 잊지 마시길 바랍니다!

 

이상으로, bash_operator를 사용해서 xcom에 push하고 데이터를 pull 하는 방법을 알아봤습니다.

 

 

 

 

반응형