컴퓨터 공부/💿 Airflow

[Airflow] Python Operator에 op_kwargs로 변수 할당하기

letzgorats 2023. 8. 25. 10:39

먼저 Python 오퍼레이터의 op_kwargs 파라미터를 이해해봅시다. 

 

CASE 1) 함수에 일반 변수만 있을 경우

def register(name,gender):
    print(f'이름은 {name}이고 성별은 {gender}입니다')

파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다.

python_task = PythonOperator(
    task_id = 'python_task',
    python_callable=register,
    op_kwargs={'name':'allu','gender':'male'} # 딕셔너리로 작성!
)

 

CASE 2) 함수에 일반 변수 + **kwargs 도 있을 경우

def register(name,gender, **kwargs):
    print(name)
    print(gender)   
    print(kwargs)	 #{'country':'korea','city':'seoul'} 로 출력됩니다.

파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다.

python_task = PythonOperator(
    task_id = 'python_task',
    python_callable=register,
    # 앞선 두 인자는 name,gender에 각각 할당되고, 뒤에 두 인자가 *args에 할당됩니다.
    op_kwargs={'name':'allu','gender':'male','country':'korea','city':'seoul'}
)

 

CASE 3) 함수에 **kwargs 변수만 있을 경우

def register(**kwargs):
    name = kwargs['name'] or ''
    gender = kwargs['gender'] or ''
    country = kwargs['country'] or ''
    city = kwargs['city'] or ''
    print(f'name은 {name}이고, 성별은 {gender}이고, 국가는 {korea}이고, 도시는 {seoul}입니다.')

파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다.

python_task = PythonOperator(
    task_id = 'python_task',
    python_callable=register,
    # 네 개의 인자 모두 **kwargs에 할당됩니다.
    op_kwargs={'name':'allu','gender':'male','country':'korea','city':'seoul'}
)

 

CASE 4) 일반 파라미터 + *args + **kwargs가 모두 있는 경우에도 변수 전달 가능!

def register(name, gender, *args, **kwargs):
    print(name)
    print(gender) 
    print(args)
    print(kwargs)

파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다.

python_task = PythonOperator(
    task_id = 'python_task_2',
    python_callable=register,
    # 앞선 두 인자는 name,gender에 각각 할당되고, 뒤에 두 인자가 *args에 할당됩니다.
    op_args = ['allu','male','korea','seoul'],
    # 모든 인자가 **kwargs에 할당됩니다.
    op_kwargs={'phone':'010','email':'hockey9322@naver.com'}
)

 

CASE 4 에 대해 실습을 바로 해보도록 하죠!

plugins폴더 아래의 common폴더에 만들었던, common_func.py 파일에 아래와 같은 함수를 만들어줬습니다.

def register2(name,gender,*args,**kwargs):
    print(f'이름:{name}')
    print(f'성별:{gender}')
    print(f'기타옵션들:{args}')
    email = kwargs['email'] or None
    phone = kwargs['phone'] or None
    if email:
        print(email)
    if phone:
        print(phone)

그리고 dags_python_with_op_kwargs.py 파일을 생성해 다음과 같이 작성해줍니다.

from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from common.common_func import register2
with DAG(
    dag_id="dags_python_with_op_kwargs",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False
) as dag:
    
    register2_t1 = PythonOperator(
        task_id = "register2_t1",
        python_callable=register2,
        op_args = ['allu','male','korea','seoul'],
        op_kwargs= {'email':'hockey9322@naver.com','phone':'010'}
    )

    register2_t1

그럼 이제, airflow에 들어가서 제대로 동작해서 원하는 값이 나왔는지 확인해봅시다.

'allu'와 'male'을 제외한 다른 인자는 args로 들어갔고 나머지 kwargs에 해당된 인자값들이 출력된 것을 확인할 수 있습니다.

 

이상으로 파이썬 오퍼레이터를 쓸 때, 우리가 실행시키려고 하는 함수에다가 인자를 넘겨주는 방법으로 op_kwargs 라는 파라미터를 이용했습니다.

op_kwargs에 넘기고 싶은 파라미터를 딕셔너리 형태로 작성을 하게 되면, 실제 함수로 넘어갈 때, 명시적으로 선언된 변수 개수와 *args가 먼저 캡쳐가 되고,  나머지 인자들이 **kwargs에 캡쳐가 되는 원리입니다.

 

이제부터 *args와 **kwargs 를 둘 다 이용해서 인자를 전달하는 방법에 대해 익숙해졌겠죠!

반응형