Saturday, July 6, 2024

Introducing Amazon MWAA support for Apache Airflow version 2.8.1

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is a managed orchestration service for Apache Airflow that makes it straightforward to set up and operate end-to-end data pipelines in the cloud.

Organizations use Amazon MWAA to enhance their business workflows. For example, C2i Genomics uses Amazon MWAA in their data platform to orchestrate the validation of algorithms processing cancer genomics data in billions of records. Twitch, a live streaming platform, manages and orchestrates the training and deployment of its recommendation models for over 140 million active users. They use Amazon MWAA to scale while significantly improving security and reducing infrastructure management overhead.

Today, we’re announcing the availability of Apache Airflow version 2.8.1 environments on Amazon MWAA. In this post, we walk you through some of the new features and capabilities of Airflow now available in Amazon MWAA, and how you can set up or upgrade your Amazon MWAA environment to version 2.8.1.

Object storage

As data pipelines scale, engineers struggle to manage storage across multiple systems with unique APIs, authentication methods, and conventions for accessing data, requiring custom logic and storage-specific operators. Airflow now offers a unified object storage abstraction layer that handles these details, letting engineers focus on their data pipelines. Airflow object storage uses fsspec to enable consistent data access code across different object storage systems, thereby streamlining infrastructure complexity.

The following are some of the feature’s key benefits:

  • Portable workflows – You can switch storage services with minimal changes in your Directed Acyclic Graphs (DAGs)
  • Efficient data transfers – You can stream data instead of loading it into memory
  • Reduced maintenance – You don’t need separate operators, making your pipelines straightforward to maintain
  • Familiar programming experience – You can use Python modules, like shutil, for file operations (see the short sketch after this list)
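The following is a minimal sketch of that familiar programming experience; the bucket names, prefixes, and connection ID are placeholders, and it assumes the Amazon provider with the s3fs extra (covered next) is installed:

from airflow.io.path import ObjectStoragePath
import shutil

source = ObjectStoragePath("s3://example-bucket/raw/", conn_id="aws_default")
target = ObjectStoragePath("s3://example-bucket/processed/", conn_id="aws_default")

# ObjectStoragePath behaves like a pathlib path backed by fsspec
for file in source.iterdir():
    if file.is_file():
        # Stream each object to the target prefix without loading it into memory
        with file.open("rb") as src, (target / file.name).open("wb") as dst:
            shutil.copyfileobj(src, dst)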

To use object storage with Amazon Simple Storage Service (Amazon S3), you need to install the package extra s3fs with the Amazon provider (apache-airflow-providers-amazon[s3fs]==x.x.x).

In the sample code below, you can see how to move data directly from Google Cloud Storage to Amazon S3. Because Airflow’s object storage uses shutil.copyfileobj, the objects’ data is read in chunks from gcs_data_source and streamed to amazon_s3_data_target.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.io.path import ObjectStoragePath

gcs_data_source = ObjectStoragePath("gcs://source-bucket/prefix/", conn_id="google_cloud_default")

amazon_s3_data_target = ObjectStoragePath("s3://target-bucket/prefix/", conn_id="aws_default")

with DAG(
    dag_id="copy_from_gcs_to_amazon_s3",
    start_date=datetime(2024, 2, 26),
    schedule="0 0 * * *",
    catchup=False,
    tags=["2.8", "ObjectStorage"],
) as dag:

    @task
    def list_objects(path: ObjectStoragePath) -> list[ObjectStoragePath]:
        # List the objects under the source prefix
        objects = [f for f in path.iterdir() if f.is_file()]
        return objects

    @task
    def copy_object(path: ObjectStoragePath, object: ObjectStoragePath):
        # Stream the object from the source to the target prefix
        object.copy(dst=path)

    objects_list = list_objects(path=gcs_data_source)
    copy_object.partial(path=amazon_s3_data_target).expand(object=objects_list)

For more information on Airflow object storage, refer to Object Storage.

XCom UI

XCom (cross-communications) allows for the passing of data between tasks, facilitating communication and coordination between them. Previously, developers had to switch to a different view to see XComs related to a task. With Airflow 2.8, XCom key-value pairs are rendered directly on a tab within the Airflow Grid view, as shown in the following screenshot.

The new XCom tab provides the following benefits:

  • Improved XCom visibility – A dedicated tab in the UI provides a convenient and user-friendly way to see all XComs associated with a DAG or task.
  • Improved debugging – Being able to see XCom values directly in the UI is helpful for debugging DAGs. You can quickly see the output of upstream tasks without needing to manually pull and inspect them using Python code (see the sketch following this list).
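The following minimal sketch (the DAG, tasks, and values are illustrative) shows how values returned from TaskFlow tasks are stored as XComs; with Airflow 2.8, these key-value pairs appear on the XCom tab of the Grid view for each task instance:

from datetime import datetime
from airflow.decorators import dag, task

@dag(dag_id="xcom_tab_demo", start_date=datetime(2024, 2, 26), schedule=None, catchup=False)
def xcom_tab_demo():

    @task
    def extract() -> dict:
        # The returned dictionary is pushed to XCom under the key "return_value"
        return {"rows": 42, "source": "s3://example-bucket/input/"}

    @task
    def report(stats: dict):
        # The pulled value is visible on the upstream task's XCom tab in the Grid view
        print(f"Processed {stats['rows']} rows from {stats['source']}")

    report(extract())

xcom_tab_demo()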

Task context logger

Managing task lifecycles is crucial for the smooth operation of data pipelines in Airflow. However, certain challenges have persisted, particularly in scenarios where tasks are unexpectedly stopped. This can occur for various reasons, including scheduler timeouts, zombie tasks (tasks that remain in a running state without sending heartbeats), or instances where the worker runs out of memory.

Traditionally, such failures, particularly those triggered by core Airflow components like the scheduler or executor, weren’t recorded within the task logs. This limitation required users to troubleshoot outside the Airflow UI, complicating the process of pinpointing and resolving issues.

Airflow 2.8 introduced a significant improvement that addresses this problem. Airflow components, including the scheduler and executor, can now use the new TaskContextLogger to forward error messages directly to the task logs. This feature allows you to see all the relevant error messages related to a task’s run in one place, which simplifies the process of figuring out why a task failed and offers a complete picture of what went wrong within a single log view.

The following screenshot shows how the task is detected as a zombie, and the scheduler log is included as part of the task log.

You need to set the environment configuration parameter enable_task_context_logger to True to enable the feature. Once it’s enabled, Airflow can send logs from the scheduler, the executor, or callback run context to the task logs, and make them available in the Airflow UI.
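On Amazon MWAA, Airflow configuration parameters are supplied as environment-level overrides. The following is a hedged sketch using boto3: the environment name is a placeholder, and it assumes the option is exposed under Airflow’s logging section and is permitted as an override in your environment.

import boto3

mwaa = boto3.client("mwaa")

# Assumption: the task context logger option lives in the "logging" configuration section.
mwaa.update_environment(
    Name="my-airflow-2-8-environment",  # placeholder environment name
    AirflowConfigurationOptions={
        "logging.enable_task_context_logger": "True",
    },
)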

Listener hooks for datasets

Datasets were introduced in Airflow 2.4 as a logical grouping of data sources to create data-aware scheduling and dependencies between DAGs. For example, you can schedule a consumer DAG to run when a producer DAG updates a dataset. Listeners enable Airflow users to create subscriptions to certain events happening in the environment. In Airflow 2.8, listeners are added for two dataset events: on_dataset_created and on_dataset_changed, effectively allowing Airflow users to write custom code to react to dataset management operations. For example, you can trigger an external system or send a notification.
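As a quick refresher on data-aware scheduling (the DAG names and S3 URI below are illustrative), a producer task declares the dataset as an outlet and a consumer DAG uses it as its schedule; the new listener hooks fire around these same dataset events:

from datetime import datetime
from airflow import DAG, Dataset
from airflow.decorators import task

example_dataset = Dataset("s3://example-bucket/daily-export/")

with DAG("producer_dag", start_date=datetime(2024, 2, 26), schedule="@daily", catchup=False):

    @task(outlets=[example_dataset])
    def publish():
        # Completing this task marks the dataset as updated, firing on_dataset_changed listeners
        ...

    publish()

with DAG("consumer_dag", start_date=datetime(2024, 2, 26), schedule=[example_dataset], catchup=False):

    @task
    def consume():
        ...

    consume()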

Using listener hooks for datasets is straightforward. Complete the following steps to create a listener for on_dataset_changed:

  1. Create the listener (dataset_listener.py):
    from airflow import Dataset
    from airflow.listeners import hookimpl

    @hookimpl
    def on_dataset_changed(dataset: Dataset):
        """The following custom code is executed when a dataset is changed."""
        print("Invoking external endpoint")

        # Validate a specific dataset
        if dataset.uri == "s3://bucket-prefix/object-key.ext":
            print("Execute specific/different action for this dataset")

  2. Create a plugin to register the listener in your Airflow environment (dataset_listener_plugin.py):
    from airflow.plugins_manager import AirflowPlugin
    from plugins import dataset_listener

    class DatasetListenerPlugin(AirflowPlugin):
        name = "dataset_listener_plugin"
        listeners = [dataset_listener]

For more information on how to install plugins in Amazon MWAA, refer to Installing custom plugins.

Set up a new Airflow 2.8.1 environment in Amazon MWAA

You can initiate the setup in your account and preferred Region using the AWS Management Console, API, or AWS Command Line Interface (AWS CLI). If you’re adopting infrastructure as code (IaC), you can automate the setup using AWS CloudFormation, the AWS Cloud Development Kit (AWS CDK), or Terraform scripts.
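For example, the following is a minimal sketch using boto3; all names, ARNs, and network settings are placeholders, and a real environment typically needs additional settings such as logging configuration and web server access mode:

import boto3

mwaa = boto3.client("mwaa")

# All identifiers below are placeholders for illustration only.
mwaa.create_environment(
    Name="my-airflow-2-8-environment",
    AirflowVersion="2.8.1",
    SourceBucketArn="arn:aws:s3:::example-mwaa-bucket",
    DagS3Path="dags",
    ExecutionRoleArn="arn:aws:iam::111122223333:role/example-mwaa-execution-role",
    NetworkConfiguration={
        "SecurityGroupIds": ["sg-0123456789abcdef0"],
        "SubnetIds": ["subnet-0123456789abcdef0", "subnet-0fedcba9876543210"],
    },
)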

Upon successful creation of an Airflow version 2.8.1 environment in Amazon MWAA, certain packages are automatically installed on the scheduler and worker nodes. For a complete list of installed packages and their versions, refer to Apache Airflow provider packages installed on Amazon MWAA environments. You can install additional packages using a requirements file.

Upgrade from older versions of Airflow to version 2.8.1

You can take advantage of these latest capabilities by upgrading your older Airflow version 2.x-based environments to version 2.8.1 using in-place version upgrades. To learn more about in-place version upgrades, refer to Upgrading the Apache Airflow version or Introducing in-place version upgrades with Amazon MWAA.
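As a rough sketch, an in-place upgrade can be started with a single boto3 call; the environment name is a placeholder, and you should follow the guidance in the linked documentation (for example, backing up metadata and testing in a lower environment) before upgrading production workloads:

import boto3

mwaa = boto3.client("mwaa")

# Placeholder environment name; this call starts an in-place upgrade to Airflow 2.8.1.
mwaa.update_environment(
    Name="my-existing-airflow-2-x-environment",
    AirflowVersion="2.8.1",
)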

Conclusion

In this post, we discussed some important features introduced in Airflow version 2.8, such as object storage, the new XCom tab added to the Grid view, task context logging, listener hooks for datasets, and how you can start using them. We also provided sample code showing implementations in Amazon MWAA. For the complete list of changes, refer to Airflow’s release notes.

For more details and code examples on Amazon MWAA, visit the Amazon MWAA User Guide and the Amazon MWAA examples GitHub repo.

Apache, Apache Airflow, and Airflow are either registered trademarks or trademarks of the Apache Software Foundation in the United States and/or other countries.


About the Authors

Mansi Bhutada is an ISV Solutions Architect based in the Netherlands. She helps customers design and implement well-architected solutions in AWS that address their business problems. She is passionate about data analytics and networking. Beyond work, she enjoys experimenting with food, playing pickleball, and diving into fun board games.

Hernan Garcia is a Senior Solutions Architect at AWS based in the Netherlands. He works in the financial services industry, supporting enterprises in their cloud adoption. He is passionate about serverless technologies, security, and compliance. He enjoys spending time with family and friends, and trying out new dishes from different cuisines.
