Apache Airflow para MLOPS y ETL - Descripción, Beneficios y Ejemplos

Buen framework para ETS/MLOPS con Python

Índice

Apache Airflow es una plataforma de código abierto diseñada para autorizar, programar y monitorear flujos de trabajo de forma programática, completamente en código Python, ofreciendo una alternativa flexible y poderosa a las herramientas tradicionales, manuales o basadas en interfaz de usuario para flujos de trabajo.

Apache Airflow fue originalmente desarrollado en Airbnb en 2014 para manejar flujos de trabajo cada vez más complejos y se convirtió en un proyecto de primer nivel de la Fundación Apache Software en 2019.

Airflow es más beneficioso cuando tienes flujos de trabajo complejos con múltiples tareas que necesitan ejecutarse en un orden específico, con dependencias y manejo de errores. Es especialmente útil para organizaciones que ejecutan muchos trabajos de datos que requieren orquestación, monitoreo y mecanismos de reintento, en lugar de scripts o trabajos cron simples.

Cadena de eventos cibernéticos

Características clave de Apache Airflow y casos de uso típicos

  • Definición de flujos de trabajo basada en Python: Los flujos de trabajo se definen como código Python, permitiendo la generación dinámica de pipelines utilizando construcciones de programación estándar como bucles y lógica condicional.
  • Gráficos Acíclicos Dirigidos (DAGs): Airflow utiliza DAGs para representar flujos de trabajo, donde cada nodo es una tarea y los bordes definen dependencias. Esta estructura garantiza que las tareas se ejecuten en un orden especificado sin ciclos.
  • Interfaz de usuario robusta: Airflow proporciona una interfaz web moderna para monitorear, programar y gestionar flujos de trabajo, ofreciendo visibilidad en el estado de las tareas y los registros.
  • Integraciones extensas: Incluye muchos operadores y ganchos preinstalados para conectar con servicios en la nube (AWS, GCP, Azure), bases de datos y otras herramientas, lo que lo hace altamente extensible.
  • Escalabilidad y flexibilidad: Airflow puede orquestar flujos de trabajo a gran escala, admitiendo tanto despliegues en premisas como en la nube, y es adecuado para una amplia gama de casos de uso, desde ETL hasta aprendizaje automático.

Casos de uso típicos

  • Orquestar pipelines ETL (Extracción, Transformación, Carga)
  • Programar y monitorear flujos de trabajo de datos
  • Automatizar el entrenamiento y despliegue de modelos de aprendizaje automático
  • Gestionar tareas de infraestructura

Servicios de Airflow gestionados

Varios proveedores en la nube ofrecen servicios gestionados de Airflow, reduciendo la carga operativa de configuración y mantenimiento:

  • Amazon Managed Workflows for Apache Airflow (MWAA): Un servicio totalmente gestionado que maneja la escalabilidad, la seguridad y la disponibilidad, permitiendo a los usuarios centrarse en el desarrollo de flujos de trabajo.
  • Google Cloud Composer: Airflow gestionado en Google Cloud Platform.
  • Microsoft Fabric Data Factory: Ofrece trabajos de Apache Airflow como una solución de orquestación gestionada dentro del ecosistema de Azure.

Instalación y primeros pasos

Airflow se puede instalar mediante el gestor de paquetes de Python (pip install apache-airflow) o herramientas gestionadas como Astro CLI, lo que simplifica la configuración y el manejo de proyectos. Después de la instalación, los usuarios definen DAGs en scripts de Python y los gestionan a través de la interfaz de usuario de Airflow.

Resumen

Característica Descripción
Definición de flujo de trabajo Código Python, basado en DAGs
Interfaz de usuario Web, moderna, robusta
Integraciones Nube (AWS, GCP, Azure), bases de datos, plugins personalizados
Servicios gestionados AWS MWAA, Google Cloud Composer, Microsoft Fabric
Casos de uso ETL, pipelines de datos, flujos de trabajo de ML, tareas de infraestructura

Airflow es ampliamente adoptado en la comunidad de ingeniería de datos por su flexibilidad, escalabilidad y ecosistema sólido, lo que lo convierte en una opción líder para la orquestación de flujos de trabajo en entornos de producción.

Principales formas en que Airflow simplifica la automatización de flujos de trabajo

  • Flujos de trabajo como código (Python)
    Los flujos de trabajo de Airflow se definen como scripts de Python, utilizando Gráficos Acíclicos Dirigidos (DAGs) para representar tareas y sus dependencias. Este enfoque de “flujos de trabajo como código” permite la generación dinámica, la parametrización y el control de versiones fácil de los pipelines, lo que los hace altamente mantenibles y adaptables a los cambios en los requisitos.

  • Pipelines dinámicos y extensibles
    Al aprovechar las capacidades completas de Python, los usuarios pueden incorporar bucles, condiciones y lógica personalizada directamente en sus definiciones de flujo de trabajo. Esto permite la creación dinámica de tareas y la parametrización avanzada que sería difícil o imposible en archivos de configuración estáticos o herramientas basadas en interfaz gráfica.

  • Gestión de tareas Pythonica
    La API TaskFlow de Airflow (introducida en Airflow 2.0) hace que la definición de tareas sea aún más Pythonica. Los desarrolladores escriben funciones de Python simples, las decoran y Airflow maneja automáticamente la creación de tareas, el cableado de dependencias y el paso de datos entre tareas, lo que resulta en código más limpio y mantenible.

  • Operadores y integraciones personalizados
    Los usuarios pueden crear operadores, sensores y ganchos personalizados en Python para interactuar con casi cualquier sistema externo, API o base de datos. Esta extensibilidad permite una integración sin problemas con el ecosistema más amplio de Python y servicios externos.

  • Integración nativa con el ecosistema de Python
    Dado que los flujos de trabajo se escriben en Python, los usuarios pueden aprovechar la amplia gama de bibliotecas de Python (como Pandas, NumPy o marcos de aprendizaje automático) dentro de sus tareas, lo que mejora aún más las capacidades de automatización.

  • Legible y mantenible
    La legibilidad de Python y su popularidad hacen que Airflow sea accesible para una amplia gama de usuarios, desde ingenieros de datos hasta analistas. El enfoque basado en código también admite prácticas estándar de ingeniería de software como revisiones de código y control de versiones.

Algunos beneficios de Apache Airflow

Característica Beneficio para la automatización de flujos de trabajo
DAGs basados en Python Flujos de trabajo dinámicos, flexibles y mantenibles
API TaskFlow Definiciones de flujos de trabajo más limpias y Pythonicas
Operadores/Sensores personalizados Integrarse con cualquier sistema o API
Integración nativa con Python Usar cualquier biblioteca o herramienta de Python
Interfaz de usuario robusta Monitorear y gestionar flujos de trabajo visualmente

La profunda integración de Airflow con Python no solo simplifica la automatización de flujos de trabajo, sino que también empodera a los equipos para construir pipelines de datos robustos, escalables y altamente personalizados de manera eficiente.

Ejemplo: flujo de trabajo ETL simple en Python

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extrayendo datos")

def transform_data():
    print("Transformando datos")

def load_data():
    print("Cargando datos")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

Este ejemplo muestra cómo cada etapa de un pipeline ETL se define como una función de Python y se orquesta mediante Airflow usando operadores y DAGs de Python: extract_task >> transform_task >> load_task

Otro ejemplo de DAG de Apache Airflow

Aquí hay un ejemplo básico de cómo codificar un DAG (Gráfico Acíclico Dirigido) en Apache Airflow usando Python. Este ejemplo demuestra cómo definir un flujo de trabajo con dos tareas simples usando el BashOperator, que ejecuta comandos bash:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='Mi primer DAG de Airflow!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Tarea 1: Imprimir la fecha actual
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Tarea 2: Decir hola
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hola, Airflow!"'
    )

    # Definir dependencias de tareas
    print_date >> say_hello

Cómo funciona esto:

  • El DAG se define usando un gestor de contexto (with DAG(...) as dag:).
  • Se crean dos tareas: print_date y say_hello.
  • La dependencia print_date >> say_hello asegura que say_hello se ejecute solo después de que print_date complete su ejecución.
  • Guarda este código como my_first_dag.py en tu directorio dags/ de Airflow, y Airflow detectará y programará automáticamente este DAG.

Puedes expandir este modelo añadiendo tareas con PythonOperator, ramificación o lógica más compleja a medida que tus flujos de trabajo crezcan.

Parámetros clave al crear un objeto DAG en Python

Cuando defines un DAG en Apache Airflow, varios parámetros clave deben establecerse para garantizar una programación, identificación y comportamiento adecuados de tu flujo de trabajo.

Parámetros esenciales:

  • dag_id
    El identificador único para tu DAG. Cada DAG en tu entorno de Airflow debe tener un dag_id distinto.

  • start_date
    La fecha y hora en que el DAG se vuelve elegible para su ejecución. Esto determina cuándo comienza la programación del DAG.

  • schedule (o schedule_interval)
    Define con qué frecuencia debe ejecutarse el DAG (por ejemplo, "@daily", "0 12 * * *", o None para ejecuciones manuales).

  • catchup
    Un booleano que determina si Airflow debe rellenar las ejecuciones perdidas entre la start_date y la fecha actual cuando el DAG se habilite por primera vez. Por defecto es False.

Otros parámetros comunes:

  • default_args
    Un diccionario de argumentos predeterminados aplicados a todas las tareas dentro del DAG (por ejemplo, owner, email, retries, retry_delay). Es opcional pero recomendado para seguir el principio DRY (Don’t Repeat Yourself).

  • params
    Un diccionario para parámetros de configuración en tiempo de ejecución, permitiendo pasar valores a las tareas en tiempo de ejecución y hacer más flexibles tus DAGs.

Ejemplo:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Definir tareas aquí
    pass

Tabla de parámetros de DAG

Parámetro Descripción Obligatorio
dag_id Nombre único para el DAG
start_date Cuando el DAG se vuelve elegible para iniciar la ejecución
schedule Frecuencia/tiempo para las ejecuciones del DAG
catchup Si se debe rellenar las ejecuciones perdidas No
default_args Argumentos predeterminados para todas las tareas del DAG No
params Parámetros de configuración en tiempo de ejecución No

Establecer estos parámetros asegura que tu DAG se identifique de manera única, se programe correctamente y se comporte según lo esperado en Airflow.

Incluir parámetros de tiempo de ejecución personalizados usando el argumento params en Airflow

El argumento params de Apache Airflow permite inyectar parámetros de tiempo de ejecución personalizados en tus DAGs y tareas, lo que permite configuraciones de flujo de trabajo dinámicas y flexibles. Los parámetros permiten proporcionar configuraciones de tiempo de ejecución a las tareas. Puedes configurar parámetros predeterminados en tu código de DAG y suministrar parámetros adicionales, o sobrescribir valores de parámetros cuando se inicie una ejecución de DAG.

Este enfoque hace que tus flujos de trabajo de Airflow sean más dinámicos y configurables, apoyando una amplia variedad de escenarios de automatización.

Cómo establecer parámetros de tiempo de ejecución personalizados

  • Definir params en el DAG:
    Cuando creas un objeto DAG, incluye el argumento params como un diccionario. Cada par de clave-valor representa el nombre de un parámetro y su valor predeterminado.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # Acceder al parámetro mediante kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"Mi parámetro personalizado es: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # Parámetro de tiempo de ejecución personalizado
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • Sobrescribir en tiempo de ejecución:
    Cuando se inicia una ejecución de DAG manualmente (vía la interfaz de usuario de Airflow, CLI o API), puedes suministrar o sobrescribir valores de params para esa ejecución específica.

Acceder a parámetros en tareas

  • En tu función callable de Python de la tarea, accede a los parámetros usando kwargs['params'].
  • Para campos plantillados (como en BashOperator), usa {{ params.my_param }} en la cadena de plantilla.

Enlaces útiles