Apache Airflow para MLOPS y ETL - Descripción, Beneficios y Ejemplos
Buen framework para ETS/MLOPS con Python
Apache Airflow es una plataforma de código abierto diseñada para autorizar, programar y monitorear flujos de trabajo de forma programática, completamente en código Python, ofreciendo una alternativa flexible y poderosa a las herramientas tradicionales, manuales o basadas en interfaz de usuario para flujos de trabajo.
Apache Airflow fue originalmente desarrollado en Airbnb en 2014 para manejar flujos de trabajo cada vez más complejos y se convirtió en un proyecto de primer nivel de la Fundación Apache Software en 2019.
Airflow es más beneficioso cuando tienes flujos de trabajo complejos con múltiples tareas que necesitan ejecutarse en un orden específico, con dependencias y manejo de errores. Es especialmente útil para organizaciones que ejecutan muchos trabajos de datos que requieren orquestación, monitoreo y mecanismos de reintento, en lugar de scripts o trabajos cron simples.
Características clave de Apache Airflow y casos de uso típicos
- Definición de flujos de trabajo basada en Python: Los flujos de trabajo se definen como código Python, permitiendo la generación dinámica de pipelines utilizando construcciones de programación estándar como bucles y lógica condicional.
- Gráficos Acíclicos Dirigidos (DAGs): Airflow utiliza DAGs para representar flujos de trabajo, donde cada nodo es una tarea y los bordes definen dependencias. Esta estructura garantiza que las tareas se ejecuten en un orden especificado sin ciclos.
- Interfaz de usuario robusta: Airflow proporciona una interfaz web moderna para monitorear, programar y gestionar flujos de trabajo, ofreciendo visibilidad en el estado de las tareas y los registros.
- Integraciones extensas: Incluye muchos operadores y ganchos preinstalados para conectar con servicios en la nube (AWS, GCP, Azure), bases de datos y otras herramientas, lo que lo hace altamente extensible.
- Escalabilidad y flexibilidad: Airflow puede orquestar flujos de trabajo a gran escala, admitiendo tanto despliegues en premisas como en la nube, y es adecuado para una amplia gama de casos de uso, desde ETL hasta aprendizaje automático.
Casos de uso típicos
- Orquestar pipelines ETL (Extracción, Transformación, Carga)
- Programar y monitorear flujos de trabajo de datos
- Automatizar el entrenamiento y despliegue de modelos de aprendizaje automático
- Gestionar tareas de infraestructura
Servicios de Airflow gestionados
Varios proveedores en la nube ofrecen servicios gestionados de Airflow, reduciendo la carga operativa de configuración y mantenimiento:
- Amazon Managed Workflows for Apache Airflow (MWAA): Un servicio totalmente gestionado que maneja la escalabilidad, la seguridad y la disponibilidad, permitiendo a los usuarios centrarse en el desarrollo de flujos de trabajo.
- Google Cloud Composer: Airflow gestionado en Google Cloud Platform.
- Microsoft Fabric Data Factory: Ofrece trabajos de Apache Airflow como una solución de orquestación gestionada dentro del ecosistema de Azure.
Instalación y primeros pasos
Airflow se puede instalar mediante el gestor de paquetes de Python (pip install apache-airflow
) o herramientas gestionadas como Astro CLI, lo que simplifica la configuración y el manejo de proyectos. Después de la instalación, los usuarios definen DAGs en scripts de Python y los gestionan a través de la interfaz de usuario de Airflow.
Resumen
Característica | Descripción |
---|---|
Definición de flujo de trabajo | Código Python, basado en DAGs |
Interfaz de usuario | Web, moderna, robusta |
Integraciones | Nube (AWS, GCP, Azure), bases de datos, plugins personalizados |
Servicios gestionados | AWS MWAA, Google Cloud Composer, Microsoft Fabric |
Casos de uso | ETL, pipelines de datos, flujos de trabajo de ML, tareas de infraestructura |
Airflow es ampliamente adoptado en la comunidad de ingeniería de datos por su flexibilidad, escalabilidad y ecosistema sólido, lo que lo convierte en una opción líder para la orquestación de flujos de trabajo en entornos de producción.
Principales formas en que Airflow simplifica la automatización de flujos de trabajo
-
Flujos de trabajo como código (Python)
Los flujos de trabajo de Airflow se definen como scripts de Python, utilizando Gráficos Acíclicos Dirigidos (DAGs) para representar tareas y sus dependencias. Este enfoque de “flujos de trabajo como código” permite la generación dinámica, la parametrización y el control de versiones fácil de los pipelines, lo que los hace altamente mantenibles y adaptables a los cambios en los requisitos. -
Pipelines dinámicos y extensibles
Al aprovechar las capacidades completas de Python, los usuarios pueden incorporar bucles, condiciones y lógica personalizada directamente en sus definiciones de flujo de trabajo. Esto permite la creación dinámica de tareas y la parametrización avanzada que sería difícil o imposible en archivos de configuración estáticos o herramientas basadas en interfaz gráfica. -
Gestión de tareas Pythonica
La API TaskFlow de Airflow (introducida en Airflow 2.0) hace que la definición de tareas sea aún más Pythonica. Los desarrolladores escriben funciones de Python simples, las decoran y Airflow maneja automáticamente la creación de tareas, el cableado de dependencias y el paso de datos entre tareas, lo que resulta en código más limpio y mantenible. -
Operadores y integraciones personalizados
Los usuarios pueden crear operadores, sensores y ganchos personalizados en Python para interactuar con casi cualquier sistema externo, API o base de datos. Esta extensibilidad permite una integración sin problemas con el ecosistema más amplio de Python y servicios externos. -
Integración nativa con el ecosistema de Python
Dado que los flujos de trabajo se escriben en Python, los usuarios pueden aprovechar la amplia gama de bibliotecas de Python (como Pandas, NumPy o marcos de aprendizaje automático) dentro de sus tareas, lo que mejora aún más las capacidades de automatización. -
Legible y mantenible
La legibilidad de Python y su popularidad hacen que Airflow sea accesible para una amplia gama de usuarios, desde ingenieros de datos hasta analistas. El enfoque basado en código también admite prácticas estándar de ingeniería de software como revisiones de código y control de versiones.
Algunos beneficios de Apache Airflow
Característica | Beneficio para la automatización de flujos de trabajo |
---|---|
DAGs basados en Python | Flujos de trabajo dinámicos, flexibles y mantenibles |
API TaskFlow | Definiciones de flujos de trabajo más limpias y Pythonicas |
Operadores/Sensores personalizados | Integrarse con cualquier sistema o API |
Integración nativa con Python | Usar cualquier biblioteca o herramienta de Python |
Interfaz de usuario robusta | Monitorear y gestionar flujos de trabajo visualmente |
La profunda integración de Airflow con Python no solo simplifica la automatización de flujos de trabajo, sino que también empodera a los equipos para construir pipelines de datos robustos, escalables y altamente personalizados de manera eficiente.
Ejemplo: flujo de trabajo ETL simple en Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
print("Extrayendo datos")
def transform_data():
print("Transformando datos")
def load_data():
print("Cargando datos")
default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)
extract_task >> transform_task >> load_task
Este ejemplo muestra cómo cada etapa de un pipeline ETL se define como una función de Python y se orquesta mediante Airflow usando operadores y DAGs de Python:
extract_task >> transform_task >> load_task
Otro ejemplo de DAG de Apache Airflow
Aquí hay un ejemplo básico de cómo codificar un DAG (Gráfico Acíclico Dirigido) en Apache Airflow usando Python. Este ejemplo demuestra cómo definir un flujo de trabajo con dos tareas simples usando el BashOperator, que ejecuta comandos bash:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='my_first_dag',
default_args=default_args,
description='Mi primer DAG de Airflow!',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
# Tarea 1: Imprimir la fecha actual
print_date = BashOperator(
task_id='print_date',
bash_command='date'
)
# Tarea 2: Decir hola
say_hello = BashOperator(
task_id='say_hello',
bash_command='echo "Hola, Airflow!"'
)
# Definir dependencias de tareas
print_date >> say_hello
Cómo funciona esto:
- El DAG se define usando un gestor de contexto (
with DAG(...) as dag:
). - Se crean dos tareas:
print_date
ysay_hello
. - La dependencia
print_date >> say_hello
asegura quesay_hello
se ejecute solo después de queprint_date
complete su ejecución. - Guarda este código como
my_first_dag.py
en tu directoriodags/
de Airflow, y Airflow detectará y programará automáticamente este DAG.
Puedes expandir este modelo añadiendo tareas con PythonOperator, ramificación o lógica más compleja a medida que tus flujos de trabajo crezcan.
Parámetros clave al crear un objeto DAG en Python
Cuando defines un DAG en Apache Airflow, varios parámetros clave deben establecerse para garantizar una programación, identificación y comportamiento adecuados de tu flujo de trabajo.
Parámetros esenciales:
-
dag_id
El identificador único para tu DAG. Cada DAG en tu entorno de Airflow debe tener undag_id
distinto. -
start_date
La fecha y hora en que el DAG se vuelve elegible para su ejecución. Esto determina cuándo comienza la programación del DAG. -
schedule (o
schedule_interval
)
Define con qué frecuencia debe ejecutarse el DAG (por ejemplo,"@daily"
,"0 12 * * *"
, oNone
para ejecuciones manuales). -
catchup
Un booleano que determina si Airflow debe rellenar las ejecuciones perdidas entre lastart_date
y la fecha actual cuando el DAG se habilite por primera vez. Por defecto esFalse
.
Otros parámetros comunes:
-
default_args
Un diccionario de argumentos predeterminados aplicados a todas las tareas dentro del DAG (por ejemplo,owner
,email
,retries
,retry_delay
). Es opcional pero recomendado para seguir el principio DRY (Don’t Repeat Yourself). -
params
Un diccionario para parámetros de configuración en tiempo de ejecución, permitiendo pasar valores a las tareas en tiempo de ejecución y hacer más flexibles tus DAGs.
Ejemplo:
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'jdoe',
'email': ['jdoe@example.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='example_dag',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
default_args=default_args,
params={'example_param': 'value'}
) as dag:
# Definir tareas aquí
pass
Tabla de parámetros de DAG
Parámetro | Descripción | Obligatorio |
---|---|---|
dag_id | Nombre único para el DAG | Sí |
start_date | Cuando el DAG se vuelve elegible para iniciar la ejecución | Sí |
schedule | Frecuencia/tiempo para las ejecuciones del DAG | Sí |
catchup | Si se debe rellenar las ejecuciones perdidas | No |
default_args | Argumentos predeterminados para todas las tareas del DAG | No |
params | Parámetros de configuración en tiempo de ejecución | No |
Establecer estos parámetros asegura que tu DAG se identifique de manera única, se programe correctamente y se comporte según lo esperado en Airflow.
Incluir parámetros de tiempo de ejecución personalizados usando el argumento params
en Airflow
El argumento params
de Apache Airflow permite inyectar parámetros de tiempo de ejecución personalizados en tus DAGs y tareas, lo que permite configuraciones de flujo de trabajo dinámicas y flexibles. Los parámetros permiten proporcionar configuraciones de tiempo de ejecución a las tareas. Puedes configurar parámetros predeterminados en tu código de DAG y suministrar parámetros adicionales, o sobrescribir valores de parámetros cuando se inicie una ejecución de DAG.
Este enfoque hace que tus flujos de trabajo de Airflow sean más dinámicos y configurables, apoyando una amplia variedad de escenarios de automatización.
Cómo establecer parámetros de tiempo de ejecución personalizados
- Definir
params
en el DAG:
Cuando creas un objeto DAG, incluye el argumentoparams
como un diccionario. Cada par de clave-valor representa el nombre de un parámetro y su valor predeterminado.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_param(**kwargs):
# Acceder al parámetro mediante kwargs['params']
my_param = kwargs['params']['my_param']
print(f"Mi parámetro personalizado es: {my_param}")
with DAG(
dag_id='custom_params_example',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
params={'my_param': 'default_value'} # Parámetro de tiempo de ejecución personalizado
) as dag:
task = PythonOperator(
task_id='print_param_task',
python_callable=print_param,
provide_context=True
)
- Sobrescribir en tiempo de ejecución:
Cuando se inicia una ejecución de DAG manualmente (vía la interfaz de usuario de Airflow, CLI o API), puedes suministrar o sobrescribir valores deparams
para esa ejecución específica.
Acceder a parámetros en tareas
- En tu función callable de Python de la tarea, accede a los parámetros usando
kwargs['params']
. - Para campos plantillados (como en BashOperator), usa
{{ params.my_param }}
en la cadena de plantilla.
Enlaces útiles
- https://airflow.apache.org
- https://github.com/apache/airflow
- https://en.wikipedia.org/wiki/Apache_Airflow
- uv - Nuevo paquete, proyecto y gestor de entorno de Python
- Hoja de trucos de Python
- Rendimiento de AWS Lambda: JavaScript vs Python vs Golang
- AWS SAM + AWS SQS + Python PowerTools
- Hoja de trucos de venv
- Generar PDF en Python - Bibliotecas y ejemplos"