data 8

[Airflow] Python Operator에서 Jinja 템플릿 사용하기

저번에는 Bash Operator를 Jinja 템플릿을 사용했다면, 이번에는 Python Operator에서 사용해봅시다. 1. Python 오퍼레이터에서 with Template Python 오퍼레이터는 어떤 파라미터에 Template을 쓸 수 있을까요? 공식문서를 살펴보면 아래와 같습니다. python_callable op_kwargs op_args template_dict template_exts show_return_value_in_logs op_kwargs 와 op_args, template_dict 세 개의 파라미터가 template을 쓸 수 있습니다. 바로 한 번 실습을 해보도록 하겠습니다. 이 중에서, op_kwargs를 이용해서 jinja 템플릿을 써보도록 하겠습니다. (※ 파이썬 오퍼레..

[Airflow] Airflow의 날짜 개념

이전 포스팅에서 bash operator를 이용해 템플릿 변수를 출력해봤는데, 치환된 값이 어떤 값인지 이해하기 위해서 먼저 Airflow에서의 날짜개념을 짚고 넘어가야 할 필요가 있습니다. 1. Airflow 날짜 Template 변수 이해 먼저, 데이터 추출 예시를 살펴봐봅시다. (ex) 등록 테이블 REG_DATAE NAME ADDRESS 2023-02-24 15:34:35 홍길동 Busan 2023-02-24 19:14:42 김태희 Seoul 2023-02-24 23:52:19 조인성 Daejeon Daily ETL 처리를 위한 조회 커리 (2023/02/25 0시 실행) 라고 가정합시다. (그렇게 되면, 24일에서 25일 사이의 데이터를 가져오는 셈이겠죠?) 쿼리를 작성해보면 아래와 같습니다. ..

[Airflow] Jinja 템플릿

에어플로우에서는 Jinja 템플릿을 활용하고 있기 때문에, 먼저 Jinja 템플릿에 대해 이해해보도록 하겠습니다. 1. Jinja 템플릿 문서(파일)에서 특정 양식으로 작성된 값을 런타임시 실제 값으로 치환해주는 처리 엔진입니다. 템플릿 엔진은 여러 솔루션이 존재하며 그 중 Jinja 템플릿은 파이썬 언어에서 사용하는 엔진입니다. ※ Jinja 라이브러리는 airflow를 설치할 때, 이미 설치가 됩니다. 예시 코드를 살펴봅시다. from jinja2 import Template template = Template('my name is {{name}}') new_template = template.render(name='allu') print(new_template)# my name is allu {{n..

[Airflow] Python Operator에 op_kwargs로 변수 할당하기

먼저 Python 오퍼레이터의 op_kwargs 파라미터를 이해해봅시다. CASE 1) 함수에 일반 변수만 있을 경우 def register(name,gender): print(f'이름은 {name}이고 성별은 {gender}입니다') 파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다. python_task = PythonOperator( task_id = 'python_task', python_callable=register, op_kwargs={'name':'allu','gender':'male'} # 딕셔너리로 작성! ) CASE 2) 함수에 일반 변수 + **kwargs 도 있을 경우 def register(name,gender, **kwargs): print(name) print..

[Airflow] Python Operator에 op_args로 변수 할당하기

먼저 Python 오퍼레이터의 op_args 파라미터를 이해해봅시다. CASE 1) 함수에 일반 변수만 있을 경우 def register(name,gender): print(f'이름은 {name}이고 성별은 {gender}입니다') 파이썬 오퍼레이터로 작성해본다면, 아래와 같이 작성할 수 있습니다. python_task = PythonOperator( task_id = 'python_task', python_callable=register, op_args=['allu','male'] # 리스트로 작성! ) CASE 2) 함수에 일반 변수 + *args 도 있을 경우 def register(name,gender, *args): print(name) print(gender) print(args)#('kore..

[Airflow] Docker를 통한 Airflow 설치

도커를 이용해서 airflow를 설치해보기 전에, 도커에 대해 간략하게 살펴보고 갑시다. 도커가 있기 전에는 가상화서버(Virtual Machine)라는 것을 많이 이용했습니다. 가상화 서버에는 'Hyper Visior' 라는 것이 있는데, Hyper Visor는 물리적인 서버 위에 설치되어 있는 OS가 아닌 그 OS위에 가상화 VM들을 올리고 관리할 수 있도록 해줍니다. 가상화 VM들을 여러개 세팅하고 나면, 이 가상화 VM들은 서로간에 영향을 주지않고 완전히 독립적인 환경에서 구동될 수 있는 장점이 있었습니다. 하지만, 단점이 분명 존재하는데, 바로 '오버헤드'입니다. CPU나 메모리, 디스크 같은 공간을 가상화 서버에다가 명시적으로 할당을 해줘야하기 때문에, 그만큼 호스트 OS가 사용할 수 있는 가..

[Airflow] 에어플로우가 뭘까?

Airflow는 파이썬으로 제작된 도구이자 워크플로우 생성시에도, 파이썬으로 구현해야 합니다 하나의 워크플로우는 DAG이라고 하며, DAG은 Directed Acyclic Graph 의 약자로, 방향성을 가진 그래프지만, 순환하지 않는 그래프를 말합니다. ( 보통 '댁'이라고 말합니다) DAG 안에는 1개 이상의 TASK 가 존재하며, TASK간 선후행 연결은 가능하지만, 순환되지는 않습니다. 보통 워크플로우(dag)는 다음과 같은 단계(task)를 거칩니다. 1. Rest API를 사용한 데이터를 받아서 전처리 2. 데이터를 DB에 넣기 전에 중복을 제거하기 위한 처리 3. 전처리한 데이터를 DB에 삽입 Cron 기반의 스케쥴링을 사용합니다. (리눅스에서 사용하는 기법으로, task들이 실행되어야 하는..

반응형