오퍼레이터 6

[Airflow] BaseBranchOperator로 분기처리하기

이번 포스팅에서는 Task 분기처리하는 방법 중에 마지막 방법인 BaseBranchOperator로 분기처리하는 방법에 대해 살펴보겠습니다. 코드를 바로 살펴보겠습니다. from airflow.operators.branch import BaseBranchOperator with DAG(... ) as dag: class CustomBranchOperator(BaseBranchOperator): def choose_branch(self, context): import random item_lst = ['A','B','C'] selected_item = random.choice(item_lst) if selected_item == 'A': return 'task_a' elif selected_item in ..

[Airflow] Python Operator에 op_kwargs로 변수 할당하기

먼저 Python 오퍼레이터의 op_kwargs 파라미터를 이해해봅시다. CASE 1) 함수에 일반 변수만 있을 경우 def register(name,gender): print(f'이름은 {name}이고 성별은 {gender}입니다') 파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다. python_task = PythonOperator( task_id = 'python_task', python_callable=register, op_kwargs={'name':'allu','gender':'male'} # 딕셔너리로 작성! ) CASE 2) 함수에 일반 변수 + **kwargs 도 있을 경우 def register(name,gender, **kwargs): print(name) print..

[Airflow] Python Operator에 op_args로 변수 할당하기

먼저 Python 오퍼레이터의 op_args 파라미터를 이해해봅시다. CASE 1) 함수에 일반 변수만 있을 경우 def register(name,gender): print(f'이름은 {name}이고 성별은 {gender}입니다') 파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다. python_task = PythonOperator( task_id = 'python_task', python_callable=register, op_args=['allu','male'] # 리스트로 작성! ) CASE 2) 함수에 일반 변수 + *args 도 있을 경우 def register(name,gender, *args): print(name) print(gender) print(args)#('kore..

[Airflow] 외부 파이썬 함수 수행하기

DAG 외부에서 함수를 만들었을 때, 그 함수를 import 해서 실행시키는 방법에 대해서 알아보겠습니다. 1) 파이썬 모듈 경로 이해하기 : dag에서 우리가 만든 외부 함수를 import 해와야 하는데, import 경로를 어떻게 작성해야 하는지 알려면, 파이썬 모듈 경로를 이해해야 합니다. 먼저 airflow의 오퍼레이터를 불러올 때는 아래와 같은 코드가 필요했습니다. from airflow.operators.python import PythonOperator : "Airflow 폴더 아래 operators 폴더 아래 python 파일 아래에서 PythonOperator 클래스를 가지고 온다"는 뜻입니다. ※ 그렇다면, 파이썬은 위 경로를 어떻게 찾을까요? : 파이썬은 sys.path 변수에서 모듈..

[Airflow] Python operator 기본

파이썬 오퍼레이터는 어떤 역할을 하는지 알아봅시다. 먼저 라이브러리를 어떻게 사용하는지부터 살펴볼까요? from airflow.operators.python import PythonOperator # bash operator는 .bahs 였다면, python operator는 .python으로 라이브러리를 불러옵니다. Python Operator는 무엇을 하는 오퍼레이터일까요? : "정의된 파이썬 함수를 실행시키는 오퍼레이터" 입니다. (오퍼레이터는 새로운 파일을 생성하는 것이 아닌, 기존 파일을 실행 시켜주는 역할을 합니다.) 가장 많이 쓰이는 Operator로서, Airflow를 배운다면, 꼭 알아야 하는 오퍼레이터라고 할 수 있습니다! ※ 파이썬 모듈에는 어떤 오퍼레이터가 있을까? 패키지 오퍼레이터..

[Airflow] Dag 생성(bash operator), Task의 수행주체

Airflow에서는 workflow가 곧 DAG 인데, DAG에는 오퍼레이터와 task 라는 것이 있습니다. Operator는 특정 행위를 할 수 있는 기능을 모아 놓은 클래스, 즉 설계도라고 할 수 있고, Task는 오퍼레이터에서 객체화(인스턴스화)되어 DAG에서 실행 가능한 오브젝트라고 할 수 있습니다. 즉, DAG에서는 오퍼레이터가 직접 도는 것이 아니라, 오퍼레이터를 통해서 만들어진 task들이 실행되는 것입니다. 여기서 오퍼레이터의 종류로는 리눅스의 쉘 명령을 수행할 수 있게끔 해주는 bash 오퍼레이터가 있고, python 함수들을 실행시켜주는 python 오퍼레이터, 아마존 aws S3 솔루션을 컨트롤 할 수 있게끔 해주는 S3오퍼레이터, 구글 클라우드 GCS 를 다룰 수 있는 오퍼레이터인 ..

반응형