작년 Airflow를 처음으로 실전에 적용하는 과정에서, 다양한 환경설정과 그에 따라 달라지는 데이터베이스 등 신경써야 할 부분이 많았고, 이러한 점 때문에 지식 전달은 물론 다른 동료들이 직접 사용하는데 진입장벽이 좀 생긴다는 점을 느낄 수 있었습니다. 마침 새로운 프로젝트부터는 Airflow 2.x 을 도입하기로 했고, 적용해야 되는 김에 Docker를 활용했습니다. 그리하여 이번 글에서는 Docker를 활용한 Airflow 2.x을 설치하는 과정와 방법을 정리합니다. 기존에 많이 알려진 Airflow와 Docker와 관련된 예제에는 puckel의 Airflow Docker Image가 많이 사용되었는데요, 이 이미지는 Airflow 1.x에 한정되어 있기 때문에, 이번 글에서는 Airflow의 공식 Docker 이미지를 활용해보도록 하겠습니다.

0. Docker 설치

Docker가 설치되어 있지 않은 경우 아래의 링크를 통해 Docker를 설치하도록 합니다.

맥, 윈도우의 경우 Docker 데스크탑을 설치할 시, Docker Compose가 함께 설치 됩니다. 단, 리눅스의 경우에는 별도의 설치가 필요합니다.

1. 아무것도 모르지만 일단 실행해보기

Airflow 공식 문서(Running Airflow in Docker)에서는 기본적인 Docker 및 Docker Compose를 활용한 Airflow 예제를 제공하고 있습니다. 이 파일을 토대로 일단 한번 실행 과정을 따라가보겠습니다.

1.1. Airflow 2 Docker Compose 파일

공식문서에서의 Docker Compose 파일 기본적으로 CeleryExecutor를 사용하는 환경설정이 정의되어 있지만, 이번 예제에서는 단일 머신에서 구동을 할 것이기 때문에 LocalExecutor를 사용하도록 docker-compose.yml 파일을 아래와 같이 약간만 변경하도록 하겠습니다. (기본적으로 Celery와 Redis와 관련된 설정은 제외하였습니다.)

./docker-compose.yml

version: '3'
x-airflow-common:
  &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.2}       # Docker 이미지는 Airflow 2.1.2 를 사용
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor                 # LocalExecutor를 사용
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow # PostgreSQL을 데이터베이스로 사용
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'     # 실행시 DAG가 자동으로 동작되지 않음
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'                  # 기본 예제 DAG는 불러오지 않음
    AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:			                                       # DAG, 로그, 플러그인 파일이 저장될 경로의 폴더를 볼륨으로 마운트함
    - ./dags:/opt/airflow/dags
    - ./logs:/opt/airflow/logs
    - ./plugins:/opt/airflow/plugins
    - ./data_files:/opt/airflow/data_files
  user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
  depends_on:
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080						 # 실행되는 컨테이너와 localhost의 포트를 8080으로 맞춰, 실행 중인 Webserver에 접근할 수 있도록 함
    restart: always

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    restart: always

  airflow-init:
    <<: *airflow-common
    command: version
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true' 		                              # 기본 유저 계정을 생성함
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}  # 기본 유저 Username
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}  # 기본 유저 Password

volumes:
  postgres-db-volume:

1.2. Docker Compose 파일 실행하여 Airflow 컨테이너 띄우기

  1. 우선 DAG, 로그, 플러그인, 데이터 파일을 저장할 경로를 생성합니다.

    • mkdir ./dags ./logs ./plugins ./data_files
  2. Docker 컨테이너에서 구동되는 파일과 호스트의 파일이 동일한 user / group permission을 가질 수 있도록 .env파일을 생성합니다.

    • echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
  3. Airflow를 실행하기에 앞서, 데이터베이스를 구축하고 유저를 생성하는 명령어를 실행합니다.

    • docker-compose up airflow-init
    • 다음과 같은 메세지와 함께 완료되었다면 데이터베이스와 유저 생성이 성공적으로 실행된 것입니다.
  4. 3번이 정상적으로 실행되었다면, 다음 명령어를 통해 Airflow를 실행합니다. (실행하는 시간이 걸릴 수 있습니다.)

    • docker-compose up
  5. localhost:8080 , 즉 Airflow Webserver UI로 접속해보겠습니다.

    • docker-compose.yml 에서 정의한 Username과 Password를 입력해서 로그인합니다.
    • Security -> List Users 경로에서 추가적으로 유저 계정을 생성할 수 있습니다.

1.3. DAG 작성하기

예제로 사용할 수 있는 간단한 DAG를 작성하고 로컬 경로(./dags/)에 저장해서, 현재 구동 중인 Airflow 컨테이너에서 실행해보겠습니다. 해당 DAG 파일은 Launch Library API에서 우주 발사 관련 데이터를 다운 받고, 이 파일을 변환하여 text파일로 저장하는 프로세스를 수행합니다.

( Launch Library API는 SpaceDev 사에서 제공하는 API로, 전세계의 우주 로켓 등 모든 우주 발사체의 발사 일정을 데이터로 제공하는 무료 API입니다. )

./dags/dag_rocket_launch_schedule.py

import datetime

import requests
import airflow
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def save_launch_schedule_txt():
	with open("/tmp/launches.json") as f:
		launches = json.load(f)['results']

	launches_df = pd.DataFrame.from_dict(launches, orient='records')

	cur_datetime = str(datetime.datetime.now())
	launches_df.to_csv('/opt/airflow/data_files/{}_launch_schedule.txt'.format(cur_datetime), encoding='utf-8', sep='\t')

default_args = {
	"start_date" : airflow.utils.dates.days_ago(1),
	"owner" : "airflow_admin"
}

with DAG(
	dag_id="get_upcoming_rocker_launch_schedule",
	schedule_interval=@daily,
	default_args = default_args
	) as dag:

	download_launches = BashOperator(
		task_id = "download_launches",
		bash_command = "curl -o /tmp/launches.json -L 'https://ll.thespacedevs.com/2.0.0/launch/upcoming'"
		)

	save_launch_schedule = PythonOperator(
		task_id = "update_launch_schedule",
		python_callable = save_launch_schedule_txt
		)

download_launches >> save_launch_schedule

작성한 DAG 가 실행되었고, 결과 파일이 성공적으로 저장된 것을 마운트한 로컬 머신의 디렉토리에서 확인할 수 있습니다.

2. 커스텀 이미지 만들기

가장 기본적인 예제를 순서대로 따라가며 Airflow 2.x를 Docker를 활용해서 실행해보았습니다. 그렇다면 이대로 예제를 수정해가며 DAG를 작성해서 실전에 적용시켜도 될까요? 실제 프로젝트에 적용하기에는 한계가 존재합니다. Airflow 공식 Docker 이미지는 말 그대로 reference image로, Airflow가 Docker 컨테이너에서 구동하기 위한 최소한의 요소만 갖추어져 있습니다. 즉, 대부분의 경우에는 실제 프로젝트에서 사용하는 커스텀 패키지나 의존적인 부분이 설치되어 있지 않습니다. 따라서 사용자는 Airflow 이미지를 활용하여 커스텀 이미지를 빌드하는 것이 적절한 사용 예라고 Airflow에서 안내하고 있습니다. (Airflow 공식 문서 - Building Image)

2.1. 커스텀 Airflow Docker 이미지 만들기

커스텀 Docker 이미지를 만드는 것은 그리 어렵지 않습니다.

  1. 사용하고자 하는 패키지가 정의된 requirements.txt 파일 작성합니다.

    ./requirements.txt(예시)

    Cython==0.29.23
    numpy==1.19.2
    pymssql==2.1.5
    pandas==1.1.2
    pytz==2020.1
    
  2. Docker 파일 작성을 작성합니다.

    ./Dockerfile

    FROM apache/airflow:2.1.2    # 앞서 다룬 docker-compose.yml 파일에서 사용한 동일한 Airflow 이미지를 사용합니다. 
    USER root
       
    RUN apt-get update \
      && apt-get install -y --no-install-recommends \
             build-essential libopenmpi-dev \
      && apt-get autoremove -yqq --purge \
      && apt-get clean \
      && rm -rf /var/lib/apt/lists/*
       
    USER airflow
    COPY requirements.txt requirements.txt  # 현재 경로에 작성해둔 requirements.txt 파일으로 복사합니다.
    RUN pip3 install -r requirements.txt --no-cache-dir  # 패키지를 설치합니다. 캐쉬는 저장할 필요가 없음을 파라미터로 넘깁니다.
    
  3. 커스텀 Docker 이미지를 빌드합니다.

    • --tag 로 이미지의 이름과 버전을 전달합니다.

      • docker build . --tag "my_custom_image:0.0.1"
    • Docker 이미지가 빌드된 것을 확인할 수 있습니다.

2.2. 커스텀 Airflow Docker 이미지를 통해 컨테이너 띄우기

  1. 커스텀 Docker 이미지를 활용하여 Airflow를 실행할 수 있도록 docker-compose.yml를 수정합니다.

    ./docker-compose.yml

    version: '3'
    x-airflow-common:
      &airflow-common
      image: ${AIRFLOW_IMAGE_NAME:-my_custom_image:0.0.1}   # Airflow 이미지를 커스텀 Docker 이미지로 수정합니다.
      environment:
        &airflow-common-env
        AIRFLOW__CORE__EXECUTOR: LocalExecutor
        AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
        AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    ...
    
  2. 앞서 docker-compose.yml 파일을 실행하여 컨테이너를 띄웠던 과정을 다시 실행합니다.

    docker-compose up airflow-init
    docker-compose up
    

3. 마무리하며

Airflow 공식 문서를 참고하여, Docker를 활용하여 2.x 버전을 실행시키는 과정을 정리해봤습니다. 확실히 Docker를 쓰면 앞으로의 관리나 지식 전달 등의 과정도 무척 간소화될 것 같아 적극적으로 적용해야겠다는 생각이 드네요.

Reference