컴퓨터 공부/💿 Airflow

[Airflow] Python & Bash 오퍼레이터 with Xcom

letzgorats 2023. 8. 29. 06:38

python 오퍼레이타에서 xcom을 사용하는 방법과 bash 오퍼레이터에서 xcom을 사용하는 방법을 배웠으니, 이제는 두 오퍼레이터를 혼합해서 Xcom 데이터를 구하는 방법을 알아보겠습니다.

 

1. Python → Bash 오퍼레이터 Xcom 전달

코드를 먼저 살펴보겠습니다.

@task(task_id='python_push')
def python_push_xcom():
    result_dict = {'status':'MyLove','data':['Allu','Arsenal','Myself'],'options_cnt':100}
    return result_dict
    
bash_pull = BashOperator(
    task_id = 'bash_pull',
    env = {
        'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
        'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
        'OPTIONS_CNT':'{{ti.xcom_pull(task_ids="python_push")["oprionts_cnt"]}}'
    },
    bash_command = 'echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
)
python_push_xcom() >> bash_pull

PythonOperator에서 함수의 리턴 값은 그대로 xcom의 "result_value" key값의 value로 저장됩니다.

BashOperator에서 템플릿 문법을 통해 ti.xcom_pull을 통해 task_ids를 명시해주고 원하는 키값에 해당하는 value값을 가져오는 코드입니다.

bash_command에서 STATUS에는 'MyLove'라는 값이 나오겠고, DATA에는 ['Allu','Arsenal','Myself'], OPTION_CNT에는 100이라는 값이 꺼내져올 것입니다.

 

2. Bash → Python 오퍼레이터 Xcom 전달

코드를 살펴보겠습니다.

bash_push = BashOperator(
    task_id = 'bash_push',
    bash_command = 'echo PUSH_START '
                   '{{ti.xcom_push(key="bash_pushed",value=200)}} && '
                   'echo PUSH_COMPLETE'
)

@task(task_id='python_pull')
def python_pull_xcom(**kwargs):
    ti = kwargs['ti']
    status_value = ti.xcom_pull(key='bash_pushed')
    return_value = ti.xcom_pull(task_ids='bash_push')
    print('status_value:' + str(status_value))
    print('return_value:' + return_value)

bash_push >> python_pull_xcom()

이번에는 bash operator로 xcom에 push를 했습니다. key값이 "bash_pushed"인 value에는 200이라는 숫자를 push했고, 'bash_push' 자체의 task의 "return_value"는 PUSH_COMPLETE 라는 스트링을 저장했습니다.

pull 하는 과정에서는 python operator를 사용했는데요, kwargs에 있는 'ti'객체를 가져와서 xcom_pull 함수를 통해 데이터를 꺼내왔습니다. 

'status_value'에는 200이 담기겠고, 'return value'에는 PUSH_COMPLETE가 담길 것입니다.


실습을 바로 해볼까요?

dags폴더에 dags_bash_python_with_xcom.py 파일을 생성해서 아래와 같이 코드를 작성했습니다.

from airflow import DAG
import pendulum
from airflow.decorators import task
from airflow.operators.bash import BashOperator

with DAG(
    dag_id = "dags_bash_python_with_xcom",
    schedule="30 9 * * *",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False
) as dag:
    
    @task(task_id='python_push')
    def python_push_xcom():
        result_dict = {'status':'MyLove','data':['Allu','Arsenal','Myself'],'options_cnt':100}
        return result_dict
    
    bash_pull = BashOperator(
        task_id = 'bash_pull',
        env = {
            'STATUS':'{{ti.xcom_pull(task_ids="python_push")["status"]}}',
            'DATA':'{{ti.xcom_pull(task_ids="python_push")["data"]}}',
            'OPTIONS_CNT':'{{ti.xcom_pull(task_ids="python_push")["oprionts_cnt"]}}'
        },
        bash_command = 'echo $STATUS && echo $DATA && echo $OPTIONS_CNT'
    )
    python_push_xcom() >> bash_pull  # python operator에서 push -> bash operator에서 pull

    bash_push = BashOperator(
    task_id = 'bash_push',
    bash_command = 'echo PUSH_START '
                   '{{ti.xcom_push(key="bash_pushed",value=200)}} && '
                   'echo PUSH_COMPLETE'
    )

    @task(task_id='python_pull')
    def python_pull_xcom(**kwargs):
        ti = kwargs['ti']
        status_value = ti.xcom_pull(key='bash_pushed')
        return_value = ti.xcom_pull(task_ids='bash_push')
        print('status_value:' + str(status_value))
        print('return_value:' + return_value)

    bash_push >> python_pull_xcom()  # bash operator에서 push -> python operator에서 pull

제대로 값들이 저장되고 꺼내져왔는지 airflow를 통해 확인해보도록 하겠습니다.

task 관계

[python_push → bash_pull]   와 [bash_push → python_push] 가 잘 된 것을 볼 수 있습니다.

각 task의 Xcom과 결과를 살펴보자면, 아래와 같습니다.

python_push Xcom
bash_pull 결과


bash_push Xcom
python_pull 결과

 

결과가 모두 잘 출력됐습니다.

 

이상으로 Python오퍼레이터와 Bash 오퍼레이터 간의 Xcom 값을 pull & push 하는 방법에 대해 알아봤습니다.

 

 

반응형