أباتش أيرفلو لـ MLOPS وETL - وصف، الفوائد وأمثلة

إطار عمل ممتاز لـ ETS/MLOPS باستخدام لغة بايثون

Page content

Apache Airflow هو منصة مفتوحة المصدر تم تصميمها لكتابة وجدولة ومراقبة التدفقات بشكل برمجي، تمامًا باستخدام كود بايثون، مما يوفر بديلًا مرنًا وقويًا لل أدوات تدفق العمل التقليدية أو اليدوية أو القائمة على واجهة المستخدم.

تم تطوير Airflow في الأصل في Airbnb عام 2014 لإدارة التدفقات المعقدة بشكل متزايد، وتم ترقيته إلى مشروع مستوى عالٍ في Apache Software Foundation في عام 2019.

يكون Airflow مفيدًا بشكل خاص عندما يكون لديك تدفقات عمل معقدة تحتوي على مهام متعددة يجب تنفيذها بترتيب معين، مع وجود اعتماديات وتعامل مع الأخطاء. إنه مفيد بشكل خاص للمنظمات التي تقوم بتشغيل عدد كبير من مهام البيانات التي تتطلب تنسيقًا ومراقبة وآليات إعادة المحاولة، بدلًا من النصوص البسيطة أو وظائف cron.

سلسلة من أحداث الأمن السيبراني

الميزات الرئيسية لـ Apache Airflow والتطبيقات النموذجية

  • تعريف تدفقات العمل بناءً على بايثون: يتم تعريف التدفقات ككود بايثون، مما يسمح بإنشاء أنابيب ديناميكية باستخدام بناءات برمجية قياسية مثل الحلقات والمنطق الشرطي.
  • الرسوم البيانية الموجهة غير الدورية (DAGs): يستخدم Airflow DAGs لتمثيل التدفقات، حيث يكون كل عقدة مهمة، وتحدد الحواف الاعتماديات. هذا الهيكل يضمن تنفيذ المهام في ترتيب محدد دون وجود دوائر.
  • واجهة مستخدم قوية: يوفر Airflow واجهة وبصرية حديثة لمراقبة وجدولة وإدارة التدفقات، مما يوفر رؤية في حالة المهام والسجلات.
  • تكاملات واسعة النطاق: يحتوي على العديد من المشغلات والأساليب المدمجة لربط الخدمات السحابية (AWS، GCP، Azure)، والقواعد البيانات، والأدوات الأخرى، مما يجعله قابلًا للتوسع بشكل كبير.
  • التوسع والمرن: يمكن لـ Airflow تنسيق التدفقات على نطاق واسع، مع دعم الانتشار المحلي والسحابي، وهو مناسب لعدد واسع من التطبيقات من ETL إلى تعلم الآلة.

التطبيقات النموذجية

  • تنسيق أنابيب ETL (استخراج، تحويل، تحميل)
  • جدولة ومراقبة تدفقات البيانات
  • تلقين تدريب نماذج تعلم الآلة ونشرها
  • إدارة مهام البنية التحتية

خدمات Airflow المدارة

تقدم عدة مزودي سحابة خدمات مدارة لـ Airflow، مما يقلل من عبء التشغيل والصيانة:

  • Amazon Managed Workflows for Apache Airflow (MWAA): خدمة مدارة بالكامل تتعامل مع التوسع والأمان والتوافر، مما يسمح للمستخدمين بالتركيز على تطوير التدفقات.
  • Google Cloud Composer: Airflow المدارة على منصة Google Cloud.
  • Microsoft Fabric Data Factory: تقدم وظائف Apache Airflow كحل مدارة لتنسيق البيانات داخل نظام Azure.

التركيب والبدء

يمكن تركيب Airflow عبر مدير حزم Python (pip install apache-airflow) أو أدوات إدارة مثل Astro CLI، مما يبسط التركيب وإدارة المشاريع. بعد التركيب، يحدد المستخدمون DAGs في ملفات كود بايثون ويقومون بإدارتها عبر واجهة Airflow.

ملخص

الميزة الوصف
تعريف التدفق كود بايثون، بناءً على DAG
الواجهة ويب، حديثة، قوية
التكاملات سحابة (AWS، GCP، Azure)، قواعد البيانات، ملحقات مخصصة
الخدمات المدارة AWS MWAA، Google Cloud Composer، Microsoft Fabric
التطبيقات ETL، أنابيب البيانات، تدفقات تعلم الآلة، مهام البنية التحتية

يُستخدم Airflow على نطاق واسع في مجتمع هندسة البيانات لمرنايته وتوسعه وبيئة قوية، مما يجعله خيارًا رائدًا لتنسيق التدفقات في البيئات الإنتاجية.

الطرق الرئيسية التي تبسط Airflow تلقين تدفقات العمل

  • تدفقات العمل ككود (بايثون)
    تُعرّف تدفقات Airflow كنصوص بايثون، باستخدام الرسوم البيانية الموجهة غير الدورية (DAGs) لتمثيل المهام واعتمادياتها. هذا النهج “تدفق العمل ككود” يسمح بإنشاء ديناميكي، وتحديد المعلمات، وتحكم سهل في إصدارات الأنابيب، مما يجعلها سهلة الصيانة وقابلة للتكيف مع المتطلبات المتغيرة.

  • أنابيب ديناميكية وقابلة للتوسع
    من خلال الاستفادة من قدرات بايثون الكاملة، يمكن للمستخدمين دمج الحلقات، والشرطيات، والمنطق المخصص مباشرة في تعريفات تدفقات العمل. هذا يسمح بإنشاء مهام ديناميكية وتحديد معلمات متقدمة سيكون من الصعب أو مستحيل تنفيذها في ملفات تكوين ثابتة أو أدوات تعتمد على واجهة المستخدم.

  • إدارة المهام بطريقة بايثونية
    يوفر Airflow واجهة API TaskFlow (المقدمة في Airflow 2.0) لجعل تعريف المهام أكثر بايثونية. يكتب المطورون وظائف بايثون عادية، ويديرونها، ويقوم Airflow تلقائيًا بإنشاء المهام، وربط الاعتماديات، ونقل البيانات بين المهام، مما يؤدي إلى كود نظيف وسهل الصيانة.

  • مشغلات ومستشعرات مخصصة
    يمكن للمستخدمين إنشاء مشغلات ومستشعرات وأساليب مخصصة للتفاعل مع أي نظام خارجي، وواجهة برمجة تطبيقات، أو قاعدة بيانات. هذه المرونة تسمح بدمج سلس مع البيئة الأوسع لبايثون والخدمات الخارجية.

  • التكامل مع البيئة المحلية لبايثون
    نظرًا لأن التدفقات تُكتب في بايثون، يمكن للمستخدمين الاستفادة من مجموعة واسعة من مكتبات بايثون (مثل Pandas، NumPy، أو أطر تعلم الآلة) داخل المهام، مما يعزز قدرات التلقين بشكل إضافي.

  • قابلية القراءة والصيانة
    سهولة قراءة بايثون وشعبيته تجعل Airflow متاحًا لعدد واسع من المستخدمين، من مهندسي البيانات إلى المدققين. النهج القائم على الكود أيضًا يدعم ممارسات هندسة البرمجيات القياسية مثل مراجعة الكود وتحكم الإصدار.

بعض فوائد Apache Airflow

الميزة الفائدة لتوسيع تلقين التدفقات
DAGs بناءً على بايثون تدفقات عمل ديناميكية ومرنة وقابلة للصيانة
API TaskFlow تعريفات تدفقات عمل نظيفة أكثر بايثونية
مشغلات/مستشعرات مخصصة دمج مع أي نظام أو واجهة برمجة تطبيقات
التكامل مع بايثون المحلية استخدام أي مكتبة أو أداة بايثون
واجهة مستخدم قوية مراقبة وإدارة التدفقات بصريًا

التكامل العميق مع بايثون في Airflow لا يبسط فقط تلقين التدفقات، بل يمنح الفرق أيضًا القدرة على بناء أنابيب بيانات قوية وقابلة للتوسع و高度 مخصصة بكفاءة.

مثال: تدفق ETL بسيط في بايثون

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("استخراج البيانات")

def transform_data():
    print("تحويل البيانات")

def load_data():
    print("تحميل البيانات")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

هذا المثال يوضح كيف يتم تعريف كل مرحلة من مراحل أنبوب ETL كدالة بايثون، ويتم تنسيقها بواسطة Airflow باستخدام مشغلات بايثون وDAGs: extract_task >> transform_task >> load_task

مثال آخر لـ DAG في Apache Airflow

هنا مثال بسيط لكتابة DAG (الرسوم البيانية الموجهة غير الدورية) في Apache Airflow باستخدام بايثون. يوضح هذا المثال كيفية تعريف تدفق عمل يحتوي على مهام بسيطة باستخدام BashOperator، الذي يقوم بتشغيل أوامر bash:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='My first Airflow DAG!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # مهمة 1: طباعة التاريخ الحالي
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # مهمة 2: قول "مرحبا"
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello, Airflow!"'
    )

    # تعريف اعتماديات المهام
    print_date >> say_hello

كيف يعمل هذا:

  • يتم تعريف DAG باستخدام مدير السياق (with DAG(...) as dag:).
  • يتم إنشاء مهامتين: print_date و say_hello.
  • الاعتماد print_date >> say_hello يضمن أن say_hello يُنفذ فقط بعد اكتمال print_date.
  • احفظ هذا الكود كـ my_first_dag.py في دليل dags/ الخاص بك في Airflow، وسينتقل Airflow تلقائيًا ويسcheduleه.

يمكنك توسيع هذا النموذج بإضافة مهام PythonOperator، أو الفرع، أو منطق أكثر تعقيدًا مع نمو تدفقاتك.

المعلمات الرئيسية عند إنشاء كائن DAG في بايثون

عند تعريف DAG في Apache Airflow، يجب تعيين عدة معلمات رئيسية لضمان جدولة وتحديد وسلوك تدفق العمل بشكل صحيح.

المعلمات الأساسية:

  • dag_id
    المعرف الفريد لـ DAG. يجب أن يكون لكل DAG في بيئة Airflow الخاص بك dag_id فريدًا.

  • start_date
    التاريخ والوقت الذي يصبح فيه DAG مؤهلاً للتنفيذ. يحدد هذا متى تبدأ الجدولة لـ DAG.

  • schedule (أو schedule_interval)
    يحدد متى يجب أن يُنفذ DAG (مثلاً "@daily", "0 12 * * *"، أو None للتشغيل اليدوي).

  • catchup
    قيمة منطقية تحدد ما إذا كان Airflow يجب أن يملأ التسجيلات المفقودة بين start_date والوقت الحالي عندما يتم تمكين DAG لأول مرة. القيمة الافتراضية هي False.

المعلمات الشائعة الأخرى:

  • default_args
    قاموس من المعلمات الافتراضية المطبقة على جميع المهام داخل DAG (مثلاً owner, email, retries, retry_delay). هذا خياري ولكن يُنصح به لتجنب تكرار الكود (DRY).

  • params
    قاموس للمعلمات المخصصة لتكوين التشغيل، مما يسمح بتمرير القيم إلى المهام أثناء التشغيل، مما يجعل DAGs أكثر مرونة.

مثال:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # تعريف المهام هنا
    pass

جدول معلمات DAG

المعلمة الوصف الضروري
dag_id اسم فريد لـ DAG نعم
start_date متى يصبح DAG مؤهلاً للتشغيل نعم
schedule تكرار/توقيت تنفيذ DAG نعم
catchup ما إذا كان يجب ملء التسجيلات المفقودة لا
default_args المعلمات الافتراضية لجميع المهام في DAG لا
params معلمات التشغيل لتكوين ديناميكي لا

تحديد هذه المعلمات يضمن أن DAG مُعرف بشكل فريد، مُجدول بشكل صحيح، ويعمل كما هو مخطط له في Airflow.

تضمين معلمات تشغيل مخصصة باستخدام معلمة params في Airflow

تتيح معلمة params في Apache Airflow إدخال معلمات تشغيل مخصصة في DAGs ومهامك، مما يسمح بتكوين تدفقات العمل الديناميكية والمرونة. تسمح المعلمات بتقديم إعدادات تشغيلية لمهامك. يمكنك تكوين معلمات افتراضية في كود DAG الخاص بك وتقديم معلمات إضافية أو تغيير قيم المعلمات عند تشغيل DAG.

هذا النهج يجعل تدفقات Airflow أكثر ديناميكية وقابلة للتكوين، مما يدعم مجموعة واسعة من سيناريوهات التلقين.

كيفية تحديد معلمات تشغيل مخصصة

  • تحديد params في DAG:
    عند إنشاء كائن DAG، اشمل معلمة params كقاموس. كل زوج مفتاح-قيمة يمثل اسم معلمة وقيمتها الافتراضية.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # الوصول إلى المعلمة عبر kwargs['params']
    my_param = kwargs['params']['my_param']
    print(f"معلمة مخصصة الخاصة بي هي: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # معلمة تشغيل مخصصة
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • استبدال عند التشغيل:
    عند تشغيل DAG يدويًا (عبر واجهة Airflow، CLI، أو API)، يمكنك تزويده أو استبدال قيم params الخاصة بهذه التشغيل.

الوصول إلى المعلمات في المهام

  • في دالة callable الخاصة بمهمتك في بايثون، الوصول إلى المعلمات باستخدام kwargs['params'].
  • في الحقول النموذجية (مثل في BashOperator)، استخدم {{ params.my_param }} في سلسلة النموذج.

روابط مفيدة