컴퓨터 공부/💿 Airflow 33

[Airflow] Python & email 오퍼레이터간 Xcom 사용

이번에는 Xcom을 이용해서 email을 전송하는 실습을 해볼까 합니다. Pyython Operator의 결과값을 이용해서 Email 을 전송하는데, 중간에 Xcom을 사용해보려고 합니다. 먼저 (Python → Email 오퍼레이터 Xcom 전달)을 하기 위해서 Email 오퍼레이터는 어떤 파라미터에 Template를 쓸 수 있는지 알아봐야 합니다. 공식문서를 참고해보면, 'to', 'subject', 'html_content', 'files' 파라미터가 템플릿 문법을 적용할 수 있는 변수들입니다. 이 중에서 실습은 'subject'와 'html_content' 의 두 가지 필드에 템플릿 문법을 사용해보겠습니다. @task(task_id='find_dog_task') def dog_find(**kwar..

[Airflow] Python & Bash 오퍼레이터 with Xcom

python 오퍼레이타에서 xcom을 사용하는 방법과 bash 오퍼레이터에서 xcom을 사용하는 방법을 배웠으니, 이제는 두 오퍼레이터를 혼합해서 Xcom 데이터를 구하는 방법을 알아보겠습니다. 1. Python → Bash 오퍼레이터 Xcom 전달 코드를 먼저 살펴보겠습니다. @task(task_id='python_push') def python_push_xcom(): result_dict = {'status':'MyLove','data':['Allu','Arsenal','Myself'],'options_cnt':100} return result_dict bash_pull = BashOperator( task_id = 'bash_pull', env = { 'STATUS':'{{ti.xcom_pull(t..

[Airflow] Bash Operator에서 Xcom 사용

이번에는 Bash Operator에서 Xcom 사용하는 방법을 살펴보도록 하겠습니다. 1. Bash 오퍼레이터에서 Xcom 사용하기 먼저, 공식문서에서 Bash Operator에서 쓸 수 있는 템플릿 파라미터가 뭐가 있는지 알아야 합니다. 'env'와 'bash_command' 파라미터에서 템플릿 문법을 적용시킬 수 있는데, 이를 이용하여 push/pull 을 해봅시다.먼저 예시 코드를 보겠습니다. bash_push = BashOperator( task_id = 'bash_push', bash_command = "echo START && " "echo XCOM PUSHED " "{{ti.xcom_push(key='bash_pushed',value='first_bash_message') }} && " "e..

[Airflow] Python Operator에서 Xcom 사용

1. Xcom이란 무엇일까요? (=Cross Communication) Airflow DAG 안 Task 간 데이터 공유를 위해 사용되는 기술입니다. (ex) Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우가 있을 수 있겠죠? 주로 작은 규모의 데이터 공유를 위해 사용합니다. (Xcom 내용은 메타 DB의 xcom 테이블에 값이 저장됩니다.) 1GB 이상의 대용량 데이터 공유를 위해서는 외부 솔루션 연동이 필요합니다. (AWS S3, HDFS 등) 2. Python 오퍼레이터에서 Xcom 사용하기 크게 두 가지 방법으로 Xcom 사용이 가능한데요, 한 번 살펴보시죠. 1) **kwargs 에 존재하는 ti (task_instance) 객체 활용 예시 코드를 먼저 보..

[Airflow] Python Operator with macros

BashOperator에서도 macro변수를 어떻게 쓸 수 있는지 알아본만큼, 이번 시간에는 PythonOperator에서 macro를 사용하는 방법을 알아보겠습니다. 1. 먼저 PythonOperator에서는 어떤 파라미터가 Template 변수를 지원할까요? 공식문서를 살펴보면, 'template_dict', 'op_args', 'op_kwargs' 가 템플릿을 지원합니다. 저번에는 op_kwargs 를 이용해서 template 변수를 써봤으므로, 이번에는 templates_dict 를 써서 macro변수를 써보겠습니다. Python 오퍼레이터를 사용해 macro변수를 써보는 코드를 작성해보겠습니다. @task(task_id = 'task_using_macros', templates_dict = {'..

반응형