데이터 35

[Airflow] BaseBranchOperator로 분기처리하기

이번 포스팅에서는 Task 분기처리하는 방법 중에 마지막 방법인 BaseBranchOperator로 분기처리하는 방법에 대해 살펴보겠습니다. 코드를 바로 살펴보겠습니다. from airflow.operators.branch import BaseBranchOperator with DAG(... ) as dag: class CustomBranchOperator(BaseBranchOperator): def choose_branch(self, context): import random item_lst = ['A','B','C'] selected_item = random.choice(item_lst) if selected_item == 'A': return 'task_a' elif selected_item in ..

[Airflow] Task 분기처리하기 with task.branch

이번 포스팅에서는 데커레이터인 task.branch 를 이용해서 task 분기처리를 해보려고 합니다. 코드를 바로 살펴보겠습니다. from airflow.decorators import task @task.branch(task_id='python_branch_task') def select_random(): import random item_lst = ['A','B','C'] selected_item = random.choice(item_lst) if selected_item == 'A': return 'task a' elif selected_item in ['B','C']: return ['task_b','task_c'] select_random() >> [task_a,task_b,task_c] Bra..

[Airflow] BranchPython 오퍼레이터로 분기처리하기

이제부터는 Task를 다룰 수 있는 고급기능에 대해 알아볼건데요, 그 중에서 먼저 Task를 분기처리할 수 있는 방법들에 대해살펴보도록 하겠습니다. Task 분기 처리하는 방법에는 크게 3가지가 있는데, 이번 포스팅에서는 BranchPython Operator에 대해 배워보겠습니다. 1. Task 분기 처리 유형 먼저, Task 분기처리는 왜 필요할까요? 예를 들어, 상위 task 1개에 하위 task 3개가 있다고 가정해봅시다. 기본적으로, 위 사진처럼 task관계가 있다면, task1이 끝나고, task2-1, task2-2, task2-3 가 모두 다 같이 수행됩니다. 그런데, 가끔은 task1의 수행결과에 따라서 task2-x 중 하나만 수행하도록 구성하고 싶은 경우가 있을 것입니다. 예를 들어,..

[Airflow] 전역 공유변수 Variable

앞서서는 특정 dag에 있는 task끼리만 데이터를 공유할 수 있는 방법이었다면, 모든 dag에서 데이터를 접근하는 방식에 대해 알아볼까합니다. 그런 용도로 airflow에서 기능을 제공하는 것이 Variable 입니다. 1. 전역변수 Variable 이해 Xcom: 특정 DAG, 특정 schedule에 수행되는 Task 간에만 공유 모든 DAG이 공유할 수 있는 전역 변수는 없을까요? → 바로 Variable이 있습니다! ※ Variable 등록하기 : airflow 서비스를 띄우고, Admin 탭에 들어가서 Variables 메뉴를 누르고 "+" 버튼을 누르면 됩니다. : 실제 Variable의 Key, Value 값은 메타 DB에 저장됩니다. (variable 테이블) Variable도 Key, V..

[Airflow] Python & email 오퍼레이터간 Xcom 사용

이번에는 Xcom을 이용해서 email을 전송하는 실습을 해볼까 합니다. Pyython Operator의 결과값을 이용해서 Email 을 전송하는데, 중간에 Xcom을 사용해보려고 합니다. 먼저 (Python → Email 오퍼레이터 Xcom 전달)을 하기 위해서 Email 오퍼레이터는 어떤 파라미터에 Template를 쓸 수 있는지 알아봐야 합니다. 공식문서를 참고해보면, 'to', 'subject', 'html_content', 'files' 파라미터가 템플릿 문법을 적용할 수 있는 변수들입니다. 이 중에서 실습은 'subject'와 'html_content' 의 두 가지 필드에 템플릿 문법을 사용해보겠습니다. @task(task_id='find_dog_task') def dog_find(**kwar..

[Airflow] Python & Bash 오퍼레이터 with Xcom

python 오퍼레이타에서 xcom을 사용하는 방법과 bash 오퍼레이터에서 xcom을 사용하는 방법을 배웠으니, 이제는 두 오퍼레이터를 혼합해서 Xcom 데이터를 구하는 방법을 알아보겠습니다. 1. Python → Bash 오퍼레이터 Xcom 전달 코드를 먼저 살펴보겠습니다. @task(task_id='python_push') def python_push_xcom(): result_dict = {'status':'MyLove','data':['Allu','Arsenal','Myself'],'options_cnt':100} return result_dict bash_pull = BashOperator( task_id = 'bash_pull', env = { 'STATUS':'{{ti.xcom_pull(t..

[Airflow] Bash Operator에서 Xcom 사용

이번에는 Bash Operator에서 Xcom 사용하는 방법을 살펴보도록 하겠습니다. 1. Bash 오퍼레이터에서 Xcom 사용하기 먼저, 공식문서에서 Bash Operator에서 쓸 수 있는 템플릿 파라미터가 뭐가 있는지 알아야 합니다. 'env'와 'bash_command' 파라미터에서 템플릿 문법을 적용시킬 수 있는데, 이를 이용하여 push/pull 을 해봅시다.먼저 예시 코드를 보겠습니다. bash_push = BashOperator( task_id = 'bash_push', bash_command = "echo START && " "echo XCOM PUSHED " "{{ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && " "e..

[Airflow] Python Operator에서 Xcom 사용

1. Xcom이란 무엇일까요? (=Cross Communication) Airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술입니다. (ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우가 있을 수 있겠죠? 주로 작은 규모의 데이터 공유를 위해 사용합니다. (Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됩니다.) 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 연동이 필요합니다. (AWS S3, HDFS 등) 2. Python 오퍼레이터에서 Xcom 사용하기 크게 두 가지 방법으로 Xcom 사용이 가능한데요, 한 번 살펴보시죠. 1) **kwargs 에 존재하는 ti (task_instance) 객체 활용 예시 코드를 먼저 보..

[Airflow] Python Operator with macros

BashOperator에서도 macro변수를 어떻게 쓸 수 있는지 알아본만큼, 이번 시간에는 PythonOperator에서 macro를 사용하는 방법을 알아보겠습니다. 1. 먼저 PythonOperator에서는 어떤 파라미터가 Template 변수를 지원할까요? 공식문서를 살펴보면, 'template_dict', 'op_args', 'op_kwargs' 가 템플릿을 지원합니다. 저번에는 op_kwargs 를 이용해서 template 변수를 써봤으므로, 이번에는 templates_dict 를 써서 macro변수를 써보겠습니다. Python 오퍼레이터를 사용해 macro변수를 써보는 코드를 작성해보겠습니다. @task(task_id = 'task_using_macros', templates_dict = {'..

[Airflow] Bash Operator with macros

Macro 변수는 Jinja 템플릿 내에서 날짜 연산을 가능하게끔 해주는 기능으로서 파이썬의 datetime이나 dateutil 같은 라이브러리를 이용해서 날짜연산을 할 수 있도록 지원을 해주고 있습니다. 1. 먼저 이런 Macro변수가 왜 필요한지 살펴봅시다. 아래상황을 가정해봅시다. DAG 스케줄은 매일 말일에 도는 스케줄인데(ex 0 0 L * *), BETWEEN 값을 전 월 마지막일부터 어제 날짜까지 주고 싶은데 어떻게 할까요? sql = f``` SELECT NAME, ADDRESS FROM TBL_REG WHERE REG_DATE BETWEEN ?? AND ?? ''' 와 같은 sql 이 있다고 합시다. 여기서 예를 들어, 배치일이 1월 31일이면, 12월 31일부터 1월 30일까지 배치일이..

반응형