먼저 Python 오퍼레이터의 op_kwargs 파라미터를 이해해봅시다.
CASE 1) 함수에 일반 변수만 있을 경우
def register(name,gender):
print(f'이름은 {name}이고 성별은 {gender}입니다')
파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다.
python_task = PythonOperator(
task_id = 'python_task',
python_callable=register,
op_kwargs={'name':'allu','gender':'male'} # 딕셔너리로 작성!
)
CASE 2) 함수에 일반 변수 + **kwargs 도 있을 경우
def register(name,gender, **kwargs):
print(name)
print(gender)
print(kwargs) #{'country':'korea','city':'seoul'} 로 출력됩니다.
파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다.
python_task = PythonOperator(
task_id = 'python_task',
python_callable=register,
# 앞선 두 인자는 name,gender에 각각 할당되고, 뒤에 두 인자가 *args에 할당됩니다.
op_kwargs={'name':'allu','gender':'male','country':'korea','city':'seoul'}
)
CASE 3) 함수에 **kwargs 변수만 있을 경우
def register(**kwargs):
name = kwargs['name'] or ''
gender = kwargs['gender'] or ''
country = kwargs['country'] or ''
city = kwargs['city'] or ''
print(f'name은 {name}이고, 성별은 {gender}이고, 국가는 {korea}이고, 도시는 {seoul}입니다.')
파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다.
python_task = PythonOperator(
task_id = 'python_task',
python_callable=register,
# 네 개의 인자 모두 **kwargs에 할당됩니다.
op_kwargs={'name':'allu','gender':'male','country':'korea','city':'seoul'}
)
CASE 4) 일반 파라미터 + *args + **kwargs가 모두 있는 경우에도 변수 전달 가능!
def register(name, gender, *args, **kwargs):
print(name)
print(gender)
print(args)
print(kwargs)
파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다.
python_task = PythonOperator(
task_id = 'python_task_2',
python_callable=register,
# 앞선 두 인자는 name,gender에 각각 할당되고, 뒤에 두 인자가 *args에 할당됩니다.
op_args = ['allu','male','korea','seoul'],
# 모든 인자가 **kwargs에 할당됩니다.
op_kwargs={'phone':'010','email':'hockey9322@naver.com'}
)
CASE 4 에 대해 실습을 바로 해보도록 하죠!
plugins폴더 아래의 common폴더에 만들었던, common_func.py 파일에 아래와 같은 함수를 만들어줬습니다.
def register2(name,gender,*args,**kwargs):
print(f'이름:{name}')
print(f'성별:{gender}')
print(f'기타옵션들:{args}')
email = kwargs['email'] or None
phone = kwargs['phone'] or None
if email:
print(email)
if phone:
print(phone)
그리고 dags_python_with_op_kwargs.py 파일을 생성해 다음과 같이 작성해줍니다.
from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from common.common_func import register2
with DAG(
dag_id="dags_python_with_op_kwargs",
schedule="30 6 * * *",
start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
catchup=False
) as dag:
register2_t1 = PythonOperator(
task_id = "register2_t1",
python_callable=register2,
op_args = ['allu','male','korea','seoul'],
op_kwargs= {'email':'hockey9322@naver.com','phone':'010'}
)
register2_t1
그럼 이제, airflow에 들어가서 제대로 동작해서 원하는 값이 나왔는지 확인해봅시다.
'allu'와 'male'을 제외한 다른 인자는 args로 들어갔고 나머지 kwargs에 해당된 인자값들이 출력된 것을 확인할 수 있습니다.
이상으로 파이썬 오퍼레이터를 쓸 때, 우리가 실행시키려고 하는 함수에다가 인자를 넘겨주는 방법으로 op_kwargs 라는 파라미터를 이용했습니다.
op_kwargs에 넘기고 싶은 파라미터를 딕셔너리 형태로 작성을 하게 되면, 실제 함수로 넘어갈 때, 명시적으로 선언된 변수 개수와 *args가 먼저 캡쳐가 되고, 나머지 인자들이 **kwargs에 캡쳐가 되는 원리입니다.
이제부터 *args와 **kwargs 를 둘 다 이용해서 인자를 전달하는 방법에 대해 익숙해졌겠죠!
'컴퓨터 공부 > 💿 Airflow' 카테고리의 다른 글
[Airflow] Bash 오퍼레이터 with Template (0) | 2023.08.26 |
---|---|
[Airflow] Jinja 템플릿 (0) | 2023.08.25 |
[Airflow] Python Operator에 op_args로 변수 할당하기 (0) | 2023.08.25 |
[Airflow] 파이썬 함수 파라미터 이해 (0) | 2023.08.24 |
[Airflow] @task 데코레이터 사용하기 (2) | 2023.08.24 |