컴퓨터 공부/💿 Airflow

[Airflow] Dag 생성(bash operator), Task의 수행주체

letzgorats 2023. 8. 4. 16:02

Airflow Dag 생성

Airflow에서는 workflow가 곧 DAG 인데, DAG에는 오퍼레이터와 task 라는 것이 있습니다.

 

Operator는 특정 행위를 할 수 있는 기능을 모아 놓은 클래스, 즉 설계도라고 할 수 있고,

Task는 오퍼레이터에서 객체화(인스턴스화)되어 DAG에서 실행 가능한 오브젝트라고 할 수 있습니다.

 

즉, DAG에서는 오퍼레이터가 직접 도는 것이 아니라, 오퍼레이터를 통해서 만들어진 task들이 실행되는 것입니다.

여기서 오퍼레이터의 종류로는

  • 리눅스의 쉘 명령을 수행할 수 있게끔 해주는 bash 오퍼레이터가 있고,
  • python 함수들을 실행시켜주는 python 오퍼레이터,
  • 아마존 aws S3 솔루션을 컨트롤 할 수 있게끔 해주는 S3오퍼레이터,
  • 구글 클라우드 GCS 를 다룰 수 있는 오퍼레이터인 GCS 오퍼레이터도 있습니다.

이번 프로젝트에서는 bash 오퍼레이터를 써보도록 하겠습니다!


Task의 수행 주체에 대해서 살펴보겠습니다.

airflow 동작원리
task의 수행주체

Airflow에는 "스케쥴러"와 "워커"라는 핵심적인 요소가 있습니다.

그 중에서 뇌 역할을 하는 것이 "스케줄러"인데요, 크게 스케줄러의 역할은 3가지가 있습니다.

  • 먼저, Dag파일을 읽어들이면서 문법적인 오류는 없는지 task간의 관계는 어떻게 되는지 분석합니다. (파싱)
  • 만약에 파싱을 해서 이상이 없으면, 정보를 DB에다가 넣게 됩니다. (task에는 뭐가 있고, 주기는 어떠한지에 대한 정보를 넣는다)
  • 그 DAG이 실행시간이 왔다면, 시작시켜주는 역할을 합니다. (큐를 통해 워커에게 지시합니다)

그렇다면, "워커"는 어떤 역할을 할까요?

스케줄러가 시킨 DAG파일을 찾아서 처리하는데요,

이 때, 처리가 되기 전에 메타DB를 업데이트 하고, 처리를 한 후에도 메타DB를 업데이트합니다.

 

정리하자면 스케줄러는 DAG을 파싱한 후에, DB에 정볼르 저장한 후, DAG 시작시간을 결정하고, 워커는 실제 작업 수행 전후로 메타 DB를 업데이트 합니다.

 

임시의 댁을 만들어봅시다.

dag 파일

  • 댁의 코드에서 dag_id 부분은 보통 dag 파일명과 동일하게 하는 것이 관념입니다.
  • schedule에서의 스트링 순서는 "분 시 일 월 요일" 로 표현되며, 위 댁에서는 매일마다 0시 0분에 도는 작업으로 설정되는 것으로 이해하면 됩니다.
  • start_date는 댁이 언제부터 돌건지를 결정해주는 것이고, 보면 2021년 1월 1일부터 타임zone을 "UTC"로 맞추면 해외표준시이고, 한국 시간 기준으로 맞추려면 "Asia/Seoul"로 tz를 바꿔줘야 합니다.
  • catch_up 은 false는 누락된 부분을 돌리지 않고, true면 누락된 부분도 돌립니다. 이 때, true로 돌리면 누락된 부분을 한번에 돌리기 때문에, 시간이 오래걸리거나 오류가 날 수도 있으니 보통은 false로 설정하도록 합시다.
  • dagrun_timeout은 말그대로 타임아웃 값을 설정하는 것입니다.
  • tags는 airflow dag에 작은 태그형태로 어떤 태그를 달건지 정하는 변수입니다.
    params는 dag 선언 밑에, task들을 만들어서 둘 건데, 그 task들에 공통적으로 넘겨줘야 파라미터가 있다면, params에 작성하면 됩니다.

오퍼레이터를 통해서 만들어지는 것이 task인데, 위 예시에서는 변수 'bash_t1'이 task명이고 마찬가지로 task_id값은 graph탭에서 보여지는 task의 이름입니다.

댁 파일과 파이썬 파일명의 관계와 비슷하게, 객체명과 task id 명과의 관계는 없지만 찾기 쉽게 하기 위해, 두 이름을 같게 설정하도록 하겠습니다.

bash_command는 어떤 쉘 스크립트를 수행할 것인지에 대해 적어주면 됩니다.

 

마지막으로는 task들의 수행 순서를 '>>' 를 활용해서 적어줍니다.

 

이제 깃으로 커밋을 하는데, 이전에 깔았던 docker-compose.yaml 파일을 까봅시다.

그 중에 volumes 항목을 살펴보면, 아래와 같습니다.

volumes항목은 wsl 디렉토리와 연결해줄 컨테이너 디렉토리를 매핑해주는 요소라고 이해하면 됩니다.

" : " 을 기준으로 왼쪽이 wsl의 볼륨, 오른쪽이 도커 컨테이너의 볼륨입니다.

wsl을 켜서 한 번 어떤의미인지 확인해봅시다.

그대로 echo해보면, "AIRFLOW_PROJ_DIR"의 값이 있으면 그 값을 출력하고, 그 값이 없으면 "."을 출력하라는 의미입니다. 현재 "AIRFLOW_PROG_DIR"에는 아무런 값이 없으므로, 

${AIRFLOW_PROJ_DIR:-}/dags 는 ./dags 가 되겠고, 이를 컨테이너의 /opt/airflow/dags에 연결시키라는 의미가 됩니다.

즉, 홈디렉토리 밑에 있는 dags폴더가 컨테이너의 dags와 연결되어 있다는 뜻인데, 우리가 코드를 짠 dag들은 airflow밑에 dags 폴더로 오기 때문에, 홈 밑에 있는 dags와는 다른 폴더입니다.

 

따라서, 볼륨항목을 아래와 같이 수정합니다.

이제 도커를 실행시키고 airflow를 로컬호스트로 켜보겠습니다.

task1
task2

worker 컨테이너 노드에서 확인해보면, 작성한 dag에 있는 bash operator에 의해 수행된 작업과 같은 결과가 나옵니다. 

즉, airflow에서 task를 실제 처리하는 것은 worker라는 것을 알 수 있고, 여기서는 worker 컨테이너가 실제 task를 처리했다는 것을 알 수 있습니다.

반응형