컴퓨터 공부 217

[Airflow] 지원되는 오퍼레이터 보기

이번 포스팅에서는 여러가지 airflow에서의 오퍼레이터로 뭐가 있는지 다양하게 살펴보겠습니다. 1. 기본 오퍼레이터 : Airflow에서 기본적으로 제공해주는 오퍼레이터를 한 번 살펴보도록 하겠습니다. 파일 경로 오퍼레이터(클래스) 중요도 비고 airflow.models.baseoperator BaseOperator ⭐⭐⭐ - 오펄레이터를 직접 개발하고 싶은 경우, 이 클래스 상속하여 개발 (execute() 함수를 오버라이딩 하여 사용합니다.) - 아래 오퍼레이터들은 모두 이 클래스를 상속하여 개발되어 있습니다. - Airflow를 잘 쓰려면, 이 오퍼레이터 상속/개발하는 것을 자유자재로 할 줄 알아야 합니다. airflow.operators.bash BashOperator ⭐⭐⭐ - bash쉘 스크..

[Airflow] Edge Label

오늘은 간단한 개념인 Edge Label을 짚고 넘어가볼까 합니다. 1. Edge Label 개념이란 무엇일까요? : Task 연결에 대한 설명(Comment)입니다. 굉장히 간단합니다. 말 그대로, Task 간의 edge에 있는 comment를 뜻합니다. 바로 실습을 해보도록 하죠. 2. Edge Label 실습 #1 from airflow.utils.edgemodifier import Label empty_1 = EmptyOperator( task_id='empty_1' ) empty_2 = EmptyOperator( task_id='empty_2' ) empty_1 >> Label('1과 2사이') >> empty_2 먼저, 라벨을 달려면, airflow.utils.edgemodifier에서 Lab..

[Airflow] Task Group

이번 포스팅에서는 Task들을 모아서 관리할 수 있는 Task Group에 대해 알아보겠습니다. 하나의 dag에 task가 많다면, 관련있는 task끼리 그룹화하여 관리하도록 지원해주는 기능입니다. 1. Task Group 의 개념 Task들의 모음입니다. UI Graph 탭에서 Task들을 Group화하여 보여줍니다.(https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#taskgroups) DAGs — Airflow Documentation airflow.apache.org 각 task가 그룹화된 섹션을 열어보면, inner_section이 또 있습니다. 즉, task 그룹 안에 또 다른 task 그룹도 계층적으로 ..

[Airflow] Trigger Rule

이제까지는 상위 task가 하나 있을 때, 상위 task가 하위 task로 분기하는 경우 분기하는 조건에 대해서 알아봤습니다. 이번에는 상위 task 여러개가 하나의 하위 task로 연결이 되는 구조에서 하위 task의 실행조건을 설정하는 방법(Trigger Rule)에 대해 알아보겠습니다. 1. Trigger Rule 종류 아래와 같은 task들이 있다고 해봅시다. 기본적으로는 task1, task2, task3이 모두 성공적으로 잘 끝나야 task4가 도는 구조입니다. 하지만, 이 rule을 바꾸고 싶을 경우가 있을 것입니다. 예를 들어, task1, task2, task3 중에서 어느 하나라도 정상적으로 끝나면, task4가 수행되도록 하는 rule을 만들 수도 있고, 성공여부에 상관없이 task1..

[Airflow] BaseBranchOperator로 분기처리하기

이번 포스팅에서는 Task 분기처리하는 방법 중에 마지막 방법인 BaseBranchOperator로 분기처리하는 방법에 대해 살펴보겠습니다. 코드를 바로 살펴보겠습니다. from airflow.operators.branch import BaseBranchOperator with DAG(... ) as dag: class CustomBranchOperator(BaseBranchOperator): def choose_branch(self, context): import random item_lst = ['A','B','C'] selected_item = random.choice(item_lst) if selected_item == 'A': return 'task_a' elif selected_item in ..

반응형