python operator 7

[Airflow] Python & email 오퍼레이터간 Xcom 사용

이번에는 Xcom을 이용해서 email을 전송하는 실습을 해볼까 합니다. Pyython Operator의 결과값을 이용해서 Email 을 전송하는데, 중간에 Xcom을 사용해보려고 합니다. 먼저 (Python → Email 오퍼레이터 Xcom 전달)을 하기 위해서 Email 오퍼레이터는 어떤 파라미터에 Template를 쓸 수 있는지 알아봐야 합니다. 공식문서를 참고해보면, 'to', 'subject', 'html_content', 'files' 파라미터가 템플릿 문법을 적용할 수 있는 변수들입니다. 이 중에서 실습은 'subject'와 'html_content' 의 두 가지 필드에 템플릿 문법을 사용해보겠습니다. @task(task_id='find_dog_task') def dog_find(**kwar..

[Airflow] Python & Bash 오퍼레이터 with Xcom

python 오퍼레이타에서 xcom을 사용하는 방법과 bash 오퍼레이터에서 xcom을 사용하는 방법을 배웠으니, 이제는 두 오퍼레이터를 혼합해서 Xcom 데이터를 구하는 방법을 알아보겠습니다. 1. Python → Bash 오퍼레이터 Xcom 전달 코드를 먼저 살펴보겠습니다. @task(task_id='python_push') def python_push_xcom(): result_dict = {'status':'MyLove','data':['Allu','Arsenal','Myself'],'options_cnt':100} return result_dict bash_pull = BashOperator( task_id = 'bash_pull', env = { 'STATUS':'{{ti.xcom_pull(t..

[Airflow] Python Operator with macros

BashOperator에서도 macro변수를 어떻게 쓸 수 있는지 알아본만큼, 이번 시간에는 PythonOperator에서 macro를 사용하는 방법을 알아보겠습니다. 1. 먼저 PythonOperator에서는 어떤 파라미터가 Template 변수를 지원할까요? 공식문서를 살펴보면, 'template_dict', 'op_args', 'op_kwargs' 가 템플릿을 지원합니다. 저번에는 op_kwargs 를 이용해서 template 변수를 써봤으므로, 이번에는 templates_dict 를 써서 macro변수를 써보겠습니다. Python 오퍼레이터를 사용해 macro변수를 써보는 코드를 작성해보겠습니다. @task(task_id = 'task_using_macros', templates_dict = {'..

[Airflow] Python Operator에서 Jinja 템플릿 사용하기

저번에는 Bash Operator를 Jinja 템플릿을 사용했다면, 이번에는 Python Operator에서 사용해봅시다. 1. Python 오퍼레이터에서 with Template Python 오퍼레이터는 어떤 파라미터에 Template을 쓸 수 있을까요? 공식문서를 살펴보면 아래와 같습니다. python_callable op_kwargs op_args template_dict template_exts show_return_value_in_logs op_kwargs 와 op_args, template_dict 세 개의 파라미터가 template을 쓸 수 있습니다. 바로 한 번 실습을 해보도록 하겠습니다. 이 중에서, op_kwargs를 이용해서 jinja 템플릿을 써보도록 하겠습니다. (※ 파이썬 오퍼레..

[Airflow] Python Operator에 op_kwargs로 변수 할당하기

먼저 Python 오퍼레이터의 op_kwargs 파라미터를 이해해봅시다. CASE 1) 함수에 일반 변수만 있을 경우 def register(name,gender): print(f'이름은 {name}이고 성별은 {gender}입니다') 파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다. python_task = PythonOperator( task_id = 'python_task', python_callable=register, op_kwargs={'name':'allu','gender':'male'} # 딕셔너리로 작성! ) CASE 2) 함수에 일반 변수 + **kwargs 도 있을 경우 def register(name,gender, **kwargs): print(name) print..

반응형