데이터 44

[Airflow] Task 분기처리하기 with task.branch

이번 포스팅에서는 데커레이터인 task.branch 를 이용해서 task 분기처리를 해보려고 합니다. 코드를 바로 살펴보겠습니다. from airflow.decorators import task @task.branch(task_id='python_branch_task') def select_random(): import random item_lst = ['A','B','C'] selected_item = random.choice(item_lst) if selected_item == 'A': return 'task a' elif selected_item in ['B','C']: return ['task_b','task_c'] select_random() >> [task_a,task_b,task_c] Bra..

[Airflow] BranchPython 오퍼레이터로 분기처리하기

이제부터는 Task를 다룰 수 있는 고급기능에 대해 알아볼건데요, 그 중에서 먼저 Task를 분기처리할 수 있는 방법들에 대해살펴보도록 하겠습니다. Task 분기 처리하는 방법에는 크게 3가지가 있는데, 이번 포스팅에서는 BranchPython Operator에 대해 배워보겠습니다. 1. Task 분기 처리 유형 먼저, Task 분기처리는 왜 필요할까요? 예를 들어, 상위 task 1개에 하위 task 3개가 있다고 가정해봅시다. 기본적으로, 위 사진처럼 task관계가 있다면, task1이 끝나고, task2-1, task2-2, task2-3 가 모두 다 같이 수행됩니다. 그런데, 가끔은 task1의 수행결과에 따라서 task2-x 중 하나만 수행하도록 구성하고 싶은 경우가 있을 것입니다. 예를 들어,..

[Airflow] 전역 공유변수 Variable

앞서서는 특정 dag에 있는 task끼리만 데이터를 공유할 수 있는 방법이었다면, 모든 dag에서 데이터를 접근하는 방식에 대해 알아볼까합니다. 그런 용도로 airflow에서 기능을 제공하는 것이 Variable 입니다. 1. 전역변수 Variable 이해 Xcom: 특정 DAG, 특정 schedule에 수행되는 Task 간에만 공유 모든 DAG이 공유할 수 있는 전역 변수는 없을까요? → 바로 Variable이 있습니다! ※ Variable 등록하기 : airflow 서비스를 띄우고, Admin 탭에 들어가서 Variables 메뉴를 누르고 "+" 버튼을 누르면 됩니다. : 실제 Variable의 Key, Value 값은 메타 DB에 저장됩니다. (variable 테이블) Variable도 Key, V..

[Airflow] Python & email 오퍼레이터간 Xcom 사용

이번에는 Xcom을 이용해서 email을 전송하는 실습을 해볼까 합니다. Pyython Operator의 결과값을 이용해서 Email 을 전송하는데, 중간에 Xcom을 사용해보려고 합니다. 먼저 (Python → Email 오퍼레이터 Xcom 전달)을 하기 위해서 Email 오퍼레이터는 어떤 파라미터에 Template를 쓸 수 있는지 알아봐야 합니다. 공식문서를 참고해보면, 'to', 'subject', 'html_content', 'files' 파라미터가 템플릿 문법을 적용할 수 있는 변수들입니다. 이 중에서 실습은 'subject'와 'html_content' 의 두 가지 필드에 템플릿 문법을 사용해보겠습니다. @task(task_id='find_dog_task') def dog_find(**kwar..

[Airflow] Python & Bash 오퍼레이터 with Xcom

python 오퍼레이타에서 xcom을 사용하는 방법과 bash 오퍼레이터에서 xcom을 사용하는 방법을 배웠으니, 이제는 두 오퍼레이터를 혼합해서 Xcom 데이터를 구하는 방법을 알아보겠습니다. 1. Python → Bash 오퍼레이터 Xcom 전달 코드를 먼저 살펴보겠습니다. @task(task_id='python_push') def python_push_xcom(): result_dict = {'status':'MyLove','data':['Allu','Arsenal','Myself'],'options_cnt':100} return result_dict bash_pull = BashOperator( task_id = 'bash_pull', env = { 'STATUS':'{{ti.xcom_pull(t..

반응형