Apache Airflow untuk MLOPS dan ETL - Deskripsi, Manfaat, dan Contoh

Framework yang bagus untuk ETS/MLOPS dengan Python

Konten Halaman

Apache Airflow adalah platform open-source yang dirancang untuk membuat, menjadwalkan, dan memantau alur kerja secara programatis, seluruhnya dalam kode Python, menawarkan alternatif fleksibel dan kuat terhadap alat alur kerja tradisional, manual, atau berbasis UI.

Apache Airflow awalnya dikembangkan di Airbnb pada tahun 2014 untuk mengelola alur kerja yang semakin kompleks dan menjadi proyek tingkat atas Apache Software Foundation pada tahun 2019.

Airflow paling bermanfaat ketika Anda memiliki alur kerja yang kompleks dengan beberapa tugas yang perlu dieksekusi dalam urutan tertentu, dengan ketergantungan dan penanganan kesalahan. Ini sangat berguna bagi organisasi yang menjalankan banyak pekerjaan data yang memerlukan orkestrasi, pemantauan, dan mekanisme pengulangan, bukan hanya skrip sederhana atau pekerjaan cron.

Chain oof cyber events

Fitur Utama dan Kasus Penggunaan Umum Apache Airflow

  • Definisi Alur Kerja Berbasis Python: Alur kerja didefinisikan sebagai kode Python, memungkinkan pembuatan pipeline dinamis menggunakan konstruksi pemrograman standar seperti loop dan logika kondisional.
  • Directed Acyclic Graphs (DAGs): Airflow menggunakan DAGs untuk merepresentasikan alur kerja, di mana setiap node adalah tugas dan edge mendefinisikan ketergantungan. Struktur ini memastikan tugas berjalan dalam urutan yang ditentukan tanpa siklus.
  • UI yang Kuat: Airflow menyediakan antarmuka web modern untuk memantau, menjadwalkan, dan mengelola alur kerja, memberikan visibilitas terhadap status tugas dan log.
  • Integrasi yang Luas: Mengandung banyak operator dan hook bawaan untuk terhubung dengan layanan cloud (AWS, GCP, Azure), database, dan alat lainnya, membuatnya sangat dapat diperluas.
  • Skalabilitas dan Fleksibilitas: Airflow dapat mengorkestrasi alur kerja dalam skala besar, mendukung deployment on-premises dan cloud, dan cocok untuk berbagai kasus penggunaan dari ETL hingga machine learning.

Kasus Penggunaan Umum

  • Mengorkestrasi ETL (Extract, Transform, Load) pipeline
  • Menjadwalkan dan memantau alur kerja data
  • Mengotomatisasi pelatihan dan deployment model machine learning
  • Mengelola tugas infrastruktur

Layanan Airflow yang Dikelola

Beberapa penyedia cloud menawarkan layanan Airflow yang dikelola, mengurangi beban operasional dari pengaturan dan pemeliharaan:

  • Amazon Managed Workflows for Apache Airflow (MWAA): Layanan yang dikelola sepenuhnya yang menangani skalabilitas, keamanan, dan ketersediaan, memungkinkan pengguna fokus pada pengembangan alur kerja.
  • Google Cloud Composer: Airflow yang dikelola di Google Cloud Platform.
  • Microsoft Fabric Data Factory: Menawarkan pekerjaan Airflow sebagai solusi orkestrasi yang dikelola dalam ekosistem Azure.

Instalasi dan Memulai

Airflow dapat diinstal melalui manajer paket Python (pip install apache-airflow) atau alat yang dikelola seperti Astro CLI, yang menyederhanakan pengaturan dan manajemen proyek. Setelah instalasi, pengguna mendefinisikan DAGs dalam skrip Python dan mengelolanya melalui UI Airflow.

Ringkasan

Fitur Deskripsi
Definisi Alur Kerja Kode Python, berbasis DAG
UI Berbasis web, modern, kuat
Integrasi Cloud (AWS, GCP, Azure), database, plugin kustom
Layanan yang Dikelola AWS MWAA, Google Cloud Composer, Microsoft Fabric
Kasus Penggunaan ETL, pipeline data, alur kerja ML, tugas infrastruktur

Airflow secara luas diterima oleh komunitas engineering data karena fleksibilitas, skalabilitas, dan ekosistem yang kuat, menjadikannya pilihan utama untuk orkestrasi alur kerja dalam lingkungan produksi.

Cara Utama Airflow Mempermudah Otomasi Alur Kerja

  • Alur Kerja sebagai Kode (Python)
    Alur kerja Airflow didefinisikan sebagai skrip Python, menggunakan Directed Acyclic Graphs (DAGs) untuk merepresentasikan tugas dan ketergantungannya. Pendekatan “alur kerja sebagai kode” ini memungkinkan pembuatan dinamis, parameterisasi, dan kontrol versi yang mudah dari pipeline, membuatnya sangat mudah dipelihara dan disesuaikan dengan kebutuhan yang berubah.

  • Pipeline yang Dinamis dan Dapat Diperluas
    Dengan memanfaatkan kemampuan penuh Python, pengguna dapat menyisipkan loop, kondisi, dan logika kustom langsung ke dalam definisi alur kerja mereka. Ini memungkinkan pembuatan tugas dinamis dan parameterisasi lanjutan yang sulit atau bahkan mustahil dilakukan dalam file konfigurasi statis atau alat berbasis GUI.

  • Manajemen Tugas yang Pythonik
    API TaskFlow Airflow (diperkenalkan dalam Airflow 2.0) membuat definisi tugas bahkan lebih Pythonik. Pengembang menulis fungsi Python biasa, mendekorasinya, dan Airflow secara otomatis menangani pembuatan tugas, pengkabelan ketergantungan, dan pengiriman data antar tugas, menghasilkan kode yang lebih bersih dan mudah dipelihara.

  • Operator dan Integrasi Kustom
    Pengguna dapat membuat operator, sensor, dan hook kustom untuk berinteraksi dengan hampir semua sistem eksternal, API, atau database. Kemampuan ekstensibilitas ini memungkinkan integrasi yang mulus dengan ekosistem Python yang lebih luas dan layanan eksternal.

  • Integrasi dengan Ekosistem Python Bawaan
    Karena alur kerja ditulis dalam Python, pengguna dapat memanfaatkan berbagai perpustakaan Python (seperti Pandas, NumPy, atau kerangka kerja machine learning) dalam tugas mereka, meningkatkan kemampuan otomatisasi lebih lanjut.

  • Bacaable dan Mudah Dipelihara
    Keterbacaan Python dan popularitasnya membuat Airflow dapat diakses oleh berbagai pengguna, dari engineer data hingga analis. Pendekatan berbasis kode juga mendukung praktik rekayasa perangkat lunak standar seperti tinjauan kode dan kontrol versi.

Beberapa Manfaat Apache Airflow

Fitur Manfaat untuk Otomasi Alur Kerja
DAGs Berbasis Python Alur kerja dinamis, fleksibel, dan mudah dipelihara
API TaskFlow Definisi alur kerja yang lebih bersih dan Pythonik
Operator/Sensor Kustom Integrasikan dengan sistem atau API apa pun
Integrasi Python Bawaan Gunakan perpustakaan atau alat Python apa pun
UI yang Kuat Pantau dan kelola alur kerja secara visual

Integrasi mendalam Airflow dengan Python tidak hanya mempermudah otomasi alur kerja tetapi juga memberdayakan tim untuk membangun pipeline data yang kuat, skalabel, dan sangat disesuaikan secara efisien.

Contoh: Alur Kerja ETL Sederhana dalam Python

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extracting data")

def transform_data():
    print("Transforming data")

def load_data():
    print("Loading data")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

Contoh ini menunjukkan bagaimana setiap tahap dari pipeline ETL didefinisikan sebagai fungsi Python dan diatur oleh Airflow menggunakan operator Python dan DAGs: extract_task >> transform_task >> load_task

Contoh Lain dari DAG Apache Airflow

Berikut adalah contoh dasar dari penulisan DAG (Directed Acyclic Graph) dalam Apache Airflow menggunakan Python. Contoh ini menunjukkan bagaimana mendefinisikan alur kerja dengan dua tugas sederhana menggunakan BashOperator, yang menjalankan perintah bash:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='My first Airflow DAG!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Task 1: Print the current date
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Task 2: Say Hello
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello, Airflow!"'
    )

    # Define task dependencies
    print_date >> say_hello

Bagaimana ini bekerja:

  • DAG didefinisikan menggunakan context manager (with DAG(...) as dag:).
  • Dua tugas dibuat: print_date dan say_hello.
  • Ketergantungan print_date >> say_hello memastikan say_hello hanya berjalan setelah print_date selesai.
  • Simpan kode ini sebagai my_first_dag.py di direktori dags/ Anda, dan Airflow akan secara otomatis mendeteksi dan menjadwalkannya.

Anda dapat mengembangkan template ini dengan menambahkan tugas PythonOperator, branching, atau logika yang lebih kompleks seiring berkembangnya alur kerja Anda.

Parameter Utama Saat Membuat Objek DAG dalam Python

Ketika mendefinisikan DAG di Apache Airflow, beberapa parameter utama harus diatur untuk memastikan jadwal, identifikasi, dan perilaku alur kerja Anda berjalan dengan benar.

Parameter Penting:

  • dag_id
    Identifikasi unik untuk DAG Anda. Setiap DAG di lingkungan Airflow Anda harus memiliki dag_id yang berbeda.

  • start_date
    Tanggal dan waktu ketika DAG menjadi layak untuk dieksekusi. Ini menentukan kapan jadwal dimulai untuk DAG.

  • schedule (atau schedule_interval)
    Mendefinisikan seberapa sering DAG harus berjalan (misalnya, "@daily", "0 12 * * *", atau None untuk eksekusi manual).

  • catchup
    Sebuah boolean yang menentukan apakah Airflow harus mengisi ulang eksekusi yang terlewat antara start_date dan tanggal saat ini ketika DAG pertama kali diaktifkan. Defaultnya adalah False.

Parameter Umum Lainnya:

  • default_args
    Sebuah dictionary dari argumen default yang diterapkan ke semua tugas dalam DAG (misalnya, owner, email, retries, retry_delay). Ini opsional tetapi disarankan untuk kode DRY (Don’t Repeat Yourself).

  • params
    Sebuah dictionary untuk parameter konfigurasi runtime, memungkinkan Anda untuk meneruskan nilai ke tugas saat runtime dan membuat DAG Anda lebih fleksibel.

Contoh:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Definisikan tugas di sini
    pass

Tabel Parameter DAG

Parameter Deskripsi Diperlukan
dag_id Nama unik untuk DAG Ya
start_date Kapan DAG layak untuk dimulai Ya
schedule Frekuensi/waktu untuk eksekusi DAG Ya
catchup Apakah mengisi ulang eksekusi yang terlewat Tidak
default_args Argumen default untuk semua tugas dalam DAG Tidak
params Parameter konfigurasi runtime untuk fleksibilitas dinamis Tidak

Menetapkan parameter ini memastikan DAG Anda diidentifikasi secara unik, dijadwalkan dengan benar, dan berperilaku sesuai yang diinginkan dalam Airflow.

Menyertakan Parameter Runtime Kustom Menggunakan Argumen params dalam Airflow

Argumen params dalam Apache Airflow memungkinkan Anda menyisipkan parameter runtime kustom ke dalam DAG dan tugas Anda, memberdayakan konfigurasi alur kerja yang dinamis dan fleksibel. Params memungkinkan Anda menyediakan konfigurasi runtime ke tugas. Anda dapat mengatur parameter default dalam kode DAG Anda dan menyediakan parameter tambahan, atau mengganti nilai parameter saat memicu eksekusi DAG.

Pendekatan ini membuat alur kerja Airflow Anda lebih dinamis dan dapat dikonfigurasi, mendukung berbagai skenario otomatisasi.

Cara Menetapkan Parameter Runtime Kustom

  • Definisikan params dalam DAG:
    Saat membuat objek DAG, sertakan argumen params sebagai dictionary. Setiap pasangan kunci-nilai mewakili nama parameter dan nilai defaultnya.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # Akses parameter melalui kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"Parameter kustom saya adalah: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # Parameter runtime kustom
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • Ganti Saat Runtime:
    Saat memicu eksekusi DAG secara manual (melalui UI Airflow, CLI, atau API), Anda dapat menyediakan atau mengganti nilai params untuk eksekusi tertentu.

Mengakses Parameter dalam Tugas

  • Dalam fungsi Python callable tugas Anda, akses params menggunakan kwargs['params'].
  • Untuk bidang yang ditemplatkan (seperti dalam BashOperator), gunakan {{ params.my_param }} dalam string templat.

Tautan yang Berguna