Apache Airflow для MLOps и ETL - Описание, Преимущества и Примеры
Отличный фреймворк для ETS/MLOPS на Python
Apache Airflow — это открытая платформа, предназначенная для программного создания, планирования и мониторинга рабочих процессов — полностью на языке Python, предлагающая гибкую и мощную альтернативу традиционным, ручным или основанным на интерфейсе инструментам для работы с рабочими процессами.
Apache Airflow был разработан в Airbnb в 2014 году для управления все более сложными рабочими процессами и стал проектом верхнего уровня Apache Software Foundation в 2019 году.
Airflow наиболее полезен, когда у вас есть сложные рабочие процессы с множеством задач, которые необходимо выполнять в определенном порядке, с зависимостями и обработкой ошибок. Он особенно полезен для организаций, выполняющих множество задач с данными, требующих оркестрации, мониторинга и механизмов повторного запуска, а не простых скриптов или cron-задач.
Ключевые особенности Apache Airflow и типичные сценарии использования
- Определение рабочих процессов на Python: Рабочие процессы определяются как код на Python, что позволяет динамически генерировать конвейеры с использованием стандартных программных конструкций, таких как циклы и условная логика.
- Направленные ациклические графы (DAGs): Airflow использует DAGs для представления рабочих процессов, где каждый узел — это задача, а ребра определяют зависимости. Эта структура обеспечивает выполнение задач в указанном порядке без циклов.
- Удобный интерфейс: Airflow предоставляет современный веб-интерфейс для мониторинга, планирования и управления рабочими процессами, предлагая видимость статуса задач и журналов.
- Обширные интеграции: Он включает множество встроенных операторов и хуков для подключения к облачным сервисам (AWS, GCP, Azure), базам данных и другим инструментам, делая его высоко масштабируемым.
- Масштабируемость и гибкость: Airflow может оркестрировать рабочие процессы в большом масштабе, поддерживая как локальные, так и облачные развертывания, и подходит для широкого спектра сценариев использования, от ETL до машинного обучения.
Типичные сценарии использования
- Оркестрация ETL (Extract, Transform, Load) конвейеров
- Планирование и мониторинг рабочих процессов с данными
- Автоматизация обучения и развертывания моделей машинного обучения
- Управление инфраструктурными задачами
Управляемые сервисы Airflow
Несколько облачных провайдеров предлагают управляемые сервисы Airflow, снижая операционную нагрузку на настройку и обслуживание:
- Amazon Managed Workflows for Apache Airflow (MWAA): Полностью управляемый сервис, который обрабатывает масштабируемость, безопасность и доступность, позволяя пользователям сосредоточиться на разработке рабочих процессов.
- Google Cloud Composer: Управляемый Airflow на Google Cloud Platform.
- Microsoft Fabric Data Factory: Предлагает задачи Apache Airflow в качестве управляемого решения для оркестрации в экосистеме Azure.
Установка и начало работы
Airflow можно установить через менеджер пакетов Python (pip install apache-airflow
) или управляемые инструменты, такие как Astro CLI, которые упрощают настройку и управление проектами. После установки пользователи определяют DAGs в скриптах Python и управляют ими через интерфейс Airflow.
Итог
Особенность | Описание |
---|---|
Определение рабочих процессов | Код на Python, на основе DAGs |
Интерфейс | Веб-интерфейс, современный, удобный |
Интеграции | Облачные сервисы (AWS, GCP, Azure), базы данных, пользовательские плагины |
Управляемые сервисы | AWS MWAA, Google Cloud Composer, Microsoft Fabric |
Сценарии использования | ETL, конвейеры данных, рабочие процессы ML, инфраструктурные задачи |
Airflow широко используется в сообществе инженерии данных благодаря своей гибкости, масштабируемости и мощному экосистеме, делая его ведущим выбором для оркестрации рабочих процессов в производственных средах.
Ключевые способы, которыми Airflow упрощает автоматизацию рабочих процессов
-
Рабочие процессы как код (Python) Рабочие процессы Airflow определяются как скрипты Python, использующие направленные ациклические графы (DAGs) для представления задач и их зависимостей. Этот подход «рабочие процессы как код» позволяет динамически генерировать, параметризовать и легко управлять версиями конвейеров, делая их высоко поддерживаемыми и адаптируемыми к изменяющимся требованиям.
-
Динамические и расширяемые конвейеры Используя полный потенциал Python, пользователи могут включать циклы, условные операторы и пользовательскую логику непосредственно в определения рабочих процессов. Это позволяет динамически создавать задачи и продвинутую параметризацию, что было бы сложно или невозможно в статических файлах конфигурации или инструментах на основе графического интерфейса.
-
Управление задачами на Python TaskFlow API Airflow (введен в Airflow 2.0) делает определение задач еще более «питоновским». Разработчики пишут обычные функции Python, декорируют их, и Airflow автоматически обрабатывает создание задач, подключение зависимостей и передачу данных между задачами, что приводит к более чистому и поддерживаемому коду.
-
Пользовательские операторы и интеграции Пользователи могут создавать пользовательские операторы, сенсоры и хуки на Python для взаимодействия практически с любой внешней системой, API или базой данных. Эта расширяемость обеспечивает беспрепятственную интеграцию с более широкой экосистемой Python и внешними сервисами.
-
Естественная интеграция с экосистемой Python Поскольку рабочие процессы написаны на Python, пользователи могут использовать огромное количество библиотек Python (таких как Pandas, NumPy или фреймворки машинного обучения) в своих задачах, еще больше повышая возможности автоматизации.
-
Читаемость и поддерживаемость Читаемость и популярность Python делают Airflow доступным для широкого круга пользователей, от инженеров данных до аналитиков. Подход на основе кода также поддерживает стандартные практики разработки программного обеспечения, такие как проверка кода и управление версиями.
Некоторые преимущества Apache Airflow
Особенность | Преимущество для автоматизации рабочих процессов |
---|---|
DAGs на Python | Динамичные, гибкие и поддерживаемые рабочие процессы |
TaskFlow API | Чище и более «питоновские» определения рабочих процессов |
Пользовательские операторы/сенсоры | Интеграция с любой системой или API |
Естественная интеграция с Python | Использование любой библиотеки или инструмента Python |
Удобный интерфейс | Мониторинг и управление рабочими процессами визуально |
Глубокая интеграция Airflow с Python не только упрощает автоматизацию рабочих процессов, но и позволяет командам эффективно создавать надежные, масштабируемые и высоконастраиваемые конвейеры данных.
Пример: Простой ETL рабочий процесс на Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
print("Извлечение данных")
def transform_data():
print("Трансформация данных")
def load_data():
print("Загрузка данных")
default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)
extract_task >> transform_task >> load_task
Этот пример показывает, как каждый этап конвейера ETL определяется как функция Python и оркестрируется Airflow с использованием Python операторов и DAGs:
extract_task >> transform_task >> load_task
Еще один пример DAG в Apache Airflow
Вот базовый пример кодирования DAG (Directed Acyclic Graph) в Apache Airflow с использованием Python. Этот пример демонстрирует, как определить рабочий процесс с двумя простыми задачами с использованием BashOperator, который выполняет команды bash:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='my_first_dag',
default_args=default_args,
description='Мой первый DAG в Airflow!',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
# Задача 1: Печать текущей даты
print_date = BashOperator(
task_id='print_date',
bash_command='date'
)
# Задача 2: Приветствие
say_hello = BashOperator(
task_id='say_hello',
bash_command='echo "Привет, Airflow!"'
)
# Определение зависимостей задач
print_date >> say_hello
Как это работает:
- DAG определяется с использованием менеджера контекста (
with DAG(...) as dag:
). - Создаются две задачи:
print_date
иsay_hello
. - Зависимость
print_date >> say_hello
обеспечивает выполнениеsay_hello
только после завершенияprint_date
. - Сохраните этот код как
my_first_dag.py
в каталогеdags/
вашего Airflow, и Airflow автоматически обнаружит и запланирует его.
Вы можете расширять этот шаблон, добавляя задачи PythonOperator, ветвление или более сложную логику по мере роста ваших рабочих процессов.
Ключевые параметры при создании объекта DAG в Python
При определении DAG в Apache Airflow необходимо установить несколько ключевых параметров, чтобы обеспечить правильное планирование, идентификацию и поведение вашего рабочего процесса.
Основные параметры:
-
dag_id Уникальный идентификатор для вашего DAG. Каждый DAG в вашей среде Airflow должен иметь уникальный
dag_id
. -
start_date Дата и время, когда DAG становится доступным для выполнения. Это определяет, когда начинается планирование для DAG.
-
schedule (или
schedule_interval
) Определяет, как часто должен запускаться DAG (например,"@daily"
,"0 12 * * *"
, илиNone
для ручных запусков). -
catchup Булево значение, определяющее, должен ли Airflow выполнять пропущенные запуски между
start_date
и текущей датой при первом включении DAG. По умолчаниюFalse
.
Другие распространенные параметры:
-
default_args Словарь аргументов по умолчанию, применяемых ко всем задачам в DAG (например,
owner
,email
,retries
,retry_delay
). Это необязательно, но рекомендуется для DRY (Don’t Repeat Yourself) кода. -
params Словарь для параметров конфигурации времени выполнения, позволяющий передавать значения задачам во время выполнения и делающий ваши DAG более гибкими.
Пример:
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'jdoe',
'email': ['jdoe@example.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='example_dag',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
default_args=default_args,
params={'example_param': 'value'}
) as dag:
# Определите задачи здесь
pass
Таблица параметров DAG
Параметр | Описание | Обязательно |
---|---|---|
dag_id | Уникальное имя для DAG | Да |
start_date | Когда DAG становится доступным для запуска | Да |
schedule | Частота/время запусков DAG | Да |
catchup | Нужно ли выполнять пропущенные запуски | Нет |
default_args | Аргументы по умолчанию для всех задач в DAG | Нет |
params | Параметры времени выполнения для динамической конфигурации | Нет |
Установка этих параметров гарантирует, что ваш DAG имеет уникальное имя, правильно запланирован и ведет себя так, как задумано в Airflow.
Включение пользовательских параметров времени выполнения с помощью аргумента params
в Airflow
Аргумент params
в Apache Airflow позволяет встраивать пользовательские параметры времени выполнения в ваши DAG и задачи, обеспечивая динамическую и гибкую конфигурацию рабочих процессов. Params позволяют предоставлять конфигурацию времени выполнения задачам. Вы можете настроить параметры Params по умолчанию в коде DAG и передавать дополнительные Params или переопределять значения Param при запуске DAG.
Этот подход делает ваши рабочие процессы Airflow более динамичными и настраиваемыми, поддерживая широкий спектр сценариев автоматизации.
Как установить пользовательские параметры времени выполнения
- Определите
params
в DAG: При создании объекта DAG включите аргументparams
в виде словаря. Каждая пара ключ-значение представляет имя параметра и его значение по умолчанию.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_param(**kwargs):
# Доступ к параметру через kwargs['params']
my_param = kwargs['params']['my_param']
print(f"Мой пользовательский параметр: {my_param}")
with DAG(
dag_id='custom_params_example',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
params={'my_param': 'default_value'} # Пользовательский параметр времени выполнения
) as dag:
task = PythonOperator(
task_id='print_param_task',
python_callable=print_param,
provide_context=True
)
- Переопределение во время выполнения:
При ручном запуске DAG (через интерфейс Airflow, CLI или API) вы можете передавать или переопределять значения
params
для этого конкретного запуска.
Доступ к параметрам в задачах
- В Python-функции вашей задачи получите доступ к params с помощью
kwargs['params']
. - Для шаблонных полей (например, в BashOperator) используйте
{{ params.my_param }}
в строке шаблона.
Полезные ссылки
- https://airflow.apache.org
- https://github.com/apache/airflow
- https://en.wikipedia.org/wiki/Apache_Airflow
- uv - Новый менеджер пакетов, проектов и сред Python
- Шпаргалка по Python
- Производительность AWS lambda: JavaScript vs Python vs Golang
- AWS SAM + AWS SQS + Python PowerTools
- Шпаргалка по venv
- Генерация PDF в Python - библиотеки и примеры