एमएलओपीएस और ईटीएल के लिए एपैचे एयरफ्लो - विवरण, लाभ और उदाहरण

ईएमटीएस/एमएलओपीएस के लिए पायथन के साथ एक अच्छा फ्रेमवर्क

Page content

Apache Airflow एक ओपन-सोर्स प्लेटफॉर्म है जो प्रोग्रामेटिकली लेखन, नियोजन और वर्कफ़्लो के मॉनिटरिंग के लिए डिज़ाइन किया गया है, पूरी तरह से पायथन कोड में, जो पारंपरिक, मनमाने या यूआई-आधारित वर्कफ़्लो उपकरणों के लिए एक लचीला और शक्तिशाली विकल्प प्रदान करता है।

Apache Airflow को मूल रूप से 2014 में Airbnb पर विकसित किया गया था ताकि बढ़ती जटिलता वाले वर्कफ़्लो के प्रबंधन के लिए और 2019 में एपेक्स सॉफ्टवेयर फाउंडेशन के टॉप-लेवल प्रोजेक्ट बन गया।

जब आपके पास विशिष्ट क्रम में निष्पादित करने के लिए जटिल वर्कफ़्लो होते हैं, जिनमें निर्भरताएं और त्रुटि प्रबंधन होते हैं, तो Airflow सबसे अधिक उपयोगी होता है। यह विशेष रूप से उन संगठनों के लिए उपयोगी होता है जो कई डेटा जॉब चलाते हैं जिनके लिए संगठन, मॉनिटरिंग और पुनः प्रयास योजनाएं आवश्यक होती हैं, बजाय सरल स्क्रिप्ट्स या क्रोन जॉब्स के।

चेन ऑफ़ साइबर इवेंट्स

Apache Airflow के मुख्य विशेषताएं और सामान्य उपयोग के मामले

  • पायथन-आधारित वर्कफ़्लो परिभाषा: वर्कफ़्लो को पायथन कोड के रूप में परिभाषित किया जाता है, जो लूप और स्थिति तर्क जैसे मानक कार्यक्रम निर्माण निर्माण उपकरणों के उपयोग से गतिशील पाइपलाइन उत्पन्न करने की अनुमति देता है।
  • निर्देशित अकार्बन चित्र (DAGs): Airflow DAGs का उपयोग वर्कफ़्लो के प्रतिनिधित्व के लिए किया जाता है, जहां प्रत्येक नोड एक कार्य है और किनारे निर्भरताओं को परिभाषित करते हैं। यह संरचना चक्रों के बिना निर्दिष्ट क्रम में कार्यों के चलने की गारंटी देता है।
  • मजबूत यूआई: Airflow एक आधुनिक वेब इंटरफ़ेस प्रदान करता है जो वर्कफ़्लो के मॉनिटरिंग, नियोजन और प्रबंधन के लिए है, जो कार्य के स्थिति और लॉग्स के दृश्यता के साथ आता है।
  • व्यापक समावेशन: इसमें कई बनाए गए ऑपरेटर और हूक्स होते हैं जो क्लाउड सेवाओं (AWS, GCP, Azure), डेटाबेस और अन्य उपकरणों के साथ जोड़े जा सकते हैं, जिससे यह बहुत विस्तारित हो सकता है।
  • स्केलेबिलिटी और लचीलापन: Airflow बड़े पैमाने पर वर्कफ़्लो के संगठन के लिए सक्षम होता है, जो ऑन-प्रीमिस और क्लाउड तैनाती का समर्थन करता है, और ईटीएल से मशीन लर्निंग तक के विस्तृत उपयोग के मामलों के लिए उपयुक्त होता है।

सामान्य उपयोग के मामले

  • ईटीएल (एग्ज़ैक्ट, ट्रांसफॉर्म, लोड) पाइपलाइन के संगठन
  • डेटा वर्कफ़्लो के नियोजन और मॉनिटरिंग
  • मशीन लर्निंग मॉडल के प्रशिक्षण और तैनाती के ऑटोमेशन
  • इंफ्रास्ट्रक्चर कार्यों के प्रबंधन

प्रबंधित एयरफ़्लो सेवाएं

कई क्लाउड प्रदाता प्रबंधित एयरफ़्लो सेवाएं प्रदान करते हैं, जो स्थापना और रखरखाव के संचालन भार को कम करते हैं:

  • एमएस वीएमए एपेक्स एयरफ़्लो (एमडब्ल्यूएए): एक पूर्णतः प्रबंधित सेवा जो स्केलिंग, सुरक्षा और उपलब्धता के लिए देखभाल करती है, जिससे उपयोगकर्ता वर्कफ़्लो विकास पर ध्यान केंद्रित कर सकते हैं।
  • गूगल क्लाउड कॉम्पोजर: गूगल क्लाउड प्लेटफॉर्म पर प्रबंधित एयरफ़्लो।
  • माइक्रोसॉफ्ट फैब्रिक डेटा फैक्टरी: एपेक्स एयरफ़्लो जॉब के रूप में एज़र एकोसिस्टम में प्रबंधित संगठन समाधान प्रदान करता है।

इस्ताल और शुरुआत

एयरफ़्लो को पायथन के पैकेज प्रबंधक (pip install apache-airflow) के माध्यम से या एस्ट्रो क्ली के जैसे प्रबंधित उपकरणों के माध्यम से इस्ताल किया जा सकता है, जो स्थापना और प्रोजेक्ट प्रबंधन को सरल बनाता है। इस्ताल के बाद, उपयोगकर्ता डीएजी को पायथन स्क्रिप्ट में परिभाषित करते हैं और एयरफ़्लो यूआई के माध्यम से उनका प्रबंधन करते हैं।

सारांश

विशेषता विवरण
वर्कफ़्लो परिभाषा पायथन कोड, डीएजी-आधारित
यूआई वेब-आधारित, आधुनिक, मजबूत
समावेशन क्लाउड (AWS, GCP, Azure), डेटाबेस, विशेष उपकरणों के साथ
प्रबंधित सेवाएं एमडब्ल्यूएए, गूगल क्लाउड कॉम्पोजर, माइक्रोसॉफ्ट फैब्रिक
उपयोग के मामले ईटीएल, डेटा पाइपलाइन, एमएल वर्कफ़्लो, इंफ्रास्ट्रक्चर कार्यों

एयरफ़्लो डेटा इंजीनियरिंग समुदाय में अपनी लचीलापन, स्केलेबिलिटी और मजबूत एकोसिस्टम के लिए व्यापक रूप से अपनाया गया है, जिससे उत्पादन वातावरण में वर्कफ़्लो संगठन के लिए एक नेतृत्व वाला चयन बन गया है।

एयरफ़्लो के मुख्य तरीकों जिनसे वर्कफ़्लो ऑटोमेशन सरल हो जाता है

  • कोड के रूप में वर्कफ़्लो (पायथन)
    एयरफ़्लो वर्कफ़्लो को पायथन स्क्रिप्ट के रूप में परिभाषित किया जाता है, जिसमें निर्देशित अकार्बन चित्र (DAGs) का उपयोग कार्यों और उनकी निर्भरताओं के प्रतिनिधित्व के लिए किया जाता है। यह “कोड के रूप में वर्कफ़्लो” दृष्टिकोण पाइपलाइन के गतिशील उत्पन्न, पैरामीटरीकरण और सरल संस्करण नियंत्रण की अनुमति देता है, जिससे वे बदलती आवश्यकताओं के अनुसार बहुत बर्दाश्त करने योग्य और अनुकूल बन जाते हैं।

  • गतिशील और विस्तारित पाइपलाइन
    पायथन की पूरी क्षमता का उपयोग करके, उपयोगकर्ता अपने वर्कफ़्लो परिभाषाओं में लूप, स्थिति और विशेष तर्क को सीधे शामिल कर सकते हैं। यह तालमेल वाले कार्यों के गतिशील निर्माण और उन्नत पैरामीटरीकरण की अनुमति देता है, जो निश्चित स्थिति फ़ाइलों या ग्राफ़-आधारित उपकरणों में कठिन या असंभव हो सकता है।

  • पायथनिक कार्य प्रबंधन
    एयरफ़्लो के टास्कफ़्लो एपीआई (एयरफ़्लो 2.0 में पेश किया गया) टास्कों के परिभाषित करने को और अधिक पायथनिक बनाता है। डेवलपर्स सामान्य पायथन फ़ंक्शन लिखते हैं, उन्हें डिकोरेट करते हैं, और एयरफ़्लो स्वचालित रूप से टास्क निर्माण, निर्भरता वायरिंग और कार्यों के बीच डेटा पारगमन के लिए देखभाल करता है, जिसके परिणामस्वरूप साफ़ और बर्दाश्त करने योग्य कोड होता है।

  • विशेष ऑपरेटर और समावेशन
    उपयोगकर्ता विशेष पायथन ऑपरेटर, सेंसर और हूक्स बना सकते हैं जो किसी भी बाहरी प्रणाली, एपीआई या डेटाबेस के साथ बातचीत कर सकते हैं। यह विस्तारित करने की क्षमता व्यापक पायथन एकोसिस्टम और बाहरी सेवाओं के साथ अविच्छिन्न समावेशन की अनुमति देता है।

  • स्वाभाविक पायथन एकोसिस्टम समावेशन
    क्योंकि वर्कफ़्लो पायथन में लिखे जाते हैं, उपयोगकर्ता अपने कार्यों में पायथन के विशाल संख्या में पायथन पुस्तकालयों (जैसे पैंडस, नंबरी, या मशीन लर्निंग फ़्रेमवर्क) का उपयोग कर सकते हैं, जिससे ऑटोमेशन क्षमताओं को और अधिक बढ़ाया जा सकता है।

  • पढ़े जा सकने योग्य और बर्दाश्त करने योग्य
    पायथन की पढ़े जा सकने योग्यता और लोकप्रियता एयरफ़्लो को डेटा इंजीनियर्स से एनालिस्ट्स तक के व्यापक उपयोगकर्ताओं के लिए उपलब्ध कराता है। कोड-आधारित दृष्टिकोण अतिरिक्त सॉफ्टवेयर इंजीनियरिंग प्रथाओं जैसे कोड समीक्षा और संस्करण नियंत्रण के लिए भी समर्थन प्रदान करता है।

कुछ एपेक्स एयरफ़्लो लाभ

विशेषता वर्कफ़्लो ऑटोमेशन के लिए लाभ
पायथन-आधारित डीएजी गतिशील, लचीला और बर्दाश्त करने योग्य वर्कफ़्लो
टास्कफ़्लो एपीआई साफ़ और अधिक पायथनिक वर्कफ़्लो परिभाषाएं
विशेष ऑपरेटर/सेंसर किसी भी प्रणाली या एपीआई के साथ एकीकरण करें
स्वाभाविक पायथन एकोसिस्टम एकीकरण किसी भी पायथन पुस्तकालय या उपकरण का उपयोग करें
मजबूत यूआई वर्कफ़्लो के दृश्य रूप से मॉनिटरिंग और प्रबंधन

एयरफ़्लो के पायथन के साथ गहरा एकीकरण वर्कफ़्लो ऑटोमेशन को सरल बनाता है और टीमों को उत्पादन वातावरण में शक्तिशाली, स्केलेबिल और बहुत विशिष्ट डेटा पाइपलाइन बनाने के लिए शक्ति प्रदान करता है।

उदाहरण: पायथन में सरल ETL वर्कफ़्लो

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def extract_data():
    print("Extracting data")

def transform_data():
    print("Transforming data")

def load_data():
    print("Loading data")

default_args = {'start_date': datetime(2024, 12, 1), 'retries': 1}

dag = DAG('etl_pipeline', default_args=default_args, schedule_interval='@daily')

extract_task = PythonOperator(task_id='extract', python_callable=extract_data, dag=dag)
transform_task = PythonOperator(task_id='transform', python_callable=transform_data, dag=dag)
load_task = PythonOperator(task_id='load', python_callable=load_data, dag=dag)

extract_task >> transform_task >> load_task

यह उदाहरण दिखाता है कि ETL पाइपलाइन के प्रत्येक चरण को पायथन फ़ंक्शन के रूप में परिभाषित किया जाता है और एयरफ़्लो एयरफ़्लो ऑपरेटर और डीएजी के उपयोग से एयरफ़्लो द्वारा अनुक्रमित किया जाता है: extract_task >> transform_task >> load_task

एपेक्स एयरफ़्लो डीएजी का एक अन्य उदाहरण

यहां एक बेसिक उदाहरण दिया गया है जिसमें एपेक्स एयरफ़्लो में पायथन के उपयोग से डीएजी (Directed Acyclic Graph) को कोडिंग करना दिखाया गया है। यह उदाहरण दिखाता है कि कैसे बैश ऑपरेटर के उपयोग से दो सरल कार्यों वाले वर्कफ़्लो को परिभाषित किया जा सकता है, जो बैश कमांड चलाता है:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

with DAG(
    dag_id='my_first_dag',
    default_args=default_args,
    description='My first Airflow DAG!',
    schedule_interval='@daily',
    start_date=datetime(2023, 1, 1),
    catchup=False,
) as dag:

    # Task 1: Print the current date
    print_date = BashOperator(
        task_id='print_date',
        bash_command='date'
    )

    # Task 2: Say Hello
    say_hello = BashOperator(
        task_id='say_hello',
        bash_command='echo "Hello, Airflow!"'
    )

    # Define task dependencies
    print_date >> say_hello

यह कैसे काम करता है:

  • डीएजी को एक कंटेक्स्ट मैनेजर (with DAG(...) as dag:) के उपयोग से परिभाषित किया जाता है।
  • दो कार्य बनाए गए हैं: print_date और say_hello
  • निर्भरता print_date >> say_hello यह सुनिश्चित करता है कि say_hello केवल print_date पूरा होने के बाद चले।
  • इस कोड को my_first_dag.py के रूप में अपने एयरफ़्लो dags/ निर्देशिका में सहेजें, और एयरफ़्लो इसे स्वचालित रूप से पहचान और नियोजित करेगा।

आप इस टेम्पलेट को बढ़ा सकते हैं द्वारा पायथन ऑपरेटर कार्यों, शाखा या अधिक जटिल तर्क के उपयोग जब आपके वर्कफ़्लो बढ़ते हैं।

पायथन में डीएजी वस्तु बनाते समय मुख्य पैरामीटर

जब आप Apache Airflow में एक डीएजी परिभाषित करते हैं, तो कुछ मुख्य पैरामीटर सेट करना आवश्यक होता है ताकि आपके वर्कफ़्लो के सही नियोजन, पहचान और व्यवहार सुनिश्चित किया जा सके।

आवश्यक पैरामीटर:

  • dag_id
    आपके डीएजी के लिए अद्वितीय पहचानकर्ता। आपके एयरफ़्लो वातावरण में प्रत्येक डीएजी के लिए एक अद्वितीय dag_id होना आवश्यक है।

  • start_date
    डीएजी के निष्पादन के लिए पात्र होने की तारीख और समय। यह डीएजी के लिए नियोजन के शुरुआत की तारीख निर्धारित करता है।

  • schedule (या schedule_interval)
    डीएजी के चलने की आवृत्ति को परिभाषित करता है (जैसे "@daily", "0 12 * * *" या None मैनुअल चलाने के लिए)।

  • catchup
    एक बूलियन जो निर्धारित करता है कि क्या एयरफ़्लो डीएजी को पहली बार सक्षम करते समय start_date और वर्तमान तारीख के बीच छोड़े गए चलानों के लिए बैकफ़िल करेगा। डिफ़ॉल्ट रूप से False है।

अन्य सामान्य पैरामीटर:

  • default_args
    डीएजी के सभी कार्यों के लिए डिफ़ॉल्ट आर्ग्यूमेंट के रूप में लागू किया जाने वाला एक शब्दकोश (जैसे owner, email, retries, retry_delay)। यह वैकल्पिक है लेकिन DRY (Don’t Repeat Yourself) कोड के लिए अनुशंसित है।

  • params
    रनटाइम विनिर्देश पैरामीटर के लिए एक शब्दकोश, जो आप रनटाइम में कार्यों के लिए मूल्य पास कर सकते हैं और आपके डीएजी को अधिक लचीला बना सकते हैं।

उदाहरण:

from airflow import DAG
from datetime import datetime, timedelta

default_args = {
    'owner': 'jdoe',
    'email': ['jdoe@example.com'],
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    dag_id='example_dag',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    default_args=default_args,
    params={'example_param': 'value'}
) as dag:
    # Define tasks here
    pass

डीएजी पैरामीटर की तालिका

पैरामीटर विवरण आवश्यक
dag_id डीएजी के लिए अद्वितीय नाम हां
start_date जब डीएजी चलाने के लिए पात्र होता है हां
schedule डीएजी के चलने की आवृत्ति/समय निर्धारित करता है हां
catchup छोड़े गए चलानों के लिए बैकफ़िल करने के लिए क्या करे नहीं
default_args डीएजी के सभी कार्यों के लिए डिफ़ॉल्ट आर्ग्यूमेंट नहीं
params डायनामिक विनिर्देश के लिए रनटाइम पैरामीटर नहीं

इन पैरामीटर को सेट करने से आपके डीएजी की अद्वितीय पहचान, सही नियोजन और एयरफ़्लो में इच्छित व्यवहार सुनिश्चित किया जाता है।

एयरफ़्लो में params आर्ग्यूमेंट के उपयोग से विशेष रनटाइम पैरामीटर को शामिल करना

एपेक्स एयरफ़्लो के params आर्ग्यूमेंट के उपयोग से आप अपने डीएजी और कार्यों में विशेष रनटाइम पैरामीटर को शामिल कर सकते हैं, जिससे वर्कफ़्लो विनिर्देशों के लिए गतिशील और लचीला बन सकता है। पैरामीटर आपके कार्यों के लिए रनटाइम विनिर्देश प्रदान करते हैं। आप अपने डीएजी कोड में डिफ़ॉल्ट पैरामीटर परिभाषित कर सकते हैं और अतिरिक्त पैरामीटर प्रदान कर सकते हैं, या डीएजी चलाने के समय पैरामीटर मूल्यों को ओवरराइड कर सकते हैं।

यह दृष्टिकोण आपके एयरफ़्लो वर्कफ़्लो को अधिक गतिशील और विनिर्देश करने योग्य बनाता है, जो विस्तृत ऑटोमेशन परिदृश्यों के लिए समर्थन प्रदान करता है।

विशेष रनटाइम पैरामीटर सेट करने के तरीका

  • डीएजी में params परिभाषित करें:
    जब एक डीएजी वस्तु बनाते समय, params आर्ग्यूमेंट को एक शब्दकोश के रूप में शामिल करें। प्रत्येक कुंजी-मूल्य जोड़ी एक पैरामीटर नाम और उसके डिफ़ॉल्ट मूल्य को प्रतिनिधित्व करता है।
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

def print_param(**kwargs):
    # kwargs['params'] के माध्यम से पैरामीटर प्राप्त करें
    my_param = kwargs['params']['my_param']
    print(f"मेरा विशेष पैरामीटर है: {my_param}")

with DAG(
    dag_id='custom_params_example',
    start_date=datetime(2024, 1, 1),
    schedule='@daily',
    catchup=False,
    params={'my_param': 'default_value'}  # विशेष रनटाइम पैरामीटर
) as dag:

    task = PythonOperator(
        task_id='print_param_task',
        python_callable=print_param,
        provide_context=True
    )
  • रनटाइम पर ओवरराइड करें:
    जब आप एयरफ़्लो यूआई, क्ली या एपीआई के माध्यम से डीएजी चलाने के लिए मनमाने तरीके से चलाना शुरू करते हैं, तो आप उस विशेष चलाने के लिए params मूल्यों को प्रदान या ओवरराइड कर सकते हैं।

कार्यों में पैरामीटर प्राप्त करें

  • अपने कार्य के पायथन कॉलेबल में, kwargs['params'] के माध्यम से पैरामीटर प्राप्त करें।
  • टेम्पलेट फ़ील्ड (जैसे बैश ऑपरेटर में) में, {{ params.my_param }} को टेम्पलेट स्ट्रिंग में उपयोग करें।

उपयोगी लिंक