컴퓨터 공부/💿 Airflow

[Airflow] Python Operator에서 Xcom 사용

letzgorats 2023. 8. 28. 17:49

1. Xcom이란 무엇일까요? (=Cross Communication)

  • Airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술입니다.
    • (ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우가 있을 수 있겠죠?
  • 주로 작은 규모의 데이터 공유를 위해 사용합니다.
    • (Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됩니다.)
    • 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 연동이 필요합니다. (AWS S3, HDFS 등)

2. Python 오퍼레이터에서 Xcom 사용하기

크게 두 가지 방법으로 Xcom 사용이 가능한데요, 한 번 살펴보시죠.

 

1) **kwargs 에 존재하는 ti (task_instance) 객체 활용

예시 코드를 먼저 보겠습니다.

@task(task_id='python_xcom_push_task')
def xcom_push(**kwargs):
    ti = kwargs('ti')
    ti.xcom_push(key="result1", value="value_1")
    ti.xcom_push(key="result2", value=[1,2,3])

template 변수에서 task_instance 라는 객체를 얻을 수 있으며 task_instance객체가 가진 xom_push 메서드를 활용할 수 있습니다. 즉, kwargs 에서 'ti' 객체를 받아온 후, 미리 만들어진 함수인 xcom_push 메서드를 사용해서 xcom에 데이터를 올릴 수 있습니다. 이 때, 데이터를 올릴 때는 key, value값 형태로 값을 작성해서 올려주면 됩니다.

 

반대로, xcom에서 데이터를 꺼내올 수도 있는데요, 코드를 살펴보겠습니다.

@task(task_id='python_xcom_pull_task')
def xcom_pull(**kwargs):
    ti = kwargs('ti')
    value_key1 = ti.xcom_pull(key="result1")
    value_key2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task')
    
    print(value_key1)
    print(value_key2)

똑같이 'ti'라는 객체를 받아오고, xcom_pull 이라는 미리 만들어진 함수를 통해 xcom에서 데이터를 꺼내오면 됩니다.

 

여기서, value_key1은 앞서서 xcom에 데이터를 올렸던 부분인 ti.xcom_push(key="result1", value="value_1") 로 인해서 "value1"이라는 값을 얻게 될 것입니다. value_key2에는 "[1,2,3]"값이 나오겠죠?

 

그런데, 여기서 xcom으로부터 데이터를 pull 할 때,  "task_ids" 인자를 줘도 괜찮고 안 줘도 괜찮지만, 왜 굳이 적을까 싶은 의문이 들 수 있습니다. 

value_key2 = ti.xcom_pull(key="result2", task_ids='python_xcom_push_task') 의 의미는 task id값이 'python_xcom_push_task'인 task를 찾아서 그 중에 key값이 "result2"인 value를 가져오라는 의미입니다.

 

예를 한번 들어보며 task_ids를 쓰는 이유를 설명해보겠습니다.

5개의 task가 있는 dag이 있다고 가정해봅시다
1번 task : push(key='result1')
2번 task : push(key='result1')
...
4번 task : pull(key='result1') 
...
여기서, 4번 task에서 'result1'을 pull 할 때, 어떤 value가 꺼내져올까요?
task ids 를 지정해주지 않았기에, 가장 최근에 'result1'이라는 키값으로  push된 2번 task에서의 'result1'키값에 대한 value가 꺼내져오게 됩니다. ( task_ids 인자가 없다면, 해당 key값과 관련한 가장 최근에 push된 task 꺼내옵니다.)
만약, 1번 task에서의 key값이 'result1'에 대한 value값을 원한다면, 1번 task의 id값을 task_ids 인자로 주면 됩니다.

Xcom을 사용하는 다른 방법도 살펴보도록 하겠습니다.

2-1) 파이썬 함수의 return 값 활용(1안)

예시 코드를 보겠습니다.

@task(task_id='xcom_push_by_return')
def xcom_push_by_return(**kwargs):
    transaction_value = 'status Good'
    return transaction_value
    
@task(task_id='xcom_pull_by_return')
def xcom_pull_by_return(status,**kwargs):
    print(status)

xcom_pull_by_return(xcom_push_by_return())

'xcom_push_by_return' 함수의 출력값은 'status Good' 이라는 스트링입니다. 이 스트링 객체 자체를 'xcom_pull_by_return'함수의 status인자에 넣는다는 의미인데요, 여기서 눈 여겨 봐야 할 점이 2가지가 있습니다.

  • 어떤 함수의 출력값이 어떤 함수의 인자(입력값)로 전달되는 것은 프로그래밍 문법에서 어려운 문법은 아닙니다만, 이 함수는 단순한 함수가 아니라, task decorator가 적용된 함수라는 점입니다. 즉, airflow의 task 성격을 갖고 있는 객체인데, 그 객체들도 "xcom_pull_by_return(xcom_push_by_return())" 처럼 작성할 수 있다라는 것에 의미가 있습니다. 이렇게 작성하는 것만으로 airflow는 task의 실행 순서를 결정할 수 있습니다. 인자로 들어가는 함수(xcom_push_by_return)가 먼저 돌게 되고, 함수의 출력값으로 이용되는 바깥 task(xcom_pull_by_return)가 그 이후에 돌게 되는 것이죠.  즉, Task 데커레이터 사용시 함수 입력/출력 관계만으로 Task flow가 정의된다는 뜻입니다.

두 번째로는

  • Xcom 사용하는 방법을 알아보겠다고 했는데, 위 코드에서는 xcom_push 나 xcom_pull 하는 코드가 없습니다. 그 이유는 airflow는 return을 하면 자동으로 Xcom에 저장을 하게 되기 때문입니다. "xcom_pull_by_return(xcom_push_by_return())" 에서 'xcom_pull_by_return'은 'xcom_push_by_return'의 반환값을 인자로 가지고 있습니다. 즉, xcom에 올라간 리턴값을 알아서 꺼내서 찾아온다는 의미입니다. 파이썬 데커레이터를 쓰면 다 알아서 적용이 되는 셈이죠.

2-2) 파이썬 함수의 return 값 활용(2안)

다음은 return값을 활용하는 방법의 2안입니다.

@task(task_id='xcom_push_by_return')
def xcom_push_by_return(**kwargs):
    transaction_value = 'status Good'
    # return한 값은 자동으로 xcom에 key='return_value', task_ids=task_id로 저장됩니다.
    return transaction_value   
    
@task(task_id='xcom_pull_by_return')
def xcom_pull_by_return(**kwargs):
    ti = kwargs['ti']
    # return한 값을 꺼낼 때는 key를 명시하지 않아도 됩니다.(자동으로 key=return_value)를 찾습니다.
    # return한 Task가 여러개 있을 때는 task_ids를 명시
    pull_value = ti.xcom_pull(key='return_value', task_ids='xcom_push_by_return')
    print(pull value)

xcom_push_by_return() >> xcom_pull_by_return()

xcom_push를 안해줘도, return 값은 자동으로 Xcom에 저장되기 때문에, xcom_push_by_return값은 건들이지 않았습니다. 하지만, xcom_pull_by_return함수에서 ti 객체를 사용해 key, value값을 줌으로써 원하는 데이터를 가져오려고 하는데요, 이 때 task_ids 인자에 'xcom_push_by_return' 값을 주고, key값에는 'return_value'를 줬습니다. return한 값은 자동으로 'return_value'라는 key값으로 Xcom에 저장되기 때문에, 키값이 return_value인 데이터를 꺼내올 때는 반드시, task_ids 인자를 줌으로써 어떤 task decorator에서의 반환값인지를 명시해줄 필요가 있습니다!

함수 간의 관계도 함수의 출력을 함수의 입력으로 넣어주는 방식이 아닌 정석적인 ">>" 를 사용해서 task 선후행 관계를 표현합니다.


그러면, 바로 ti (task_instance) 객체를 활용하는 방법과 파이썬 return함수를 활용하는 방법 모두를 실습해보도록 하죠.

먼저, dags폴더에 dags_python_with_xcom_eg1.py를 생성해 아래와 같이 작성합니다.

from airflow import DAG
import datetime
import pendulum
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg1",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_task1')
    def xcom_push1(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key='result1',value='value_1')
        ti.xcom_push(key='result2',value=['알루','코딩','Allu','Coding'])

    @task(task_id='python_xcom_push_task2')
    def xcom_push2(**kwargs):
        ti = kwargs['ti']
        ti.xcom_push(key='result1', value='value_2')
        ti.xcom_push(key='result2', value=['정엽','letzgorats','Owen','Coding'])
    
    @task(task_id='python_xcom_pull_task')
    def xcom_pull(**kwargs):
        ti = kwargs['ti']
        value1 = ti.xcom_pull(key='result1')
        value2_of_task1 = ti.xcom_pull(key='result2', task_ids='python_xcom_push_task1')
        value2_of_task2 = ti.xcom_pull(key='result2', task_ids='python_xcom_push_task2')
        print(value1)    # value_2가 출력되어야 합니다.
        print(value2_of_task1)   # ['알루','코딩','Allu','Coding'] 가 출력되어야 합니다.
        print(value2_of_task2)   # ['정엽','letzgorats','Owen','Coding'] 가 출력되어야 합니다.

    xcom_push1 >> xcom_push2 >> xcom_pull

ti 객체를 사용해서 xcom_push와 xcom_pull 을 사용해서 원하는 값을 처리합니다.

해당 처리가 알맞게 처리됐는지 airflow를 통해 알아봅시다.

task 순서

각 task의 Xcom 탭을 확인해볼까요?

python_xcom_push_task1에서 각 key, value값이 코드에 적은 대로 잘 할당된 것을 볼 수 있습니다.

python_xcom_push_task1의 key,value

python_xcom_push_task2에서도 각 key, value값이 코드에 적은 대로 잘 할당된 것을 볼 수 있습니다.

python_xcom_push_task2의 key,value

이제는 python_xcom_pull_task의 로그를 확인해서 제대로 출력이 됐는지 확인해봅시다.

python_xcom_pull_task

코드의 pull 함수에서 아래와 같이 작성되었는데요,

더보기

 print(value1)    # value_2가 출력되어야 합니다.
 print(value2_of_task1)   # ['알루','코딩','Allu','Coding'] 가 출력되어야 합니다.
 print(value2_of_task2)   # ['정엽','letzgorats','Owen','Coding'] 가 출력되어야 합니다.

코드에서 의도한 대로, 순서대로 값이 잘 나온 것을 확인할 수 있습니다.


이번에는 dags폴더에 dags_python_with_xcom_eg2.py를 생성해 아래와 같이 작성합니다.

from airflow import DAG
import datetime
import pendulum
from airflow.decorators import task

with DAG(
    dag_id="dags_python_with_xcom_eg2",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False
) as dag:
    
    @task(task_id='python_xcom_push_by_return')
    def xcom_push_result(**kwargs):
        return "ALLU CODING"

    @task(task_id='python_xcom_pull_1')
    def xcom_pull_1(**kwargs):
        ti = kwargs['ti']
        # task_ids 인자만 주면, key값의 디폴트는 return_value 입니다.
        value1 = ti.xcom_pull(task_ids='python_xcom_push_by_return')
        print('xcom_pull 메서드로 집접 찾은 리턴 값:', value1)
    
    @task(task_id='python_xcom_pull_2')
    def xcom_pull_2(status,**kwargs):
        print('필수 입력값으로 받은 값:' + status)

    
    # xcom_push_result 의 리턴값을 xcom_pull_2 의 status인자로 줍니다.
    python_xcom_push_by_return = xcom_push_result()
    xcom_pull_2(python_xcom_push_by_return) 

    # 함수 2개를 >> 를 이용한 task 선후행관계 표현
    python_xcom_push_by_return >> xcom_pull_1()

파이썬의 return 함수를 이용한 Xcom 사용 코드입니다.  xcom_push_result 가 먼저 돌고 xcom_pull_2와 xcom_pull1이 도는 것을 의미합니다. 정말 이렇게 task가 도는지 airflow에서 그래프 확인을 한 번 해보록 하죠.

task 순서

실제로, 의도한 대로 task 관계가 그려졌습니다.

python_xcom_push_by_return 의 xcom탭을 확인해보면, xcom_push 함수를 사용하지 않았는데도, "return_value"라는 키값으로 함수의 반환 값이 value로 저장된 것을 볼 수 있습니다.

 

다음은, python_xcom_pull1 의 로그를 확인해보겠습니다.

'ALLU CODING' 이라는 push함수에서의 리턴 값을 xcom_pull 함수로 잘 가져온 것을 확인 할 수 있습니다.

이 때, task_ids 값만 인자에 넣어줬고, key 값을 주지 않았는데도 알아서 return_value를 가져온 셈입니다.

 

다음은, python_xcom_pull2 의 로그를 확인해보겠습니다.

'ALLU CODING'이라는 push함수의 반환 값을 pull2함수에서의 status인자로 그대로 전달해줬는데, pull2함수의 리턴값으로 잘 나온것을 볼 수 있습니다.

이처럼, airflow에서는 일반적인 프로그래밍에서, 함수의 출력값을 어떤 다른 함수의 입력값으로 그대로 전달하는 것처럼 task flow를 작성해도 알아서 xcom을 통해서 값을 찾아오는 것을 알 수 있습니다.


이상으로, 정리해보자면 다음과 같습니다.

Xcom push 방법 Xcom pull 방법
ti.xcom_push 명시적 사용 ti.xcom_pull 명시적 사용
함수 return 
return 값을 input으로 사용

Xcom push 하는 방법은 크게 2가지가 있습니다. ti 객체가 가지고 있는 xcom_push 함수를 사용하는 방법과 Python Operator를 쓸 때, 그냥 함수 리턴을 해주면 알아서 Xcom에 'return_value'의 키값을 가진 value값으로 저장이 됩니다.

 

Xcom pull 하는 방법에 대해서도 알아봤는데, 마찬가지로 ti 객체가 가지고 있는 xcom_pull 함수를 사용해서 데이터를 가져오는 방법과 이전 task가 어떤 값을 리턴을 했다면 그 task의 출력값을 task의 입력값으로 받아서 넣어주면 알아서 Xcom에서 데이터를 꺼내옵니다.

 

추가로, task decorator를 이용하면 마치 함수의 입력값과 함수의 출력값의 관계를 정의해주는 것처럼 task flow를 작성해도 airflow는 알아서 task들간의 관계를 그리게 됩니다.

 

오늘 포스팅 내용은 굉장히 유용하게 잘 쓰이니까 잘 기억하도록 합시다!

반응형