Sunday, November 24, 2024

Dynamic DAG generation with YAML and DAG Factory in Amazon MWAA

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed service that lets you use a familiar Apache Airflow environment with improved scalability, availability, and security to enhance and scale your business workflows without the operational burden of managing the underlying infrastructure. In Airflow, Directed Acyclic Graphs (DAGs) are defined as Python code. Dynamic DAGs refer to the ability to generate DAGs on the fly during runtime, typically based on some external conditions, configurations, or parameters. Dynamic DAGs help you create, schedule, and run tasks within a DAG based on data and configurations that may change over time.

There are various ways to introduce dynamism in Airflow DAGs (dynamic DAG generation) using environment variables and external files. One of the approaches is to use the DAG Factory YAML-based configuration file method. This library aims to facilitate the creation and configuration of new DAGs by using declarative parameters in YAML. It allows default customizations and is open source, making it simple to create and customize new functionalities.

In this post, we explore the process of creating dynamic DAGs with YAML files, using the DAG Factory library. Dynamic DAGs offer several benefits:

  1. Enhanced code reusability – By structuring DAGs through YAML files, we promote reusable components, reducing redundancy in your workflow definitions.
  2. Streamlined maintenance – YAML-based DAG generation simplifies the process of modifying and updating workflows, ensuring smoother maintenance procedures.
  3. Flexible parameterization – With YAML, you can parameterize DAG configurations, facilitating dynamic adjustments to workflows based on varying requirements.
  4. Improved scheduler efficiency – Dynamic DAGs enable more efficient scheduling, optimizing resource allocation and enhancing overall workflow runs.
  5. Enhanced scalability – YAML-driven DAGs allow for parallel runs, enabling scalable workflows capable of handling increased workloads efficiently.

By combining YAML files with the DAG Factory library, you get a versatile approach to building and managing DAGs that helps you create robust, scalable, and maintainable data pipelines.

Overview of solution

In this post, we use an example DAG file that’s designed to process a COVID-19 data set. The workflow involves processing an open source data set offered by WHO-COVID-19-Global. After we install the dag-factory Python package, we create a YAML file that has definitions of various tasks. We process the country-specific death count by passing Country as a variable, which creates individual country-based DAGs.

The following diagram illustrates the overall solution along with data flows within logical blocks.

Overview of the Solution

Prerequisites

For this walkthrough, you should have the following prerequisites:

Additionally, complete the following steps (run the setup in an AWS Region where Amazon MWAA is available):

  1. Create an Amazon MWAA environment (if you don’t have one already). If this is your first time using Amazon MWAA, refer to Introducing Amazon Managed Workflows for Apache Airflow (MWAA).

Make sure that the AWS Identity and Access Management (IAM) user or role used for setting up the environment has IAM policies attached for the following permissions:

The access policies mentioned here are just for the example in this post. In a production environment, provide only the needed granular permissions by exercising least privilege principles.

  2. Create a unique (within an account) Amazon S3 bucket name while creating your Amazon MWAA environment, and create folders called dags and requirements.
    Amazon S3 Bucket
  3. Create and upload a requirements.txt file with the following content to the requirements folder. Replace {Airflow-version} with your environment’s Apache Airflow version number, and {Python-version} with the version of Python that’s compatible with your environment:
    --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-{Airflow-version}/constraints-{Python-version}.txt"
    dag-factory==0.19.0
    pandas==2.1.4

Pandas is needed only for the example use case described in this post, and dag-factory is the only required plug-in. We recommend checking the compatibility of the latest version of dag-factory with Amazon MWAA. The boto and psycopg2-binary libraries are included with the Apache Airflow v2 base install and don’t need to be specified in your requirements.txt file.
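As a small illustration of how the placeholders in the constraint line are filled in, the following sketch builds the URL and the file content from two version strings. The helper names constraints_url and requirements_text, and the example versions 2.8.1 and 3.11, are assumptions for illustration only; use the versions that match your own environment.

```python
def constraints_url(airflow_version: str, python_version: str) -> str:
    """Fill the {Airflow-version} and {Python-version} placeholders."""
    return (
        "https://raw.githubusercontent.com/apache/airflow/"
        f"constraints-{airflow_version}/constraints-{python_version}.txt"
    )


def requirements_text(airflow_version: str, python_version: str) -> str:
    """Return the requirements.txt content used in this post."""
    lines = [
        f'--constraint "{constraints_url(airflow_version, python_version)}"',
        "dag-factory==0.19.0",
        "pandas==2.1.4",
    ]
    return "\n".join(lines) + "\n"


if __name__ == "__main__":
    # Example versions only; replace them with your environment's versions.
    print(requirements_text("2.8.1", "3.11"))
```

Generating the file this way keeps the constraint URL and the pinned packages in one place if you maintain several environments.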

  4. Download the WHO-COVID-19-global data file to your local machine and upload it under the dags prefix of your S3 bucket.

Make sure that your environment points to the latest S3 object version of your requirements.txt file so that the additional packages are installed. The update typically takes 15–20 minutes, depending on your environment configuration.

Validate the DAGs

When your Amazon MWAA environment shows as Available on the Amazon MWAA console, navigate to the Airflow UI by choosing Open Airflow UI next to your environment.

Validate the DAG

Verify the existing DAGs by navigating to the DAGs tab.

Verify the DAG

Configure your DAGs

Complete the following steps:

  1. Create empty files named dynamic_dags.yml, example_dag_factory.py, and process_s3_data.py on your local machine.
  2. Edit the process_s3_data.py file and save it with the following code content, then upload the file back to the Amazon S3 bucket dags folder. We’re doing some basic data processing in the code:
    1. Read the file from an Amazon S3 location.
    2. Rename the Country_code column to country.
    3. Filter data by the given country.
    4. Write the processed final data in CSV format and upload it back to the S3 prefix.
import boto3
import pandas as pd
import io


def process_s3_data(COUNTRY):
    ### Top-level variables: replace S3_BUCKET with your bucket name ###
    s3 = boto3.client('s3')
    S3_BUCKET = "my-mwaa-assets-bucket-sfj33ddkm"
    INPUT_KEY = "dags/WHO-COVID-19-global-data.csv"
    OUTPUT_KEY = "dags/count_death"
    ### Get the CSV file ###
    response = s3.get_object(Bucket=S3_BUCKET, Key=INPUT_KEY)
    status = response['ResponseMetadata']['HTTPStatusCode']
    if status == 200:
        ### Read the CSV file and filter by country before writing back ###
        df = pd.read_csv(response.get("Body"))
        df.rename(columns={"Country_code": "country"}, inplace=True)
        filtered_df = df[df['country'] == COUNTRY]
        with io.StringIO() as csv_buffer:
            filtered_df.to_csv(csv_buffer, index=False)
            response = s3.put_object(
                Bucket=S3_BUCKET, Key=OUTPUT_KEY + '_' + COUNTRY + '.csv', Body=csv_buffer.getvalue()
            )
        status = response['ResponseMetadata']['HTTPStatusCode']
        if status == 200:
            print(f"Successful S3 put_object response. Status - {status}")
        else:
            print(f"Unsuccessful S3 put_object response. Status - {status}")
    else:
        print(f"Unsuccessful S3 get_object response. Status - {status}")
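To try the rename-and-filter step without touching Amazon S3, the same logic can be sketched against an in-memory CSV using only the standard library. The function name filter_rows_by_country and the three sample rows are made up for illustration and are not taken from the WHO data set; the sketch also swaps pandas for the csv module so that it runs anywhere.

```python
import csv
import io


def filter_rows_by_country(csv_text: str, country: str) -> str:
    """Rename Country_code to country, keep matching rows, return CSV text."""
    reader = csv.DictReader(io.StringIO(csv_text))
    # Mirror df.rename(columns={"Country_code": "country"}).
    fieldnames = ["country" if name == "Country_code" else name
                  for name in reader.fieldnames]
    out = io.StringIO()
    writer = csv.DictWriter(out, fieldnames=fieldnames, lineterminator="\n")
    writer.writeheader()
    for row in reader:
        row["country"] = row.pop("Country_code")
        # Mirror df[df["country"] == COUNTRY].
        if row["country"] == country:
            writer.writerow(row)
    return out.getvalue()


# Hypothetical sample rows, for illustration only.
SAMPLE = (
    "Date_reported,Country_code,New_deaths\n"
    "2024-01-01,Australia,1\n"
    "2024-01-01,Brazil,2\n"
    "2024-01-02,Australia,3\n"
)

if __name__ == "__main__":
    print(filter_rows_by_country(SAMPLE, "Australia"))
```

The comparison is an exact string match, so whatever value you pass as COUNTRY has to match the values stored in that column of your data file; otherwise the output contains only the header row.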

  3. Edit the dynamic_dags.yml file and save it with the following code content, then upload the file back to the dags folder. We’re stitching various DAGs based on the country as follows:
    1. Define the default arguments that are passed to all DAGs.
    2. Create a DAG definition for individual countries by passing op_args.
    3. Map the process_s3_data function with python_callable_name.
    4. Use the Python Operator to process CSV file data stored in the Amazon S3 bucket.
    5. We have set schedule_interval to 10 minutes, but feel free to adjust this value as needed.
default:
  default_args:
    owner: "airflow"
    start_date: "2024-03-01"
    retries: 1
    retry_delay_sec: 300
  concurrency: 1
  max_active_runs: 1
  dagrun_timeout_sec: 600
  default_view: "tree"
  orientation: "LR"
  schedule_interval: "*/10 * * * *"
 
module3_dynamic_dag_Australia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Australia"
 
module3_dynamic_dag_Brazil:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Brazil"
 
module3_dynamic_dag_India:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "India"
 
module3_dynamic_dag_Japan:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Japan"
 
module3_dynamic_dag_Mexico:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Mexico"
 
module3_dynamic_dag_Russia:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Russia"
 
module3_dynamic_dag_Spain:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "Spain"
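Because the seven DAG entries differ only in the country name, the per-country part of the file can also be generated rather than typed by hand. The sketch below does this with plain string formatting. The names COUNTRIES, DAG_TEMPLATE, and render_dag_entries, and the idea of generating the file at all, are assumptions for illustration and are not part of the dag-factory library; the default block would still need to be placed at the top of the file.

```python
COUNTRIES = ["Australia", "Brazil", "India", "Japan", "Mexico", "Russia", "Spain"]

# One DAG entry, matching the layout used in dynamic_dags.yml.
DAG_TEMPLATE = """\
module3_dynamic_dag_{country}:
  tasks:
    task_process_s3_data:
      task_id: process_s3_data
      operator: airflow.operators.python.PythonOperator
      python_callable_name: process_s3_data
      python_callable_file: /usr/local/airflow/dags/process_s3_data.py
      op_args:
        - "{country}"
"""


def render_dag_entries(countries):
    """Return the YAML text for one DAG entry per country."""
    return "\n".join(DAG_TEMPLATE.format(country=c) for c in countries)


if __name__ == "__main__":
    print(render_dag_entries(COUNTRIES))
```

Adding a country then means adding one item to the list instead of copying a nine-line block.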

  4. Edit the file example_dag_factory.py and save it with the following code content, then upload the file back to the dags folder. The code cleans up the existing DAGs using the clean_dags() method and creates new DAGs using the generate_dags() method from the DagFactory instance.
from airflow import DAG
import dagfactory
  
config_file = "/usr/local/airflow/dags/dynamic_dags.yml"
example_dag_factory = dagfactory.DagFactory(config_file)
  
## to clean up or delete any existing DAGs ##
example_dag_factory.clean_dags(globals())
## generate and create new DAGs ##
example_dag_factory.generate_dags(globals())
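Passing globals() matters because Airflow picks up DAG objects that it finds at the top level of a module in the dags folder. The following sketch imitates that pattern with a stand-in class so that it runs without Airflow installed. StubDag, register_dags, and the sample config are hypothetical, the config is only loosely shaped like a parsed dynamic_dags.yml, and none of this is meant to describe how dag-factory is implemented internally.

```python
from dataclasses import dataclass


@dataclass
class StubDag:
    """Stand-in for a DAG object; not the Airflow DAG class."""
    dag_id: str
    op_args: list


def register_dags(config: dict, namespace: dict) -> None:
    """Create one StubDag per config entry and store it in the namespace."""
    for dag_id, settings in config.items():
        if dag_id == "default":
            continue  # shared settings, not a DAG of its own
        namespace[dag_id] = StubDag(dag_id, settings["op_args"])


# Hypothetical config, for illustration only.
config = {
    "default": {"schedule_interval": "*/10 * * * *"},
    "module3_dynamic_dag_India": {"op_args": ["India"]},
    "module3_dynamic_dag_Japan": {"op_args": ["Japan"]},
}

namespace = {}
register_dags(config, namespace)
print(sorted(namespace))
```

In a real DAG file the namespace would be the module's own globals(), which is why one Python file can expose many DAGs.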

  5. After you upload the files, go back to the Airflow UI console and navigate to the DAGs tab, where you will find the new DAGs.
    List the new DAGs

You can enable DAGs by making them active and testing them individually. Upon activation, an additional CSV file named count_death_{COUNTRY_CODE}.csv is generated in the dags folder.
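The object key of each output file follows from the expression OUTPUT_KEY + '_' + COUNTRY + '.csv' in process_s3_data.py. The following minimal sketch lists the keys to look for, assuming the seven op_args values used in dynamic_dags.yml; the helper name expected_output_keys is hypothetical.

```python
OUTPUT_KEY = "dags/count_death"
COUNTRIES = ["Australia", "Brazil", "India", "Japan", "Mexico", "Russia", "Spain"]


def expected_output_keys(countries, output_key=OUTPUT_KEY):
    """Build the S3 keys the same way process_s3_data does."""
    return [output_key + "_" + country + ".csv" for country in countries]


if __name__ == "__main__":
    for key in expected_output_keys(COUNTRIES):
        print(key)
```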

Cleaning up

There may be costs associated with using the various AWS services discussed in this post. To avoid incurring future charges, delete the Amazon MWAA environment after you have completed the tasks outlined in this post, and empty and delete the S3 bucket.

Conclusion

In this blog post, we demonstrated how to use the dag-factory library to create dynamic DAGs. Dynamic DAGs are characterized by their ability to generate results with each parsing of the DAG file based on configurations. Consider using dynamic DAGs in the following scenarios:

  • Automating migration from a legacy system to Airflow, where flexibility in DAG generation is crucial
  • Situations where only a parameter changes between different DAGs, streamlining the workflow management process
  • Managing DAGs that are reliant on the evolving structure of a source system, providing adaptability to changes
  • Establishing standardized practices for DAGs across your team or organization by creating these blueprints, promoting consistency and efficiency
  • Embracing YAML-based declarations over complex Python coding, simplifying DAG configuration and maintenance processes
  • Creating data-driven workflows that adapt and evolve based on the data inputs, enabling efficient automation

By incorporating dynamic DAGs into your workflow, you can enhance automation, adaptability, and standardization, ultimately improving the efficiency and effectiveness of your data pipeline management.

To learn more about Amazon MWAA DAG Factory, visit Amazon MWAA for Analytics Workshop: DAG Factory. For additional details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repository.


About the Authors

Jayesh Shinde is Sr. Software Architect with AWS ProServe India. He focuses on creating various cloud-centered solutions using modern software development practices like serverless, DevOps, and analytics.

Harshd Yeola is Sr. Cloud Architect with AWS ProServe India, helping customers migrate and modernize their infrastructure into AWS. He focuses on building DevSecOps and scalable infrastructure using containers, AIOps, and AWS Developer Tools and services.
