컴퓨터 공부/💿 Airflow

[Airflow] Bash Operator & 외부 쉘 파일 수행하기

letzgorats 2023. 8. 19. 07:39

1. 쉘 스크립트란?

  • Unix/Linux 쉘 명령을 이용하여 만들어지고, 인터프리터에 의해 한 줄씩 처리되는 파일을 말합니다.
    • (컴파일 : 속도 ↑, c, java // 인터프리터 : 속도 ↓, python, shell)
  • Echo, mkdir, cd, cp, tar, touch 등의 기본적인 쉘 명령어를 입력하여 작성하며 변수를 입력받거나 For문, if문 그리고 함수도 사용가능합니다.
  • 확장자가 없어도 동작하지만, 주로 파일명에 .sh 확장자를 붙입니다.

 

2. 왜 쉘 스크립트가 필요할까요?

  • 쉘 명령어를 이용하여 복잡한 로직을 처리하는 경우
    • (ex) "sftp를 통해 파일을 받은 후 DB에 Insert & tar.gz으로 압축해두기" 처럼 복잡한 로직도 쉘 스크립트로 만들어 놓으면 유지보수성도 좋고 관리하기 편합니다.
  • 쉘 명령어 재사용을 위해서 필요합니다
    • (ex) sftp를 통해서 받아와야 하는 서버가 100대라고 가정해본다면, 100대를 위해서 각각의 쉘 스크립트를 만들기보다 로직은 똑같고, 소스서버만 다를 뿐이니까, 쉘 스크립트는 하나만 만들고, 가능한 한 재사용하면 될 것입니다. sftp는 접속할 때, IP나 port, user계정명, pw 등이 필요한데, 이러한 정보들을 변수로 입력받게 하고, DB에 insert한 로직들은 shell 스크립트 하나로 만들어두면, 소스 서버가 100개든 200개든 쉘 스크립트 하나로 모두 처리할 수 있을 것입니다. 이렇듯 변동될 수 있는 부분은 입력하는 변수로 저장하고 나머지는 쉘 스크립트에 작성을 해두면 편합니다.

 

3. Worker 컨테이너가 쉘 스크립트를 수행하려면?

  • 문제점
    • 1) 컨테이너는 외부의 파일을 인식할 수 없습니다.
    • 2) 컨테이너 안에 파일을 만들어주면 컨테이너 재시작시 파일이 사라집니다.
  • 해결방법
    • 우리가 컨테이너에 dags파일을 인식시키기 위해 경로를 바꿔준 것처럼 plugins폴더도 바꿔줍니다.

실습을 바로 해보죠!

airflow의 plugins폴더에 아래와 같은 sh파일을 작성해봤습니다.

DOG=$1
if [ $DOG == DACHSHUND ]; then
    echo "You selected DACHSHUND!"
elif [ $DOG == GOLDENRETRIEVER ]; then
    echo "You selected GOLDEN RETRIEVER!"
elif [ $DOG == POMERANIAN ]; then
    echo "You selected POMERANIAN!"
else
    echo "You selected other Dog!"
fi

$1은 처음으로 입력하는 값을 뜻하고, 그 입력값이 어떤 강아지인지에 따라 출력해주는 간단한 분기문입니다.

쉘 파일을 실행시키면, 다음과 같이 분기에 따라 값이 출력되는 것을 확인할 수 있습니다.


그리고 해당 sh파일을 수행할 수 있는 dag파일을 아래와 같이 작성해봅시다.

from airflow import DAG
import pendulum
import datetime
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="dags_bash_select_dog",
    schedule="10 0 * * 6#1",
    start_date=pendulum.datetime(2023, 8, 1, tz="UTC"),
    catchup=False
) as dag:

    t1_dachshund = BashOperator(
        task_id="t1_dachshund",
        bash_command="/plugins/shell/select_dog.sh DACHSHUND",
    )

    t2_pomeranian = BashOperator(
        task_id="t2_pomeranian",
        bash_command="/plugins/shell/select_dog.sh POMERANIAN",
    )

    t1_dachshund >> t2_pomeranian

그리고, 이제 도커를 실행시켜 airflow를 확인해보면, 잘 올라간 것을 확인할 수 있습니다.

여기서, 다음 실행 날짜(next run)가, 2023년 8월 5일로 되어 있는데, 현재 이 dag파일을 UTC기준으로 작성한 날짜가 8월 18일입니다. 이미 다음 실행 날짜가 지난셈이죠.

 

위의 dag파일의 schedule을 확인해보면, 매월 첫째 주 토요일 0시 10분에 실행되는 파일입니다.

이전 배치일이 7월 5일이고, next run이 8월 5일이라고 생각해봅시다.

오늘 날짜가 이전 배치일과 next run 사이에 있는지, 아니면, 벗어난 구간에 있는지에 따라 dag을 unpaused 시켰을 때, 해당 dag파일이 기본적으로 한 번 도는지 안 도는지가 결정됩니다.

즉, next run 이전이라면, 기본적으로 next run에 한 번 돌게 되고, 지금과 같이 next run이 이미 지난 이후라면  그 이후라면, dag을 unpaused 시켰을 때, 기본적으로 실행되지 않습니다. 현재 catchup 변수를 보면 False로 줬기 때문에, unpaused 시켜도 돌지 않게 되는 것입니다.

따라서, 우리가 작성한 코드가 제대로 작동되는지 한 번 확인해보려면, trigger dag으로 한번 수작업으로 실행시켜줘야 합니다.

 

해당 dag을 실행시켜주고 로그를 살펴보면 아래와 같이 로그가 잘 나오는 것을 확인할 수 있습니다.

t1_dachshund
t2_pomeranian

 

이렇게 bash_operator를 사용해 쉘 스크립트를 워커 노드(컨테이너)에다가 돌리려고 한다면, plugins 폴더에 넣어야 하고, plugins폴더를 연동을 시킴으로써 워커 컨테이너가 인식하고, 해당 작업을 돌릴 수 있도록 해줬습니다.

 

 

반응형