operator 13

[Airflow] BranchPython 오퍼레이터로 분기처리하기

이제부터는 Task를 다룰 수 있는 고급기능에 대해 알아볼건데요, 그 중에서 먼저 Task를 분기처리할 수 있는 방법들에 대해살펴보도록 하겠습니다. Task 분기 처리하는 방법에는 크게 3가지가 있는데, 이번 포스팅에서는 BranchPython Operator에 대해 배워보겠습니다. 1. Task 분기 처리 유형 먼저, Task 분기처리는 왜 필요할까요? 예를 들어, 상위 task 1개에 하위 task 3개가 있다고 가정해봅시다. 기본적으로, 위 사진처럼 task관계가 있다면, task1이 끝나고, task2-1, task2-2, task2-3 가 모두 다 같이 수행됩니다. 그런데, 가끔은 task1의 수행결과에 따라서 task2-x 중 하나만 수행하도록 구성하고 싶은 경우가 있을 것입니다. 예를 들어,..

[Airflow] Python & email 오퍼레이터간 Xcom 사용

이번에는 Xcom을 이용해서 email을 전송하는 실습을 해볼까 합니다. Pyython Operator의 결과값을 이용해서 Email 을 전송하는데, 중간에 Xcom을 사용해보려고 합니다. 먼저 (Python → Email 오퍼레이터 Xcom 전달)을 하기 위해서 Email 오퍼레이터는 어떤 파라미터에 Template를 쓸 수 있는지 알아봐야 합니다. 공식문서를 참고해보면, 'to', 'subject', 'html_content', 'files' 파라미터가 템플릿 문법을 적용할 수 있는 변수들입니다. 이 중에서 실습은 'subject'와 'html_content' 의 두 가지 필드에 템플릿 문법을 사용해보겠습니다. @task(task_id='find_dog_task') def dog_find(**kwar..

[Airflow] Python & Bash 오퍼레이터 with Xcom

python 오퍼레이타에서 xcom을 사용하는 방법과 bash 오퍼레이터에서 xcom을 사용하는 방법을 배웠으니, 이제는 두 오퍼레이터를 혼합해서 Xcom 데이터를 구하는 방법을 알아보겠습니다. 1. Python → Bash 오퍼레이터 Xcom 전달 코드를 먼저 살펴보겠습니다. @task(task_id='python_push') def python_push_xcom(): result_dict = {'status':'MyLove','data':['Allu','Arsenal','Myself'],'options_cnt':100} return result_dict bash_pull = BashOperator( task_id = 'bash_pull', env = { 'STATUS':'{{ti.xcom_pull(t..

[Airflow] Bash Operator에서 Xcom 사용

이번에는 Bash Operator에서 Xcom 사용하는 방법을 살펴보도록 하겠습니다. 1. Bash 오퍼레이터에서 Xcom 사용하기 먼저, 공식문서에서 Bash Operator에서 쓸 수 있는 템플릿 파라미터가 뭐가 있는지 알아야 합니다. 'env'와 'bash_command' 파라미터에서 템플릿 문법을 적용시킬 수 있는데, 이를 이용하여 push/pull 을 해봅시다.먼저 예시 코드를 보겠습니다. bash_push = BashOperator( task_id = 'bash_push', bash_command = "echo START && " "echo XCOM PUSHED " "{{ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && " "e..

[Airflow] Jinja 템플릿

에어플로우에서는 Jinja 템플릿을 활용하고 있기 때문에, 먼저 Jinja 템플릿에 대해 이해해보도록 하겠습니다. 1. Jinja 템플릿 문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진입니다. 템플릿 엔진은 여러 솔루션이 존재하며 그 중 Jinja 템플릿은 파이썬 언어에서 사용하는 엔진입니다. ※ Jinja 라이브러리는 airflow를 설치할 때, 이미 설치가 됩니다. 예시 코드를 살펴봅시다. from jinja2 import Template template = Template('my name is {{name}}') new_template = template.render(name='allu') print(new_template)# my name is allu {{n..

반응형