Apache Airflow pour MLOPS et ETL - Description, avantages et exemples
Belle framework pour ETS/MLOPS avec Python
Apache Airflow est une plateforme open source conçue pour créer, planifier et surveiller des workflows de manière programmée, entièrement en code Python, offrant une alternative flexible et puissante aux outils traditionnels, manuels ou basés sur une interface graphique.
Apache Airflow a été initialement développé chez Airbnb en 2014 pour gérer des workflows de plus en plus complexes et est devenu un projet de niveau supérieur de la fondation Apache Software en 2019.
Airflow est le plus bénéfique lorsque vous avez des workflows complexes avec plusieurs tâches qui doivent être exécutées dans un ordre spécifique, avec des dépendances et un traitement des erreurs. Il est particulièrement utile pour les organisations qui exécutent de nombreuses tâches de données nécessitant une orchestration, un suivi et des mécanismes de réessai, plutôt que des scripts ou des tâches cron simples.
Fonctionnalités principales d’Apache Airflow et cas d’usage typiques
- Définition des workflows basée sur Python : Les workflows sont définis comme du code Python, permettant une génération dynamique de pipelines à l’aide de constructions de programmation standard telles que les boucles et la logique conditionnelle.
- Graphes acycliques dirigés (DAGs) : Airflow utilise des DAGs pour représenter les workflows, où chaque nœud est une tâche et les arêtes définissent les dépendances. Cette structure garantit que les tâches s’exécutent dans un ordre spécifié sans cycles.
- Interface utilisateur robuste : Airflow fournit une interface web moderne pour surveiller, planifier et gérer les workflows, offrant une visibilité sur l’état des tâches et les journaux.
- Intégrations étendues : Il inclut de nombreux opérateurs et hooks intégrés pour se connecter aux services cloud (AWS, GCP, Azure), aux bases de données et à d’autres outils, le rendant hautement extensible.
- Évolutivité et flexibilité : Airflow peut orchestrer des workflows à grande échelle, en soutenant à la fois les déploiements locaux et cloud, et convient à une large gamme de cas d’usage allant de l’ETL à l’apprentissage automatique.
Cas d’usage typiques
- Orchestration de pipelines ETL (Extraction, Transformation, Chargement)
- Planification et suivi de workflows de données
- Automatisation de l’entraînement et du déploiement de modèles d’apprentissage automatique
- Gestion des tâches d’infrastructure
Services d’Airflow gérés
Plusieurs fournisseurs de cloud proposent des services d’Airflow gérés, réduisant la charge opérationnelle liée à l’installation et à l’entretien :
- Amazon Managed Workflows for Apache Airflow (MWAA) : Un service entièrement géré qui gère l’évolutivité, la sécurité et la disponibilité, permettant aux utilisateurs de se concentrer sur le développement des workflows.
- Google Cloud Composer : Airflow géré sur Google Cloud Platform.
- Microsoft Fabric Data Factory : Propose des tâches d’Airflow comme solution d’orchestration gérée au sein de l’écosystème Azure.
Installation et mise en route
Airflow peut être installé via le gestionnaire de paquets Python (pip install apache-airflow
) ou des outils gérés comme Astro CLI, qui simplifient l’installation et la gestion des projets. Après l’installation, les utilisateurs définissent des DAGs dans des scripts Python et les gèrent via l’interface utilisateur d’Airflow.
Résumé
Fonctionnalité | Description |
---|---|
Définition des workflows | Code Python, basé sur DAGs |
Interface utilisateur | Web, moderne, robuste |
Intégrations | Cloud (AWS, GCP, Azure), bases de données, plugins personnalisés |
Services gérés | AWS MWAA, Google Cloud Composer, Microsoft Fabric |
Cas d’usage | ETL, pipelines de données, workflows d’apprentissage automatique, tâches d’infrastructure |
Airflow est largement adopté dans la communauté d’ingénierie des données pour sa flexibilité, son évolutivité et son écosystème solide, le rendant un choix majeur pour l’orchestration de workflows dans les environnements de production.
Principales façons dont Airflow simplifie l’automatisation des workflows
-
Workflows comme code (Python)
Les workflows d’Airflow sont définis comme des scripts Python, en utilisant des Graphes acycliques dirigés (DAGs) pour représenter les tâches et leurs dépendances. Cette approche “workflows comme code” permet une génération dynamique, une paramétrisation et un contrôle de version facile des pipelines, les rendant très maintenables et adaptables aux exigences changeantes. -
Pipelines dynamiques et extensibles
En exploitant pleinement les capacités de Python, les utilisateurs peuvent intégrer des boucles, des conditions et de la logique personnalisée directement dans leurs définitions de workflow. Cela permet une création dynamique de tâches et une paramétrisation avancée qui serait difficile ou impossible à réaliser avec des fichiers de configuration statiques ou des outils basés sur une interface graphique. -
Gestion des tâches Pythonique
L’API TaskFlow d’Airflow (introduite avec Airflow 2.0) rend la définition des tâches encore plus Pythonique. Les développeurs écrivent des fonctions Python simples, les décorent, et Airflow gère automatiquement la création de tâches, le câblage des dépendances et le passage de données entre les tâches, ce qui donne un code plus propre et plus maintenable. -
Opérateurs et intégrations personnalisés
Les utilisateurs peuvent créer des opérateurs, des capteurs et des hooks personnalisés en Python pour interagir avec presque tout système externe, API ou base de données. Cette extensibilité permet une intégration fluide avec l’écosystème Python plus large et les services externes. -
Intégration native avec l’écosystème Python
Puisque les workflows sont écrits en Python, les utilisateurs peuvent exploiter l’immense variété de bibliothèques Python (comme Pandas, NumPy ou des cadres d’apprentissage automatique) au sein de leurs tâches, renforçant ainsi les capacités d’automatisation. -
Lisible et maintenable
La lisibilité de Python et sa popularité rendent Airflow accessible à un large éventail d’utilisateurs, des ingénieurs en données aux analystes. L’approche basée sur le code soutient également les pratiques standard de l’ingénierie logicielle comme les revues de code et le contrôle de version.
Quelques avantages d’Apache Airflow
Fonctionnalité | Avantage pour l’automatisation des workflows |
---|---|
DAGs basés sur Python | Workflows dynamiques, flexibles et maintenables |
API TaskFlow | Définitions de workflows plus propres et plus Pythoniques |
Opérateurs/Senseurs personnalisés | Intégration avec tout système ou API |
Intégration native avec Python | Utilisation de toute bibliothèque ou outil Python |
Interface utilisateur robuste | Surveillance et gestion visuelle des workflows |
L’intégration approfondie d’Airflow avec Python simplifie non seulement l’automatisation des workflows, mais permet également aux équipes de construire efficacement des pipelines de données robustes, évolutifs et hautement personnalisés.
Exemple : workflow ETL simple en Python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
print("Extraction des données")
def transform_data():
print("Transformation des données")
def load_data():
print("Chargement des données")
default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)
extract_task >> transform_task >> load_task
Cet exemple montre comment chaque étape d’un pipeline ETL est définie comme une fonction Python et orchestrée par Airflow à l’aide d’opérateurs et de DAGs :
extract_task >> transform_task >> load_task
Autre exemple d’un DAG d’Apache Airflow
Voici un exemple de base de codage d’un DAG (Directed Acyclic Graph) dans Apache Airflow à l’aide de Python. Cet exemple illustre comment définir un workflow avec deux tâches simples à l’aide de l’opérateur BashOperator, qui exécute des commandes bash :
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='my_first_dag',
default_args=default_args,
description='Mon premier DAG Airflow !',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
# Tâche 1 : Imprimer la date actuelle
print_date = BashOperator(
task_id='print_date',
bash_command='date'
)
# Tâche 2 : Dire bonjour
say_hello = BashOperator(
task_id='say_hello',
bash_command='echo "Bonjour, Airflow !"'
)
# Définir les dépendances des tâches
print_date >> say_hello
Fonctionnement :
- Le DAG est défini à l’aide d’un gestionnaire de contexte (
with DAG(...) as dag:
). - Deux tâches sont créées :
print_date
etsay_hello
. - La dépendance
print_date >> say_hello
garantit quesay_hello
s’exécute uniquement après queprint_date
ait terminé. - Enregistrez ce code comme
my_first_dag.py
dans votre répertoiredags/
d’Airflow, et Airflow détectera et planifiera automatiquement ce DAG.
Vous pouvez étendre ce modèle en ajoutant des tâches PythonOperator, des branches ou de la logique plus complexe à mesure que vos workflows s’élargissent.
Paramètres clés lors de la création d’un objet DAG en Python
Lors de la définition d’un DAG dans Apache Airflow, plusieurs paramètres clés doivent être configurés pour garantir un planification, une identification et un comportement corrects de votre workflow.
Paramètres essentiels :
-
dag_id
L’identifiant unique de votre DAG. Chaque DAG dans votre environnement Airflow doit avoir undag_id
distinct. -
start_date
La date et l’heure à partir desquelles le DAG devient éligible à l’exécution. Cela détermine à partir de quand le planification commence pour le DAG. -
schedule (ou
schedule_interval
)
Définit aussi souvent le DAG doit s’exécuter (par exemple,"@daily"
,"0 12 * * *"
, ouNone
pour les exécutions manuelles). -
catchup
Un booléen qui détermine si Airflow doit rattraper les exécutions manquées entre lastart_date
et la date actuelle lorsque le DAG est activé pour la première fois. Par défaut, il estFalse
.
Autres paramètres courants :
-
default_args
Un dictionnaire de paramètres par défaut appliqués à toutes les tâches du DAG (par exemple,owner
,email
,retries
,retry_delay
). C’est optionnel mais recommandé pour le code DRY (Don’t Repeat Yourself). -
params
Un dictionnaire pour les paramètres de configuration en temps réel, permettant de passer des valeurs aux tâches en temps réel et rendant vos DAGs plus flexibles.
Exemple :
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'jdoe',
'email': ['jdoe@example.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='example_dag',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
default_args=default_args,
params={'example_param': 'value'}
) as dag:
# Définir les tâches ici
pass
Tableau des paramètres de DAG
Paramètre | Description | Obligatoire |
---|---|---|
dag_id | Nom unique pour le DAG | Oui |
start_date | Quand le DAG est éligible pour commencer à s’exécuter | Oui |
schedule | Fréquence/temps d’exécution du DAG | Oui |
catchup | Si les exécutions manquées doivent être rattrapées | Non |
default_args | Arguments par défaut pour toutes les tâches du DAG | Non |
params | Paramètres de configuration en temps réel pour une flexibilité accrue | Non |
La configuration de ces paramètres garantit que votre DAG est correctement identifié, planifié et se comporte comme prévu dans Airflow.
Inclusion de paramètres de temps d’exécution personnalisés à l’aide de l’argument params
dans Airflow
L’argument params
d’Apache Airflow permet d’injecter des paramètres de temps d’exécution personnalisés dans vos DAGs et tâches, permettant des configurations de workflow dynamiques et flexibles. Les paramètres permettent de fournir une configuration de temps d’exécution aux tâches. Vous pouvez définir des paramètres par défaut dans votre code DAG et fournir des paramètres supplémentaires, ou remplacer les valeurs des paramètres lors de la déclenchement d’une exécution de DAG.
Cette approche rend vos workflows Airflow plus dynamiques et configurables, soutenant une large variété de scénarios d’automatisation.
Comment définir des paramètres de temps d’exécution personnalisés
- Définir
params
dans le DAG :
Lors de la création d’un objet DAG, incluez l’argumentparams
comme un dictionnaire. Chaque paire clé-valeur représente le nom d’un paramètre et sa valeur par défaut.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_param(**kwargs):
# Accédez au paramètre via kwargs['params']
my_param = kwargs['params']['my_param']
print(f"Mon paramètre personnalisé est : {my_param}")
with DAG(
dag_id='custom_params_example',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
params={'my_param': 'default_value'} # Paramètre de temps d'exécution personnalisé
) as dag:
task = PythonOperator(
task_id='print_param_task',
python_callable=print_param,
provide_context=True
)
- Remplacer en temps d’exécution :
Lorsque vous déclenchez manuellement une exécution de DAG (via l’interface utilisateur d’Airflow, le CLI ou l’API), vous pouvez fournir ou remplacer les valeursparams
pour cette exécution spécifique.
Accès aux paramètres dans les tâches
- Dans votre fonction callable Python de la tâche, accédez aux paramètres à l’aide de
kwargs['params']
. - Pour les champs modèles (comme dans BashOperator), utilisez
{{ params.my_param }}
dans la chaîne de modèle.
Liens utiles
- https://airflow.apache.org
- https://github.com/apache/airflow
- https://en.wikipedia.org/wiki/Apache_Airflow
- uv - Nouveau package, projet et gestionnaire d’environnement Python
- Feuille de rappel Python
- Performance d’AWS Lambda : JavaScript vs Python vs Golang
- AWS SAM + AWS SQS + Python PowerTools
- Feuille de rappel venv
- Génération de PDF en Python - Bibliothèques et exemples"