컴퓨터 공부/💿 Airflow

[Airflow] 외부 파이썬 함수 수행하기

letzgorats 2023. 8. 23. 09:26

DAG 외부에서 함수를 만들었을 때, 그 함수를 import 해서 실행시키는 방법에 대해서 알아보겠습니다.

 

 

1) 파이썬 모듈 경로 이해하기

    : dag에서 우리가 만든 외부 함수를 import 해와야 하는데, import 경로를 어떻게 작성해야 하는지 알려면, 파이썬 모듈 경로를 이해해야 합니다.

 

먼저 airflow의 오퍼레이터를 불러올 때는 아래와 같은 코드가 필요했습니다.

from airflow.operators.python import PythonOperator

: "Airflow 폴더 아래 operators 폴더 아래 python 파일 아래에서 PythonOperator 클래스를 가지고 온다"는 뜻입니다.

 

※ 그렇다면, 파이썬은 위 경로를 어떻게 찾을까요?

    : 파이썬은 sys.path 변수에서 모듈의 위치를 검색합니다.

여기서, ''은 실행하는 파이썬 파일과 동일디렉토리에 있는 파일입니다.

즉, 같은 디렉토리에 있는 파이썬 파일끼리는 import로 바로 불러들일수 있습니다.

 

그 밑의 5가지 정도되는 경로는 pip로 설치한 라이브러리들입니다.

즉, 파이썬 라이브러리를 설치할 때, 기본적으로 설치되는 경로들입니다.

 

즉, 외부의 함수를 dag에다가 import해오려면, 우리가 만들었던 파이썬 파일의 경로를 sys.path에 추가를 해줘야 합니다.

추가하는 방법은 크게 2가지 입니다.

  1. 명시적으로 추가 (ex: sys.path.append('/home/letzgorats')) --> sys.path의 리스트에 append 해주는 방식입니다
  2. OS 환경변수 PYTHONPATH에 값을 추가

하지만, 이 두가지 방법은 꽤 귀찮은 과정입니다. 매번 이렇게 할 수 없는 노릇이겠죠.

그래서, Airflow는 자동적으로 dags폴더와 plugins 폴더를 sys.path에 추가합니다.

(컨테이너에서 airflow info 명령을 수행해보면 됩니다.)

 

2) plugins 폴더 이용하기

모듈 불러오기

이렇게 하면, 공통함수 작성해 놓을 수 있어 dag 코드가 깔끔해집니다.

또한, 재활용성이 증가해지는 장접도 있습니다.

 

해당 개념을 가지고 실습을 진행해보겠습니다.

3) 파이썬 외부 함수 실행하기

 

먼저, plugins 폴더 안에 'common' 이라는 폴더를 만들어 주고, 그 안에 'get_sftp' 라는 함수를 작성해줍니다.

common_func 파일 작성

from airflow import DAG
import pendulum
import datetime
from airflow.operators.python import PythonOperator
from plugins.common.common_func import get_sftp

with DAG(
    dag_id = "dags_python_import_func",
    schedule="30 6 * * *",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False
) as dag:
    
    task_get_sftp = PythonOperator(
        task_id = "task_get_sftp",
        python_callable=get_sftp
    )

그런 후에, dags폴더에 'dags_python_import_func'라는 파이썬 파일을 생성하고, 위와 같이 코드를 짜면, 로컬 내 개발환경에서 에러가 나진 않지만, airflow에서는 에러가 발생할 것입니다.

 

이유는 바로 

from plugins.common.common_func import get_sftp

이곳에서 문제가 나는 것인데요, 

airflow는 기본적으로 plugins까지 path로 잡기 때문에, 해당 코드를

from common.common_func import get_sftp

와 같이 바꿔줘야 합니다. 하지만, 이렇게 바꾸면, 이번에는 로컬 내 개발환경에서 오류가 나기 때문에,

해당 문제를 해결해주기 위해서는 airflow 작업공간에 '.env'파일을 생성해주고 아래와 같이 작성해야 합니다.

WORKSPACE_FOLDER=C:/Users/hocke/vscode/airflow
PYTHONPATH=${WORKSPACE_FOLDER}/plugins

WORKSPACE_FOLDER에는 프로젝트의 최상위 홈 디렉토리를 적어주면 되고,
(pwd로 현재 작업경로가 어디인지 파악하시면 됩니다.)
PYTHONPATH에는 /plugins만 붙어주면 됩니다.

(혹시나 작성한 .env 파일을 깃헙에 올린다면, git에 올리지 않아도 되니 gitignore에 넣어줍시다.)

 

이제, 준비가 완료됐고, airflow에서도 제대로 동작하는지 확인해봅시다.

dag이 잘 올라갔고, unpaused를 한 번 시켜줌으로써 실행시키고, 로그를 확인해보면 위와 같이 get_sftp함수를 제대로 실행시키는 것을 확인할 수 있습니다.

 

이번에는 파이썬 오퍼레이터를 쓰지만, 외부에서 만든 함수를 가져와서 쓰는 실습을 진행해봤습니다. import하는 경로를 작성하기 위해서는 우리가 작성한 외부함수를 plugins 경로 안에 넣어줘야 import할 때, 인식을 할 수 있었습니다.

이상으로, 외부함수를 python operator를 통해 사용하는 방법에 대해 알아봤습니다.

반응형