python operator 7

[Airflow] Python & email 오퍼레이터간 Xcom 사용

이번에는 Xcom을 이용해서 email을 전송하는 실습을 해볼까 합니다. Pyython Operator의 결과값을 이용해서 Email 을 전송하는데, 중간에 Xcom을 사용해보려고 합니다. 먼저 (Python → Email 오퍼레이터 Xcom 전달)을 하기 위해서 Email 오퍼레이터는 어떤 파라미터에 Template를 쓸 수 있는지 알아봐야 합니다. 공식문서를 참고해보면, 'to', 'subject', 'html_content', 'files' 파라미터가 템플릿 문법을 적용할 수 있는 변수들입니다. 이 중에서 실습은 'subject'와 'html_content' 의 두 가지 필드에 템플릿 문법을 사용해보겠습니다. @task(task_id='find_dog_task') def dog_find(**kwar..

[Airflow] Python & Bash 오퍼레이터 with Xcom

python 오퍼레이타에서 xcom을 사용하는 방법과 bash 오퍼레이터에서 xcom을 사용하는 방법을 배웠으니, 이제는 두 오퍼레이터를 혼합해서 Xcom 데이터를 구하는 방법을 알아보겠습니다. 1. Python → Bash 오퍼레이터 Xcom 전달 코드를 먼저 살펴보겠습니다. @task(task_id='python_push') def python_push_xcom(): result_dict = {'status':'MyLove','data':['Allu','Arsenal','Myself'],'options_cnt':100} return result_dict bash_pull = BashOperator( task_id = 'bash_pull', env = { 'STATUS':'{{ti.xcom_pull(t..

[Airflow] Python Operator with macros

BashOperator에서도 macro변수를 어떻게 쓸 수 있는지 알아본만큼, 이번 시간에는 PythonOperator에서 macro를 사용하는 방법을 알아보겠습니다. 1. 먼저 PythonOperator에서는 어떤 파라미터가 Template 변수를 지원할까요? 공식문서를 살펴보면, 'template_dict', 'op_args', 'op_kwargs' 가 템플릿을 지원합니다. 저번에는 op_kwargs 를 이용해서 template 변수를 써봤으므로, 이번에는 templates_dict 를 써서 macro변수를 써보겠습니다. Python 오퍼레이터를 사용해 macro변수를 써보는 코드를 작성해보겠습니다. @task(task_id = 'task_using_macros', templates_dict = {'..

[Airflow] Python Operator에서 Jinja 템플릿 사용하기

저번에는 Bash Operator를 Jinja 템플릿을 사용했다면, 이번에는 Python Operator에서 사용해봅시다. 1. Python 오퍼레이터에서 with Template Python 오퍼레이터는 어떤 파라미터에 Template을 쓸 수 있을까요? 공식문서를 살펴보면 아래와 같습니다. python_callable op_kwargs op_args template_dict template_exts show_return_value_in_logs op_kwargs 와 op_args, template_dict 세 개의 파라미터가 template을 쓸 수 있습니다. 바로 한 번 실습을 해보도록 하겠습니다. 이 중에서, op_kwargs를 이용해서 jinja 템플릿을 써보도록 하겠습니다. (※ 파이썬 오퍼레..

[Airflow] Python Operator에 op_kwargs로 변수 할당하기

먼저 Python 오퍼레이터의 op_kwargs 파라미터를 이해해봅시다. CASE 1) 함수에 일반 변수만 있을 경우 def register(name,gender): print(f'이름은 {name}이고 성별은 {gender}입니다') 파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다. python_task = PythonOperator( task_id = 'python_task', python_callable=register, op_kwargs={'name':'allu','gender':'male'} # 딕셔너리로 작성! ) CASE 2) 함수에 일반 변수 + **kwargs 도 있을 경우 def register(name,gender, **kwargs): print(name) print..

[Airflow] Python Operator에 op_args로 변수 할당하기

먼저 Python 오퍼레이터의 op_args 파라미터를 이해해봅시다. CASE 1) 함수에 일반 변수만 있을 경우 def register(name,gender): print(f'이름은 {name}이고 성별은 {gender}입니다') 파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다. python_task = PythonOperator( task_id = 'python_task', python_callable=register, op_args=['allu','male'] # 리스트로 작성! ) CASE 2) 함수에 일반 변수 + *args 도 있을 경우 def register(name,gender, *args): print(name) print(gender) print(args)#('kore..

[Airflow] 외부 파이썬 함수 수행하기

DAG 외부에서 함수를 만들었을 때, 그 함수를 import 해서 실행시키는 방법에 대해서 알아보겠습니다. 1) 파이썬 모듈 경로 이해하기 : dag에서 우리가 만든 외부 함수를 import 해와야 하는데, import 경로를 어떻게 작성해야 하는지 알려면, 파이썬 모듈 경로를 이해해야 합니다. 먼저 airflow의 오퍼레이터를 불러올 때는 아래와 같은 코드가 필요했습니다. from airflow.operators.python import PythonOperator : "Airflow 폴더 아래 operators 폴더 아래 python 파일 아래에서 PythonOperator 클래스를 가지고 온다"는 뜻입니다. ※ 그렇다면, 파이썬은 위 경로를 어떻게 찾을까요? : 파이썬은 sys.path 변수에서 모듈..

반응형