Apache Airflow w MLOPS i ETL — opis, korzyści i przykłady
Dobry framework do ETS/MLOPS z użyciem Pythona
Apache Airflow to otwarty platforma programowy, zaprojektowana do programistycznej tworzenia, harmonogramowania i monitorowania przepływów pracy – całkowicie w kodzie Pythona, oferując elastyczne i potężne alternatywy dla tradycyjnych, ręcznych lub opartych na interfejsie graficznym narzędzi do zarządzania przepływami pracy.
Apache Airflow został pierwotnie opracowany w Airbnb w 2014 roku w celu zarządzania coraz bardziej złożonymi przepływami pracy i został w 2019 roku projektem poziomu województwa Apache Software Foundation.
Airflow jest najbardziej korzystny, gdy masz złożone przepływy pracy z wieloma zadaniami, które muszą być wykonywane w określonej kolejności, z zależnościami i mechanizmami obsługi błędów. Jest szczególnie przydatny dla organizacji prowadzących wiele zadań związanych z danymi, które wymagają orchestracji, monitorowania i mechanizmów ponownego uruchamiania, zamiast prostych skryptów lub zadań cron.
Kluczowe funkcje i typowe przypadki użycia Apache Airflow
- Definicja przepływu pracy oparta na Pythonie: Przepływy pracy są definiowane jako kod Pythona, umożliwiając dynamiczne generowanie potoków przy użyciu standardowych konstrukcji programistycznych, takich jak pętle i logika warunkowa.
- Skierowane Grafy Acykliczne (DAGs): Airflow korzysta z DAGów do reprezentowania przepływów pracy, gdzie każdy węzeł to zadanie, a krawędzie definiują zależności. Ta struktura zapewnia, że zadania są wykonywane w określonej kolejności bez cykli.
- Robustny interfejs użytkownika: Airflow oferuje nowoczesny interfejs sieciowy do monitorowania, harmonogramowania i zarządzania przepływami pracy, zapewniając widoczność statusu zadań i logów.
- Rozszerzalne integracje: Obejmuje wiele wbudowanych operatorów i hooków do łączenia się z usługami chmurowymi (AWS, GCP, Azure), bazami danych i innymi narzędziami, co czyni go bardzo elastycznym.
- Skalowalność i elastyczność: Airflow może orchestrować przepływy pracy na dużą skalę, wspierając zarówno wdrożenia lokalne, jak i w chmurze, a także jest odpowiedni dla szerokiego zakresu przypadków użycia od ETL do uczenia maszynowego.
Typowe przypadki użycia
- Orchestracja potoków ETL (Extract, Transform, Load)
- Harmonogramowanie i monitorowanie przepływów danych
- Automatyzacja trenowania i wdrażania modeli uczenia maszynowego
- Zarządzanie zadaniami infrastruktury
Zarządzane usługi Airflow
Wiele dostawców chmurowych oferuje zarządzane usługi Airflow, zmniejszając obciążenie operacyjne związane z konfiguracją i utrzymaniem:
- Amazon Managed Workflows for Apache Airflow (MWAA): Pełnie zarządzana usługa, która obsługuje skalowanie, bezpieczeństwo i dostępność, pozwalając użytkownikom skupić się na tworzeniu przepływów pracy.
- Google Cloud Composer: Zarządzany Airflow na platformie Google Cloud.
- Microsoft Fabric Data Factory: Oferuje zadania Apache Airflow jako rozwiązanie zarządzanej orchestracji w ekosystemie Azure.
Instalacja i pierwsze kroki
Airflow można zainstalować za pomocą menedżera pakietów Pythona (pip install apache-airflow
) lub narzędzi zarządzających, takich jak Astro CLI, które upraszczają konfigurację i zarządzanie projektami. Po zainstalowaniu użytkownicy definiują DAGi w skryptach Pythona i zarządzają nimi przez interfejs użytkownika Airflow.
Podsumowanie
Funkcja | Opis |
---|---|
Definicja przepływu pracy | Kod Pythona, oparty na DAGach |
Interfejs użytkownika | Webowy, nowoczesny, solidny |
Integracje | Chmura (AWS, GCP, Azure), bazy danych, wewnętrzne wtyczki |
Zarządzane usługi | AWS MWAA, Google Cloud Composer, Microsoft Fabric |
Przypadki użycia | ETL, potoki danych, przepływy uczenia maszynowego, zadania infrastruktury |
Airflow jest szeroko stosowany w społeczności inżynierii danych ze względu na swoją elastyczność, skalowalność i silny ekosystem, czyniąc go najlepszym wyborem do orchestracji przepływów pracy w środowiskach produkcyjnych.
Kluczowe sposoby, w jakie Airflow upraszcza automatyzację przepływów pracy
-
Przepływy pracy jako kod (Python)
Przepływy pracy są definiowane jako skrypty Pythona, korzystając z Skierowanych Grafów Acyklicznych (DAGs) do reprezentowania zadań i ich zależności. Ten „przepływy pracy jako kod” podejście umożliwia dynamiczne generowanie, parametryzowanie i łatwe kontrolowanie wersji potoków, czyniąc je bardzo utrzyjmowalnymi i elastycznymi wobec zmieniających się wymagań. -
Dynamiczne i rozszerzalne potoki
Korzystając z pełnych możliwości Pythona, użytkownicy mogą włączać pętle, warunki i niestandardową logikę bezpośrednio do definicji przepływu pracy. To umożliwia dynamiczne tworzenie zadań i zaawansowane parametryzowanie, które byłyby trudne lub niemożliwe w statycznych plikach konfiguracyjnych lub narzędziach opartych na interfejsie graficznym. -
Zarządzanie zadaniami w stylu Pythona
API TaskFlow (wprowadzone w wersji Airflow 2.0) czyni definiowanie zadań jeszcze bardziej Pythonowym. Programiści piszą zwykłe funkcje Pythona, dekorują je, a Airflow automatycznie obsługuje tworzenie zadań, przewiązanie zależności i przekazywanie danych między zadaniami, co prowadzi do czystszej i bardziej utrzyjmowalnej kodowej struktury. -
Niestandardowe operatory i integracje
Użytkownicy mogą tworzyć niestandardowe operatory, czujniki i hooki do interakcji z niemal każdym zewnętrznym systemem, API lub bazą danych. Ta elastyczność umożliwia płynną integrację z większym ekosystemem Pythona i usługami zewnętrznych. -
Integracja z natywnym ekosystemem Pythona
Ponieważ przepływy pracy są pisane w Pythonie, użytkownicy mogą korzystać z ogromnej liczby bibliotek Pythona (takich jak Pandas, NumPy lub ramów uczenia maszynowego) w swoich zadaniach, co dodatkowo wzmocni możliwości automatyzacji. -
Czytelność i utrzyjmowalność
Czytelność Pythona i jego popularność czynią Airflow dostępny dla szerokiego zakresu użytkowników, od inżynierów danych po analityków. Podejście oparte na kodzie również wspiera standardowe praktyki inżynierii oprogramowania, takie jak recenzje kodu i kontrola wersji.
Niektóre korzyści z użycia Apache Airflow
Funkcja | Korzyść dla automatyzacji przepływów pracy |
---|---|
DAGi oparte na Pythonie | Dynamiczne, elastyczne i utrzyjmowalne przepływy pracy |
API TaskFlow | Czystsze, bardziej Pythonowe definicje przepływów pracy |
Niestandardowe operatory/Czujniki | Integracja z dowolnym systemem lub API |
Integracja z natywnym ekosystemem Pythona | Użycie dowolnej biblioteki lub narzędzia Pythona |
Solidny interfejs użytkownika | Monitorowanie i zarządzanie przepływami pracy wizualnie |
Głęboka integracja Airflow z Pythonem nie tylko upraszcza automatyzację przepływów pracy, ale również umożliwia zespołom budowanie solidnych, skalowalnych i bardzo dopasowanych potoków danych z dużą wydajnością.
Przykład: Prosty potok ETL w Pythonie
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def extract_data():
print("Wyodrębnianie danych")
def transform_data():
print("Przetwarzanie danych")
def load_data():
print("Ładowanie danych")
default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}
dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')
extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)
extract_task >> transform_task >> load_task
Ten przykład pokazuje, jak każdy etap potoku ETL jest definiowany jako funkcja Pythona i orchestrowany przez Airflow przy użyciu operatorów Pythona i DAGów:
extract_task >> transform_task >> load_task
Inny przykład DAGa Apache Airflow
Oto podstawowy przykład kodowania DAGa (Directed Acyclic Graph) w Apache Airflow przy użyciu Pythona. Ten przykład demonstruje, jak zdefiniować przepływ pracy z dwoma prostymi zadaniami przy użyciu BashOperator, który uruchamia polecenia bash:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'retries': 1,
'retry_delay': timedelta(minutes=5)
}
with DAG(
dag_id='my_first_dag',
default_args=default_args,
description='Mój pierwszy DAG Airflow!',
schedule_interval='@daily',
start_date=datetime(2023, 1, 1),
catchup=False,
) as dag:
# Zadanie 1: Wypisz aktualną datę
print_date = BashOperator(
task_id='print_date',
bash_command='date'
)
# Zadanie 2: Powiedz "Cześć"
say_hello = BashOperator(
task_id='say_hello',
bash_command='echo "Cześć, Airflow!"'
)
# Zdefiniuj zależności zadań
print_date >> say_hello
Jak to działa:
- DAG jest definiowany przy użyciu menedżera kontekstu (
with DAG(...) as dag:
). - Utworzono dwa zadania:
print_date
isay_hello
. - Zależność
print_date >> say_hello
zapewnia, żesay_hello
uruchamia się tylko po zakończeniuprint_date
. - Zapisz ten kod jako
my_first_dag.py
w katalogudags/
Airflow, a Airflow automatycznie wykryje i zaplanuje go.
Możesz rozszerzyć ten szablon dodając zadania PythonOperator, rozgałęzienia lub bardziej złożoną logikę wraz z rozwojem swoich przepływów pracy.
Kluczowe parametry przy tworzeniu obiektu DAG w Pythonie
Podczas definiowania DAGa w Apache Airflow należy ustawić kilka kluczowych parametrów, aby zapewnić poprawne harmonogramowanie, identyfikację i zachowanie przepływu pracy.
Niezbędne parametry:
-
dag_id
Unikalny identyfikator dla DAGa. Każdy DAG w środowisku Airflow musi mieć unikalnydag_id
. -
start_date
Data i czas, w którym DAG staje się elegible do wykonania. Określa, kiedy rozpoczyna się harmonogramowanie DAGa. -
schedule (lub
schedule_interval
)
Definiuje, jak często powinien być uruchamiany DAG (np."@daily"
,"0 12 * * *"
, lubNone
dla ręcznych uruchomień). -
catchup
Wartość logiczna określająca, czy Airflow powinien uzupełniać brakujące uruchomienia międzystart_date
a aktualną datą, gdy DAG zostanie po raz pierwszy włączony. DomyślnieFalse
.
Inne powszechne parametry:
-
default_args
Słownik domyślnych argumentów stosowanych do wszystkich zadań w DAGu (np.owner
,email
,retries
,retry_delay
). Jest opcjonalny, ale zalecany w celu uniknięcia powtarzania się kodu (DRY). -
params
Słownik do konfiguracji parametrów uruchamiania, pozwalający przekazywać wartości do zadań w czasie uruchamiania i zwiększający elastyczność DAGów.
Przykład:
from airflow import DAG
from datetime import datetime, timedelta
default_args = {
'owner': 'jdoe',
'email': ['jdoe@example.com'],
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
with DAG(
dag_id='example_dag',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
default_args=default_args,
params={'example_param': 'value'}
) as dag:
# Zdefiniuj zadania tutaj
pass
Tabela parametrów DAGa
Parametr | Opis | Wymagane |
---|---|---|
dag_id | Unikalna nazwa DAGa | Tak |
start_date | Kiedy DAG staje się elegible do uruchomienia | Tak |
schedule | Częstotliwość/timing uruchamiania DAGa | Tak |
catchup | Czy uzupełniać brakujące uruchomienia | Nie |
default_args | Domyślne argumenty dla wszystkich zadań w DAGu | Nie |
params | Parametry uruchamiania do dynamicznej konfiguracji | Nie |
Ustawienie tych parametrów zapewnia, że DAG jest unikalnie identyfikowany, poprawnie harmonogramowany i zachowuje się zgodnie z oczekiwaniami w Airflow.
Włączanie niestandardowych parametrów uruchamiania przy użyciu argumentu params
w Airflow
Argument params
w Apache Airflow umożliwia wstrzykiwanie niestandardowych parametrów uruchamiania do DAGów i zadań, umożliwiając dynamiczne i elastyczne konfiguracje przepływów pracy. Parametry umożliwiają dostarczanie konfiguracji uruchamiania do zadań. Można skonfigurować domyślne Parametry w kodzie DAGa i dostarczyć dodatkowe Parametry lub nadpisać wartości Parametrów podczas uruchamiania DAGa.
Ten podejście czyni przepływy pracy Airflow bardziej dynamicznymi i konfigurowalnymi, wspierając szeroki zakres scenariuszy automatyzacji.
Jak ustawić niestandardowe parametry uruchamiania
- Zdefiniuj
params
w DAGu:
Podczas tworzenia obiektu DAG, dołącz argumentparams
jako słownik. Każda para klucz-wartość reprezentuje nazwę parametru i jego wartość domyślną.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def print_param(**kwargs):
# Dostęp do parametru przez kwargs['params']
my_param = kwargs['params']['my_param']
print(f"Mój niestandardowy parametr to: {my_param}")
with DAG(
dag_id='custom_params_example',
start_date=datetime(2024, 1, 1),
schedule='@daily',
catchup=False,
params={'my_param': 'default_value'} # Niestandardowy parametr uruchamiania
) as dag:
task = PythonOperator(
task_id='print_param_task',
python_callable=print_param,
provide_context=True
)
- Nadpisz w czasie uruchamiania:
Podczas ręcznego uruchamiania działania DAGa (przez interfejs użytkownika Airflow, CLI lub API), możesz dostarczyć lub nadpisać wartościparams
dla tej konkretnej sesji uruchamiania.
Dostęp do parametrów w zadaniach
- W Twojej funkcji Pythona zadania, uzyskaj dostęp do parametrów za pomocą
kwargs['params']
. - Dla pól szablonowanych (np. w BashOperator), użyj
{{ params.my_param }}
w ciągu szablonu.
Przydatne linki
- https://airflow.apache.org
- https://github.com/apache/airflow
- https://en.wikipedia.org/wiki/Apache_Airflow
- uv - Nowy pakiet, projekt i menedżer środowiska Pythona
- Python Cheatsheet
- Wydajność AWS lambda: JavaScript vs Python vs Golang
- AWS SAM + AWS SQS + Python PowerTools
- Cheatsheet venv
- Generowanie PDF w Pythonie - biblioteki i przykłady"