operator 13

[Airflow] Trigger Run 오퍼레이터

이번에는 다른 DAG을 수행시킬 수 있는 'Trigger Run 오퍼레이터' 라는 것에 대해 알아보도록 하겠습니다. Airflow에서는 DAG간의 의존관계는 선-후행 관계입니다. 이를 설정할 수 있는 방법은 크게 2가지가 있는데요, 그 2개 중에 하나인, "TriggerDagRun" 오퍼레이터를 이번 포스팅에서 배워보겠습니다. 1. DAG 간 의존관계 설정 DAG 의존관계 설정 방법 (1) TriggerDagRun 오퍼레이터 오퍼레이터를 이용해서 Task를 만드는 것처럼 TriggerDagRun오퍼레이터로 Task를 만듭니다. task를 만들면서 파라미터를 줄 때, 어떤 DAG을 Trigger할 지 그 DAG의 id를 넣게 되어있습니다. 위 그림을 기준으로, task1이 선행 task고, task2, t..

[Airflow] 지원되는 오퍼레이터 보기

이번 포스팅에서는 여러가지 airflow에서의 오퍼레이터로 뭐가 있는지 다양하게 살펴보겠습니다. 1. 기본 오퍼레이터 : Airflow에서 기본적으로 제공해주는 오퍼레이터를 한 번 살펴보도록 하겠습니다. 파일 경로 오퍼레이터(클래스) 중요도 비고 airflow.models.baseoperator BaseOperator ⭐⭐⭐ - 오펄레이터를 직접 개발하고 싶은 경우, 이 클래스 상속하여 개발 (execute() 함수를 오버라이딩 하여 사용합니다.) - 아래 오퍼레이터들은 모두 이 클래스를 상속하여 개발되어 있습니다. - Airflow를 잘 쓰려면, 이 오퍼레이터 상속/개발하는 것을 자유자재로 할 줄 알아야 합니다. airflow.operators.bash BashOperator ⭐⭐⭐ - bash쉘 스크..

[Airflow] Trigger Rule

이제까지는 상위 task가 하나 있을 때, 상위 task가 하위 task로 분기하는 경우 분기하는 조건에 대해서 알아봤습니다. 이번에는 상위 task 여러개가 하나의 하위 task로 연결이 되는 구조에서 하위 task의 실행조건을 설정하는 방법(Trigger Rule)에 대해 알아보겠습니다. 1. Trigger Rule 종류 아래와 같은 task들이 있다고 해봅시다. 기본적으로는 task1, task2, task3이 모두 성공적으로 잘 끝나야 task4가 도는 구조입니다. 하지만, 이 rule을 바꾸고 싶을 경우가 있을 것입니다. 예를 들어, task1, task2, task3 중에서 어느 하나라도 정상적으로 끝나면, task4가 수행되도록 하는 rule을 만들 수도 있고, 성공여부에 상관없이 task1..

[Airflow] BaseBranchOperator로 분기처리하기

이번 포스팅에서는 Task 분기처리하는 방법 중에 마지막 방법인 BaseBranchOperator로 분기처리하는 방법에 대해 살펴보겠습니다. 코드를 바로 살펴보겠습니다. from airflow.operators.branch import BaseBranchOperator with DAG(... ) as dag: class CustomBranchOperator(BaseBranchOperator): def choose_branch(self, context): import random item_lst = ['A','B','C'] selected_item = random.choice(item_lst) if selected_item == 'A': return 'task_a' elif selected_item in ..

[Airflow] Task 분기처리하기 with task.branch

이번 포스팅에서는 데커레이터인 task.branch 를 이용해서 task 분기처리를 해보려고 합니다. 코드를 바로 살펴보겠습니다. from airflow.decorators import task @task.branch(task_id='python_branch_task') def select_random(): import random item_lst = ['A','B','C'] selected_item = random.choice(item_lst) if selected_item == 'A': return 'task a' elif selected_item in ['B','C']: return ['task_b','task_c'] select_random() >> [task_a,task_b,task_c] Bra..

[Airflow] BranchPython 오퍼레이터로 분기처리하기

이제부터는 Task를 다룰 수 있는 고급기능에 대해 알아볼건데요, 그 중에서 먼저 Task를 분기처리할 수 있는 방법들에 대해살펴보도록 하겠습니다. Task 분기 처리하는 방법에는 크게 3가지가 있는데, 이번 포스팅에서는 BranchPython Operator에 대해 배워보겠습니다. 1. Task 분기 처리 유형 먼저, Task 분기처리는 왜 필요할까요? 예를 들어, 상위 task 1개에 하위 task 3개가 있다고 가정해봅시다. 기본적으로, 위 사진처럼 task관계가 있다면, task1이 끝나고, task2-1, task2-2, task2-3 가 모두 다 같이 수행됩니다. 그런데, 가끔은 task1의 수행결과에 따라서 task2-x 중 하나만 수행하도록 구성하고 싶은 경우가 있을 것입니다. 예를 들어,..

[Airflow] Python & email 오퍼레이터간 Xcom 사용

이번에는 Xcom을 이용해서 email을 전송하는 실습을 해볼까 합니다. Pyython Operator의 결과값을 이용해서 Email 을 전송하는데, 중간에 Xcom을 사용해보려고 합니다. 먼저 (Python → Email 오퍼레이터 Xcom 전달)을 하기 위해서 Email 오퍼레이터는 어떤 파라미터에 Template를 쓸 수 있는지 알아봐야 합니다. 공식문서를 참고해보면, 'to', 'subject', 'html_content', 'files' 파라미터가 템플릿 문법을 적용할 수 있는 변수들입니다. 이 중에서 실습은 'subject'와 'html_content' 의 두 가지 필드에 템플릿 문법을 사용해보겠습니다. @task(task_id='find_dog_task') def dog_find(**kwar..

[Airflow] Python & Bash 오퍼레이터 with Xcom

python 오퍼레이타에서 xcom을 사용하는 방법과 bash 오퍼레이터에서 xcom을 사용하는 방법을 배웠으니, 이제는 두 오퍼레이터를 혼합해서 Xcom 데이터를 구하는 방법을 알아보겠습니다. 1. Python → Bash 오퍼레이터 Xcom 전달 코드를 먼저 살펴보겠습니다. @task(task_id='python_push') def python_push_xcom(): result_dict = {'status':'MyLove','data':['Allu','Arsenal','Myself'],'options_cnt':100} return result_dict bash_pull = BashOperator( task_id = 'bash_pull', env = { 'STATUS':'{{ti.xcom_pull(t..

[Airflow] Bash Operator에서 Xcom 사용

이번에는 Bash Operator에서 Xcom 사용하는 방법을 살펴보도록 하겠습니다. 1. Bash 오퍼레이터에서 Xcom 사용하기 먼저, 공식문서에서 Bash Operator에서 쓸 수 있는 템플릿 파라미터가 뭐가 있는지 알아야 합니다. 'env'와 'bash_command' 파라미터에서 템플릿 문법을 적용시킬 수 있는데, 이를 이용하여 push/pull 을 해봅시다.먼저 예시 코드를 보겠습니다. bash_push = BashOperator( task_id = 'bash_push', bash_command = "echo START && " "echo XCOM PUSHED " "{{ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && " "e..

[Airflow] Jinja 템플릿

에어플로우에서는 Jinja 템플릿을 활용하고 있기 때문에, 먼저 Jinja 템플릿에 대해 이해해보도록 하겠습니다. 1. Jinja 템플릿 문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진입니다. 템플릿 엔진은 여러 솔루션이 존재하며 그 중 Jinja 템플릿은 파이썬 언어에서 사용하는 엔진입니다. ※ Jinja 라이브러리는 airflow를 설치할 때, 이미 설치가 됩니다. 예시 코드를 살펴봅시다. from jinja2 import Template template = Template('my name is {{name}}') new_template = template.render(name='allu') print(new_template)# my name is allu {{n..

반응형