MLOPS 및 ETL을 위한 Apache Airflow - 설명, 장점 및 예시
파이썬을 사용한 ETS/MLOPS에 적합한 프레임워크
Apache Airflow은 프로그래밍적으로 워크플로우를 작성, 예약 및 모니터링할 수 있는 오픈소스 플랫폼으로, 완전히 파이썬 코드로 작성되어 전통적인, 수동적, 또는 UI 기반 워크플로우 도구보다 유연하고 강력한 대안을 제공합니다.
Apache Airflow는 2014년에 Airbnb에서 개발되어 점점 복잡해지는 워크플로우를 관리하기 위해 사용되었고, 2019년에는 Apache Software Foundation의 최상위 프로젝트가 되었습니다.
Airflow는 여러 작업이 특정 순서로 실행되어야 하며, 의존성과 오류 처리가 필요한 복잡한 워크플로우에 특히 유리합니다. 많은 데이터 작업을 수행하는 조직에서 오케스트레이션, 모니터링, 재시도 메커니즘을 필요로 하는 경우, 단순한 스크립트나 cron 작업보다 Airflow가 유용합니다.
Apache Airflow 주요 기능 및 일반적인 사용 사례
- 파이썬 기반 워크플로우 정의: 워크플로우는 파이썬 코드로 정의되며, 루프와 조건문과 같은 표준 프로그래밍 구조를 사용하여 동적인 파이프라인 생성이 가능합니다.
- 방향성 비순환 그래프 (DAGs): Airflow는 DAGs를 사용하여 워크플로우를 표현하며, 각 노드는 작업이고, 엣지는 의존성을 정의합니다. 이 구조는 순환 없이 지정된 순서로 작업이 실행되도록 보장합니다.
- 강력한 UI: Airflow는 워크플로우의 모니터링, 예약 및 관리를 위한 현대적인 웹 인터페이스를 제공하며, 작업 상태와 로그에 대한 가시성을 제공합니다.
- 광범위한 통합: 클라우드 서비스(AWS, GCP, Azure), 데이터베이스 및 기타 도구와의 통합을 위한 많은 내장 오퍼레이터와 훅을 포함하고 있어 매우 확장 가능합니다.
- 확장성과 유연성: Airflow는 온프레미스와 클라우드 배포 모두를 지원하며, ETL부터 머신러닝까지 다양한 사용 사례에 적합합니다.
일반적인 사용 사례
- ETL (Extract, Transform, Load) 파이프라인의 오케스트레이션
- 데이터 워크플로우의 예약 및 모니터링
- 머신러닝 모델의 자동화된 학습 및 배포
- 인프라 작업 관리
관리형 Airflow 서비스
여러 클라우드 제공업체는 Airflow의 설정 및 유지보수에 대한 운영 부담을 줄이기 위해 관리형 Airflow 서비스를 제공합니다:
- Amazon Managed Workflows for Apache Airflow (MWAA): 스케일링, 보안 및 가용성을 완전히 관리하여 사용자가 워크플로우 개발에 집중할 수 있도록 합니다.
- Google Cloud Composer: Google Cloud Platform에서 제공하는 관리형 Airflow입니다.
- Microsoft Fabric Data Factory: Azure 생태계 내에서 Apache Airflow 작업을 제공하는 관리형 오케스트레이션 솔루션입니다.
설치 및 시작
Airflow는 파이썬의 패키지 관리자(pip install apache-airflow
) 또는 Astro CLI와 같은 관리 도구를 통해 설치할 수 있으며, 이는 설정 및 프로젝트 관리를 간소화합니다. 설치 후 사용자는 파이썬 스크립트에서 DAG를 정의하고 Airflow UI를 통해 이를 관리합니다.
요약
기능 | 설명 |
---|---|
워크플로우 정의 | 파이썬 코드, DAG 기반 |
UI | 웹 기반, 현대적, 강력 |
통합 | 클라우드 (AWS, GCP, Azure), 데이터베이스, 커스텀 플러그인 |
관리 서비스 | AWS MWAA, Google Cloud Composer, Microsoft Fabric |
사용 사례 | ETL, 데이터 파이프라인, ML 워크플로우, 인프라 작업 |
Airflow는 유연성, 확장성, 강력한 생태계 덕분에 데이터 엔지니어링 커뮤니티에서 널리 사용되며, 프로덕션 환경에서 워크플로우 오케스트레이션의 주요 선택지가 되고 있습니다.
Airflow가 워크플로우 자동화를 간소화하는 주요 방법
-
코드로 작성된 워크플로우 (파이썬)
Airflow 워크플로우는 파이썬 스크립트로 정의되며, Directed Acyclic Graphs (DAGs)를 사용하여 작업과 그 의존성을 표현합니다. 이 “코드로 작성된 워크플로우” 접근법은 파이프라인의 동적 생성, 매개변수화 및 쉬운 버전 관리를 가능하게 하여, 요구사항 변경에 따라 매우 유지보수가 쉽고 유연하게 대응할 수 있습니다. -
동적이고 확장 가능한 파이프라인
파이썬의 전반적인 기능을 활용하여 사용자는 루프, 조건문, 커스텀 논리를 워크플로우 정의에 직접 포함할 수 있습니다. 이는 정적 구성 파일이나 GUI 기반 도구에서 어려거나 불가능한 동적 작업 생성 및 고급 매개변수화를 가능하게 합니다. -
파이썬식 작업 관리
Airflow 2.0에서 도입된 TaskFlow API는 작업 정의를 더욱 파이썬식으로 만들었습니다. 개발자는 일반 파이썬 함수를 작성하고 이를 장식하여, Airflow가 자동으로 작업 생성, 의존성 연결, 작업 간 데이터 전달을 처리하여, 더 깔끔하고 유지보수가 쉬운 코드를 생성할 수 있습니다. -
커스텀 오퍼레이터 및 통합
사용자는 거의 모든 외부 시스템, API 또는 데이터베이스와 상호작용하기 위해 커스텀 파이썬 오퍼레이터, 센서 및 훅을 생성할 수 있습니다. 이 확장성은 파이썬 생태계와 외부 서비스와의 원활한 통합을 가능하게 합니다. -
파이썬 생태계와의 원ative 통합
워크플로우가 파이썬으로 작성되기 때문에, 사용자는 Pandas, NumPy 또는 머신러닝 프레임워크와 같은 파이썬 라이브러리를 작업 내에서 활용할 수 있어, 자동화 기능을 더욱 강화할 수 있습니다. -
가독성과 유지보수성
파이썬의 가독성과 인기 덕분에 Airflow는 데이터 엔지니어부터 분석가에 이르는 다양한 사용자에게 접근 가능합니다. 코드 기반 접근법은 또한 코드 리뷰 및 버전 관리와 같은 표준 소프트웨어 엔지니어링 실천을 지원합니다.
Apache Airflow의 일부 주요 이점
기능 | 워크플로우 자동화에 대한 이점 |
---|---|
파이썬 기반 DAGs | 동적이고 유연하며 유지보수가 쉬운 워크플로우 |
TaskFlow API | 더 깔끔하고 파이썬식인 워크플로우 정의 |
커스텀 오퍼레이터/센서 | 모든 시스템 또는 API와 통합 가능 |
파이썬과의 원ative 통합 | 모든 파이썬 라이브러리 또는 도구 사용 가능 |
강력한 UI | 워크플로우를 시각적으로 모니터링 및 관리 가능 |
Airflow는 파이썬과의 깊은 통합을 통해 워크플로우 자동화를 단순화하는 동시에, 팀이 효율적으로 강력하고 확장성 있는 데이터 파이프라인을 구축할 수 있도록 지원합니다.
예시: 파이썬으로 작성된 간단한 ETL 워크플로우
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
print("데이터 추출 중")
def transform_data():
print("데이터 변환 중")
def load_data():
print("데이터 로드 중")
default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)
extract_task >> transform_task >> load_task
이 예시는 ETL 파이프라인의 각 단계가 파이썬 함수로 정의되고, Airflow가 Python 오퍼레이터와 DAG를 사용하여 워크플로우를 오케스트레이션하는 방법을 보여줍니다:
extract_task >> transform_task >> load_task
Apache Airflow DAG의 또 다른 예시
Apache Airflow에서 파이썬을 사용하여 DAG (Directed Acyclic Graph)를 코딩하는 기본 예시입니다. 이 예시는 BashOperator를 사용하여 두 개의 간단한 작업으로 구성된 워크플로우를 어떻게 정의하는지 보여줍니다. BashOperator는 bash 명령을 실행합니다:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='my_first_dag',
default_args=default_args,
description='My first Airflow DAG!',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
# 작업 1: 현재 날짜 출력
print_date = BashOperator(
task_id='print_date',
bash_command='date'
)
# 작업 2: 안녕 인사
say_hello = BashOperator(
task_id='say_hello',
bash_command='echo "Hello, Airflow!"'
)
# 작업 의존성 정의
print_date >> say_hello
이것은 어떻게 작동하는가:
- DAG은 컨텍스트 관리자(
with DAG(...) as dag:
)를 사용하여 정의됩니다. - 두 작업이 생성됩니다:
print_date
와say_hello
. - 의존성
print_date >> say_hello
는say_hello
가print_date
가 완료되기 전에는 실행되지 않도록 보장합니다. - 이 코드를
my_first_dag.py
로 저장하고 Airflow의dags/
디렉토리에 넣으면, Airflow가 자동으로 감지하고 예약합니다.
이 템플릿을 확장하여 워크플로우가 복잡해질수록 PythonOperator 작업, 분기, 또는 더 복잡한 논리를 추가할 수 있습니다.
파이썬에서 DAG 객체를 생성할 때의 주요 파라미터
Apache Airflow에서 DAG을 정의할 때, 워크플로우의 올바른 예약, 식별 및 동작을 보장하기 위해 몇 가지 주요 파라미터를 설정해야 합니다.
필수 파라미터:
-
dag_id
DAG의 고유 식별자입니다. Airflow 환경의 모든 DAG은 고유한dag_id
를 가져야 합니다. -
start_date
DAG이 실행 가능한 날짜와 시간입니다. 이는 DAG의 예약 시작 시점을 결정합니다. -
schedule (또는
schedule_interval
)
DAG이 얼마나 자주 실행되어야 하는지 정의합니다 (예:"@daily"
,"0 12 * * *"
, 또는None
으로 수동 실행). -
catchup
DAG이 처음 활성화될 때start_date
와 현재 날짜 사이에 누락된 실행을 보충할지 여부를 결정하는 불리언입니다. 기본값은False
입니다.
기타 일반적인 파라미터:
-
default_args
DAG 내 모든 작업에 적용되는 기본 인수 딕셔너리입니다 (예:owner
,email
,retries
,retry_delay
). 이는 선택적이지만 DRY (Don’t Repeat Yourself) 코드를 위해 권장됩니다. -
params
런타임 구성 매개변수를 위한 딕셔너리로, 작업에 런타임에 값을 전달하여 DAG을 더 유연하게 만들 수 있습니다.
예시:
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'jdoe',
'email': ['jdoe@example.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='example_dag',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
default_args=default_args,
params={'example_param': 'value'}
) as dag:
# 작업 정의
pass
DAG 파라미터 표
파라미터 | 설명 | 필수 |
---|---|---|
dag_id | DAG의 고유 이름 | 예 |
start_date | DAG이 실행 가능한 시작 시간 | 예 |
schedule | DAG 실행 주기 | 예 |
catchup | 누락된 실행 보충 여부 | 아니오 |
default_args | DAG 내 모든 작업에 적용되는 기본 인수 | 아니오 |
params | 런타임에 동적 구성이 필요한 매개변수 | 아니오 |
이러한 파라미터를 설정함으로써 DAG이 고유하게 식별되고, 올바르게 예약되고, Airflow에서 의도한 대로 작동하도록 보장할 수 있습니다.
Airflow에서 params
인수를 사용하여 커스텀 런타임 매개변수 포함
Apache Airflow의 params
인수는 DAG 및 작업에 커스텀 런타임 매개변수를 주입할 수 있도록 해주어, 동적이고 유연한 워크플로우 구성이 가능합니다. Params는 작업에 런타임 구성 정보를 제공할 수 있습니다. DAG 코드에서 기본 Params를 정의하고 추가 Params를 제공하거나, DAG 실행 시 Params 값의 덮어쓰기를 수행할 수 있습니다.
이 접근법은 Airflow 워크플로우를 더욱 동적이고 구성 가능하게 만들어, 다양한 자동화 시나리오를 지원합니다.
커스텀 런타임 매개변수 설정 방법
- DAG 내에서
params
정의:
DAG 객체를 생성할 때,params
인수를 딕셔너리로 포함합니다. 각 키-값 쌍은 매개변수 이름과 기본 값을 나타냅니다.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_param(**kwargs):
# kwargs['params']를 통해 매개변수에 접근
my_param = kwargs['params']['my_param']
print(f"내 커스텀 매개변수는: {my_param}")
with DAG(
dag_id='custom_params_example',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
params={'my_param': 'default_value'} # 커스텀 런타임 매개변수
) as dag:
task = PythonOperator(
task_id='print_param_task',
python_callable=print_param,
provide_context=True
)
- 런타임에서 덮어쓰기:
DAG 실행을 수동으로 트리거할 때 (Airflow UI, CLI 또는 API를 통해), 특정 실행에 대한params
값을 제공하거나 덮어쓸 수 있습니다.
작업 내에서 매개변수 접근
- 작업의 파이썬 콜러블에서
kwargs['params']
를 사용하여 매개변수에 접근합니다. - 템플릿 필드 (예: BashOperator)에서는 템플릿 문자열 내에서
{{ params.my_param }}
을 사용합니다.