Apache Airflow 在 MLOPS 和 ETL 中的应用 - 介绍、优势与示例

用于 ETS/MLOPS 的 Python 框架,设计精良

目录

Apache Airflow 是一个开源平台,旨在通过 Python 代码以编程方式创建、安排和监控工作流,为传统手动或基于用户界面的工作流工具提供了一个灵活且强大的替代方案。

Apache Airflow 最初由 Airbnb 于 2014 年开发,用于管理日益复杂的工作流,并于 2019 年成为 Apache 软件基金会的顶级项目。

当您拥有多个需要按特定顺序执行的任务、具有依赖关系和错误处理机制的复杂工作流时,Airflow 最为有益。它特别适用于需要编排、监控和重试机制的组织,而不是简单的脚本或 cron 作业。

Chain oof cyber events

Apache Airflow 的关键功能和典型使用场景

  • 基于 Python 的工作流定义:工作流以 Python 代码形式定义,允许使用标准编程结构(如循环和条件逻辑)动态生成管道。
  • 有向无环图(DAGs):Airflow 使用 DAGs 来表示工作流,其中每个节点是一个任务,边定义依赖关系。这种结构确保任务按指定顺序运行,且不形成循环。
  • 强大的用户界面:Airflow 提供现代的网络界面,用于监控、调度和管理工作流,提供任务状态和日志的可见性。
  • 广泛的集成:它包含许多内置操作符和钩子,用于连接云服务(AWS、GCP、Azure)、数据库和其他工具,使其高度可扩展。
  • 可扩展性和灵活性:Airflow 可以在大规模上编排工作流,支持本地和云部署,适用于从 ETL 到机器学习的广泛使用场景。

典型使用场景

  • 编排 ETL(提取、转换、加载)管道
  • 安排和监控数据工作流
  • 自动化机器学习模型的训练和部署
  • 管理基础设施任务

托管 Airflow 服务

多个云提供商提供托管的 Airflow 服务,减少设置和维护的运营负担:

  • Amazon Managed Workflows for Apache Airflow (MWAA):一个完全托管的服务,处理扩展、安全性和可用性,使用户能够专注于工作流开发。
  • Google Cloud Composer:Google Cloud Platform 上的托管 Airflow。
  • Microsoft Fabric Data Factory:在 Azure 生态系统中提供 Apache Airflow 作业作为托管编排解决方案。

安装和入门

可以通过 Python 的包管理器(pip install apache-airflow)或简化设置和项目管理的工具如 Astro CLI 安装 Airflow。安装后,用户在 Python 脚本中定义 DAGs,并通过 Airflow UI 管理它们。

总结

功能 描述
工作流定义 Python 代码,基于 DAG
用户界面 网络界面,现代且强大
集成 云(AWS、GCP、Azure)、数据库、自定义插件
托管服务 AWS MWAA、Google Cloud Composer、Microsoft Fabric
使用场景 ETL、数据管道、机器学习工作流、基础设施任务

由于其灵活性、可扩展性和强大的生态系统,Airflow 在数据工程社区中被广泛采用,使其成为生产环境中工作流编排的首选工具。

Airflow 简化工作流自动化的主要方式

  • 工作流作为代码(Python)
    Airflow 工作流以 Python 脚本的形式定义,使用有向无环图(DAGs)表示任务及其依赖关系。这种“工作流作为代码”的方法使管道能够动态生成、参数化,并轻松进行版本控制,使其高度可维护且能适应不断变化的需求。

  • 动态且可扩展的管道
    通过充分利用 Python 的全部功能,用户可以直接在工作流定义中加入循环、条件和自定义逻辑。这使得动态任务创建和高级参数化成为可能,这在静态配置文件或基于 GUI 的工具中是困难或不可能实现的。

  • Pythonic 的任务管理
    Airflow 的 TaskFlow API(在 Airflow 2.0 中引入)使定义任务更加 Pythonic。开发人员编写普通的 Python 函数,进行装饰,Airflow 自动处理任务创建、依赖关系连接和任务之间的数据传递,从而产生更清晰、更易于维护的代码。

  • 自定义操作符和集成
    用户可以创建自定义的 Python 操作符、传感器和钩子,以与几乎任何外部系统、API 或数据库进行交互。这种可扩展性使与更广泛的 Python 生态系统和外部服务的无缝集成成为可能。

  • 原生 Python 生态系统集成
    由于工作流是用 Python 编写的,用户可以在任务中利用大量 Python 库(如 Pandas、NumPy 或机器学习框架),进一步增强自动化能力。

  • 可读性和可维护性
    Python 的可读性和流行性使 Airflow 对数据工程师和分析师等广泛用户群体都易于使用。基于代码的方法还支持标准的软件工程实践,如代码审查和版本控制。

Apache Airflow 的一些优势

功能 对工作流自动化的益处
基于 Python 的 DAGs 动态、灵活且可维护的工作流
TaskFlow API 更加清晰、Pythonic 的工作流定义
自定义操作符/传感器 与任何系统或 API 集成
原生 Python 集成 使用任何 Python 库或工具
强大的用户界面 可视化监控和管理工作流

Airflow 与 Python 的深度集成不仅简化了工作流自动化,还使团队能够高效地构建强大、可扩展且高度定制的数据管道。

示例:使用 Python 的简单 ETL 工作流

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extracting data")

def transform_data():
    print("Transforming data")

def load_data():
    print("Loading data")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

此示例展示了如何将 ETL 管道的每个阶段定义为 Python 函数,并通过 Airflow 使用 Python 操作符和 DAGs 进行编排:
extract_task >> transform_task >> load_task

Apache Airflow DAG 的另一个示例

以下是一个使用 Python 在 Apache Airflow 中编写 DAG(有向无环图)的基本示例。此示例演示了如何使用 BashOperator 定义一个包含两个简单任务的工作流,该操作符运行 bash 命令:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='My first Airflow DAG!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # 任务 1:打印当前日期
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # 任务 2:说“你好”
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello, Airflow!"'
    )

    # 定义任务依赖关系
    print_date >> say_hello

工作原理:

  • 使用上下文管理器(with DAG(...) as dag:)定义 DAG。
  • 创建了两个任务:print_datesay_hello
  • 依赖关系 print_date >> say_hello 确保 say_hello 仅在 print_date 完成后运行。
  • 将此代码保存为 my_first_dag.py 并放在您的 Airflow dags/ 目录中,Airflow 将自动检测并安排它。

随着工作流的增长,您可以通过添加 PythonOperator 任务、分支或更复杂的逻辑来扩展此模板。

在 Python 中创建 DAG 对象时的关键参数

在 Apache Airflow 中定义 DAG 时,应设置几个关键参数,以确保工作流的正确调度、识别和行为。

关键参数:

  • dag_id
    DAG 的唯一标识符。Airflow 环境中的每个 DAG 必须具有唯一的 dag_id

  • start_date
    DAG 可执行的起始日期和时间。这决定了 DAG 的调度何时开始。

  • schedule(或 schedule_interval
    定义 DAG 应运行的频率(例如 "@daily""0 12 * * *"None 表示手动运行)。

  • catchup
    一个布尔值,确定当 DAG 首次启用时,Airflow 是否应回填 start_date 和当前日期之间的未执行运行。默认值为 False

其他常见参数:

  • default_args
    一个字典,包含应用于 DAG 中所有任务的默认参数(例如 owneremailretriesretry_delay)。虽然不是必需的,但推荐使用以实现 DRY(不要重复自己)的代码。

  • params
    一个字典,用于运行时配置参数,允许在运行时向任务传递值,使 DAG 更加灵活。

示例:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # 定义任务
    pass

DAG 参数表

参数 描述 必需
dag_id DAG 的唯一名称
start_date DAG 可开始运行的日期
schedule DAG 运行的频率/时间
catchup 是否回填未执行的运行
default_args 所有任务的默认参数
params 用于动态配置的运行时参数

设置这些参数确保您的 DAG 被唯一标识、正确调度,并在 Airflow 中按预期运行。

在 Airflow 中使用 params 参数包含自定义运行时参数

Apache Airflow 的 params 参数允许您将自定义运行时参数注入到 DAG 和任务中,从而实现动态且灵活的工作流配置。参数使您能够向任务提供运行时配置。您可以在 DAG 代码中配置默认参数,并提供额外的参数,或在触发 DAG 运行时覆盖参数值。

这种方法使您的 Airflow 工作流更加动态和可配置,支持各种自动化场景。

如何设置自定义运行时参数

  • 在 DAG 中定义 params
    创建 DAG 对象时,将 params 参数作为字典包含进去。每个键值对代表一个参数名称及其默认值。
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # 通过 kwargs['params'] 访问参数
    my_param = kwargs['params']['my_param']
    print(f"我的自定义参数是: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # 自定义运行时参数
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • 在运行时覆盖
    手动触发 DAG 运行(通过 Airflow UI、CLI 或 API)时,可以为特定运行提供或覆盖 params 值。

在任务中访问参数

  • 在任务的 Python 可调用函数中,使用 kwargs['params'] 访问参数。
  • 在模板字段(如 BashOperator)中,使用 {{ params.my_param }} 在模板字符串中。

有用的链接