Sunday, July 7, 2024

Combine AWS Glue and Amazon MWAA to build advanced VPC selection and failover strategies

AWS Glue is a serverless data integration service that makes it easy to discover, prepare, move, and integrate data from multiple sources for analytics, machine learning (ML), and application development.

AWS Glue customers often need to meet strict security requirements, which sometimes involve locking down the network connectivity allowed to the job, or running inside a specific VPC to access another service. To run inside the VPC, the job needs to be assigned to a single subnet, but the most suitable subnet can change over time (for instance, based on usage and availability), so you might prefer to make that decision at runtime, based on your own strategy.

Amazon Managed Workflows for Apache Airflow (Amazon MWAA) is an AWS service to run managed Airflow workflows, which allow writing custom logic to coordinate how tasks such as AWS Glue jobs run.

In this post, we show how to run an AWS Glue job as part of an Airflow workflow, with dynamic configurable selection of the VPC subnet assigned to the job at runtime.

Solution overview

To run inside a VPC, an AWS Glue job needs to be assigned at least one connection that includes network configuration. Any connection allows specifying a VPC, subnet, and security group, but for simplicity, this post uses connections of type NETWORK, which just defines the network configuration and doesn't involve external systems.

If the job has a fixed subnet assigned by a single connection, in case of a service outage in the Availability Zone or if the subnet isn't available for other reasons, the job can't run. Furthermore, each node (driver or worker) in an AWS Glue job requires an IP address assigned from the subnet. When running many large jobs concurrently, this could lead to an IP address shortage and the job running with fewer nodes than intended or not running at all.

AWS Glue extract, transform, and load (ETL) jobs allow multiple connections to be specified with multiple network configurations. However, the job will always try to use the connections' network configuration in the order listed and pick the first one that passes the health checks and has at least two IP addresses to get the job started, which might not be the optimal option.

With this solution, you can enhance and customize that behavior by reordering the connections dynamically and defining the selection priority. If a retry is needed, the connections are reprioritized based on the strategy, because the conditions might have changed since the previous run.

As a result, it helps prevent the job from failing to run or running under capacity due to a subnet IP address shortage or even an outage, while meeting the network security and connectivity requirements.

The following diagram illustrates the solution architecture.

Prerequisites

To follow the steps of the post, you need a user that can log in to the AWS Management Console and has permission to access Amazon MWAA, Amazon Virtual Private Cloud (Amazon VPC), and AWS Glue. The AWS Region where you choose to deploy the solution needs the capacity to create a VPC and two elastic IP addresses. The default Regional quota for both types of resources is five, so you might need to request an increase through the console.

You also need an AWS Identity and Access Management (IAM) role suitable to run AWS Glue jobs if you don't have one already. For instructions, refer to Create an IAM role for AWS Glue.
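
If you don't have one yet and prefer to script its creation, the following is a minimal Boto3 sketch under stated assumptions: the role name AWSGlueServiceRole-AirflowBlog is hypothetical, and only the AWS managed baseline policy is attached (the linked instructions cover adding access to your own S3 locations):

    import json
    
    import boto3
    
    iam = boto3.client("iam")
    role_name = "AWSGlueServiceRole-AirflowBlog"  # hypothetical name; keeping the AWSGlueServiceRole prefix
    
    # Trust policy that lets AWS Glue assume the role
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": "glue.amazonaws.com"},
            "Action": "sts:AssumeRole"
        }]
    }
    
    iam.create_role(
        RoleName=role_name,
        Path="/service-role/",  # matches the role ARN pattern used by the inline policy later in this post
        AssumeRolePolicyDocument=json.dumps(trust_policy)
    )
    # Baseline permissions for running Glue jobs
    iam.attach_role_policy(
        RoleName=role_name,
        PolicyArn="arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"
    )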

Deploy an Airflow environment and VPC

First, you'll deploy a new Airflow environment, including the creation of a new VPC with two public subnets and two private ones. This is because Amazon MWAA requires Availability Zone failure tolerance, so it needs to run on two subnets in two different Availability Zones in the Region. The public subnets are used so the NAT gateway can provide internet access for the private subnets.

Complete the following steps:

  1. Create an AWS CloudFormation template on your computer by copying the template from the following quick start guide into a local text file.
  2. On the AWS CloudFormation console, choose Stacks in the navigation pane.
  3. Choose Create stack with the option With new resources (standard).
  4. Choose Upload a template file and choose the local template file.
  5. Choose Next.
  6. Complete the setup steps, entering a name for the environment, and leave the rest of the parameters as default.
  7. On the last step, acknowledge that resources will be created and choose Submit.

The creation can take 20–30 minutes, until the status of the stack changes to CREATE_COMPLETE.

The resource that will take most of the time is the Airflow environment. While it's being created, you can continue with the following steps, until you are required to open the Airflow UI.

  1. On the stack's Resources tab, note the IDs for the VPC and two private subnets (PrivateSubnet1 and PrivateSubnet2), to use in the next step.

Create AWS Glue connections

The CloudFormation template deploys two private subnets. In this step, you create an AWS Glue connection to each one so AWS Glue jobs can run in them. Amazon MWAA recently added the capability to run the Airflow cluster on shared VPCs, which reduces cost and simplifies network management. For more information, refer to Introducing shared VPC support on Amazon MWAA.

Complete the following steps to create the connections:

  1. On the AWS Glue console, choose Data connections in the navigation pane.
  2. Choose Create connection.
  3. Choose Network as the data source.
  4. Choose the VPC and private subnet (PrivateSubnet1) created by the CloudFormation stack.
  5. Use the default security group.
  6. Choose Next.
  7. For the connection name, enter MWAA-Glue-Blog-Subnet1.
  8. Review the details and complete the creation.
  9. Repeat these steps using PrivateSubnet2 and name the connection MWAA-Glue-Blog-Subnet2.
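
If you prefer to script this setup instead of using the console, the following is a minimal Boto3 sketch that creates both NETWORK connections; the subnet and security group IDs are placeholders you would replace with the values noted from the CloudFormation stack:

    import boto3
    
    glue = boto3.client("glue")
    ec2 = boto3.client("ec2")
    
    # Placeholder IDs; use the two private subnets and the default security group of the new VPC
    connections = {
        "MWAA-Glue-Blog-Subnet1": "subnet-11111111111111111",
        "MWAA-Glue-Blog-Subnet2": "subnet-22222222222222222",
    }
    security_group_id = "sg-00000000000000000"
    
    for name, subnet_id in connections.items():
        az = ec2.describe_subnets(SubnetIds=[subnet_id])["Subnets"][0]["AvailabilityZone"]
        # NETWORK connections only carry network configuration, so the connection properties stay empty
        glue.create_connection(
            ConnectionInput={
                "Name": name,
                "ConnectionType": "NETWORK",
                "ConnectionProperties": {},
                "PhysicalConnectionRequirements": {
                    "SubnetId": subnet_id,
                    "SecurityGroupIdList": [security_group_id],
                    "AvailabilityZone": az,
                },
            }
        )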

Create the AWS Glue job

Now you create the AWS Glue job that will be triggered later by the Airflow workflow. The job uses the connections created in the previous section, but instead of assigning them directly on the job, as you would normally do, in this scenario you leave the job connections list empty and let the workflow decide which one to use at runtime.

The job script in this case is not significant; it's just meant to demonstrate that the job ran in one of the subnets, depending on the connection.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane, then choose Script editor.
  2. Leave the default options (Spark engine and Start fresh) and choose Create script.
  3. Replace the placeholder script with the following Python code:
    import ipaddress
    import socket
    
    subnets = {
        "PrivateSubnet1": "10.192.20.0/24",
        "PrivateSubnet2": "10.192.21.0/24"
    }
    
    # Find which of the two subnets the driver's IP address belongs to
    ip = socket.gethostbyname(socket.gethostname())
    subnet_name = "unknown"
    for subnet, cidr in subnets.items():
        if ipaddress.ip_address(ip) in ipaddress.ip_network(cidr):
            subnet_name = subnet
    
    print(f"The driver node has been assigned the ip: {ip}"
          + f" which belongs to the subnet: {subnet_name}")
    

  4. Rename the job to AirflowBlogJob.
  5. On the Job details tab, for IAM Role, choose any role and enter 2 for the number of workers (just for frugality).
  6. Save these changes so the job is created.
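
Optionally, you can confirm the job definition looks as expected; a quick Boto3 check (assuming default credentials and Region) should report two workers and no connections assigned yet:

    import boto3
    
    job = boto3.client("glue").get_job(JobName="AirflowBlogJob")["Job"]
    # Connections is absent while the list is empty; the workflow will set it at runtime
    print(job["NumberOfWorkers"], job.get("Connections"))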

Grant AWS Glue permissions to the Airflow environment role

The role created for Airflow by the CloudFormation template provides the basic permissions to run workflows but not to interact with other services such as AWS Glue. In a production project, you would define your own templates with these additional permissions, but in this post, for simplicity, you add the additional permissions as an inline policy. Complete the following steps:

  1. On the IAM console, choose Roles in the navigation pane.
  2. Locate the role created by the template; it will start with the name you assigned to the CloudFormation stack followed by -MwaaExecutionRole-.
  3. On the role details page, on the Add permissions menu, choose Create inline policy.
  4. Switch from Visual to JSON mode and enter the following JSON in the text box. It assumes that the AWS Glue role you have follows the convention of starting with AWSGlueServiceRole. For enhanced security, you can replace the wildcard resource on the ec2:DescribeSubnets permission with the ARNs of the two private subnets from the CloudFormation stack.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "glue:GetConnection"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:connection/MWAA-Glue-Blog-Subnet*",
                    "arn:aws:glue:*:*:catalog"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "glue:UpdateJob",
                    "glue:GetJob",
                    "glue:StartJobRun",
                    "glue:GetJobRun"
                ],
                "Resource": [
                    "arn:aws:glue:*:*:job/AirflowBlogJob",
                    "arn:aws:glue:*:*:job/BlogAirflow"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "ec2:DescribeSubnets"
                ],
                "Resource": "*"
            },
            {
                "Effect": "Allow",
                "Action": [
                    "iam:GetRole",
                    "iam:PassRole"
                ],
                "Resource": "arn:aws:iam::*:role/service-role/AWSGlueServiceRole*"
            }
        ]
    }
    

  5. Choose Next.
  6. Enter GlueRelatedPermissions as the policy name and complete the creation.

In this example, we use an ETL script job; for a visual job, because it generates the script automatically on save, the Airflow role would also need permission to write to the configured script path on Amazon Simple Storage Service (Amazon S3).
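
For reference, that would mean adding a statement along these lines to the inline policy, where the bucket and prefix are assumptions you would adjust to the script path configured for your visual job:

    {
        "Effect": "Allow",
        "Action": [
            "s3:PutObject"
        ],
        "Resource": "arn:aws:s3:::aws-glue-assets-<account-id>-<region>/scripts/*"
    }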

Create the Airflow DAG

An Airflow workflow is based on a Directed Acyclic Graph (DAG), which is defined by a Python file that programmatically specifies the different tasks involved and their interdependencies. Complete the following steps to create the DAG:

  1. Create a local file named glue_job_dag.py using a text editor.

In each of the following steps, we provide a code snippet to enter into the file and an explanation of what it does.

  1. The following snippet adds the required Python module imports. The modules are already installed on Airflow; if that weren't the case, you would need to use a requirements.txt file to indicate to Airflow which modules to install. It also defines the Boto3 clients that the code will use later. By default, they will use the same role and Region as Airflow, which is why you set up the role with the additional permissions earlier.
    import boto3
    from pendulum import datetime, duration
    from random import shuffle
    from airflow import DAG
    from airflow.decorators import dag, task
    from airflow.models import Variable
    from airflow.providers.amazon.aws.operators.glue import GlueJobOperator
    
    # Boto3 clients use the Airflow environment's execution role and Region by default
    glue_client = boto3.client('glue')
    ec2 = boto3.client('ec2')
    

  2. The following snippet adds three functions to implement the connection order strategy, which defines how to reorder the connections given to establish their priority. This is just an example; you can build your own custom code to implement whatever logic suits your needs. The code first checks the IPs available on each connection subnet and separates the ones that have enough IPs available to run the job at full capacity from the ones that could be used because they have at least two IPs available, which is the minimum a job needs to start. If the strategy is set to random, it randomizes the order within each of the connection groups previously described and adds any other connections. If the strategy is capacity, it orders them from most free IPs to fewest.
    def get_available_ips_from_connection(glue_connection_name):
        # Look up the subnet behind the connection and return its free IP count
        conn_response = glue_client.get_connection(Name=glue_connection_name)
        connection_properties = conn_response['Connection']['PhysicalConnectionRequirements']
        subnet_id = connection_properties['SubnetId']
        subnet_response = ec2.describe_subnets(SubnetIds=[subnet_id])
        return subnet_response['Subnets'][0]['AvailableIpAddressCount']
    
    def get_connections_free_ips(glue_connection_names, num_workers):
        good_connections = []
        usable_connections = []
        for connection_name in glue_connection_names:
            try:
                available_ips = get_available_ips_from_connection(connection_name)
                # Priority to connections that can hold the full cluster and we haven't just tried
                if available_ips >= num_workers:
                    good_connections.append((connection_name, available_ips))
                elif available_ips >= 2: # The bare minimum to start a Glue job
                    usable_connections.append((connection_name, available_ips))
            except Exception as e:
                print(f"[WARNING] Failed to check the free ips for:{connection_name}, will skip. Exception: {e}")
        return good_connections, usable_connections
    
    def prioritize_connections(connection_list, num_workers, strategy):
        (good_connections, usable_connections) = get_connections_free_ips(connection_list, num_workers)
        print(f"Good connections: {good_connections}")
        print(f"Usable connections: {usable_connections}")
        all_conn = []
        if strategy=="random":
            shuffle(good_connections)
            shuffle(usable_connections)
            # Good connections have priority
            all_conn = good_connections + usable_connections
        elif strategy=="capacity":
            # We can sort both at the same time
            all_conn = good_connections + usable_connections
            all_conn.sort(key=lambda x: -x[1])
        else:
            raise ValueError(f"Unknown strategy specified: {strategy}")
        result = [c[0] for c in all_conn] # Just need the name
        # Keep at the end any other connections that could not be checked for ips
        result += [c for c in connection_list if c not in result]
        return result
    

  3. The following code creates the DAG itself with the run job task, which updates the job with the connection order defined by the strategy, runs it, and waits for the results. The job name, connections, and strategy come from Airflow variables, so they can be easily configured and updated. It has two retries with exponential backoff configured, so if the task fails, it repeats the full task including the connection selection. Maybe by then the best choice is another connection, or the subnet previously picked randomly is in an Availability Zone that is currently suffering an outage, and by picking a different one, it can recover.
    with DAG(
        dag_id='glue_job_dag',
        schedule_interval=None, # Run on demand only
        start_date=datetime(2000, 1, 1), # A start date is required
        max_active_runs=1,
        catchup=False
    ) as glue_dag:
        
        @task(
            task_id="glue_task",
            retries=2,
            retry_delay=duration(seconds=30),
            retry_exponential_backoff=True
        )
        def run_job_task(**ctx):
            glue_connections = Variable.get("glue_job_dag.glue_connections").strip().split(',')
            glue_jobname = Variable.get("glue_job_dag.glue_job_name").strip()
            strategy = Variable.get('glue_job_dag.strategy', 'random') # random or capacity
            print(f"Connections available: {glue_connections}")
            print(f"Glue job name: {glue_jobname}")
            print(f"Strategy to use: {strategy}")
            job_props = glue_client.get_job(JobName=glue_jobname)['Job']
            num_workers = job_props['NumberOfWorkers']
            
            # Reorder the connections using the configured strategy
            glue_connections = prioritize_connections(glue_connections, num_workers, strategy)
            print(f"Running Glue job with the connection order: {glue_connections}")
            existing_connections = job_props.get('Connections',{}).get('Connections', [])
            # Preserve other connections that we don't manage
            other_connections = [con for con in existing_connections if con not in glue_connections]
            job_props['Connections'] = {"Connections": glue_connections + other_connections}
            # Clean up properties so we can reuse the dict for the update request
            for prop_name in ['Name', 'CreatedOn', 'LastModifiedOn', 'AllocatedCapacity', 'MaxCapacity']:
                del job_props[prop_name]
    
            GlueJobOperator(
                task_id='submit_job',
                job_name=glue_jobname,
                iam_role_name=job_props['Role'].split('/')[-1],
                update_config=True,
                create_job_kwargs=job_props,
                wait_for_completion=True
            ).execute(ctx)
            
        run_job_task()
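
Before uploading the file, you can optionally check that it parses as a valid DAG. This is a minimal local sketch that assumes you have apache-airflow and the Amazon provider package installed, plus AWS credentials and a default Region configured (the module creates Boto3 clients at import time):

    from airflow.models import DagBag
    
    # Point the DagBag at the folder containing glue_job_dag.py and surface any import errors
    dag_bag = DagBag(dag_folder=".", include_examples=False)
    print(dag_bag.import_errors or "glue_job_dag parsed without errors")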
    

Create the Airflow workflow

Now you create a workflow that invokes the AWS Glue job you just created:

  1. On the Amazon S3 console, locate the bucket created by the CloudFormation template, which will have a name starting with the name of the stack followed by -environmentbucket- (for example, myairflowstack-environmentbucket-ap1qks3nvvr4).
  2. Inside that bucket, create a folder called dags, and inside that folder, upload the DAG file glue_job_dag.py that you created in the previous section.
  3. On the Amazon MWAA console, navigate to the environment you deployed with the CloudFormation stack.

If the status is not yet Available, wait until it reaches that state. It shouldn't take longer than 30 minutes from when you deployed the CloudFormation stack.

  1. Choose the environment link on the table to see the environment details.

It's configured to pick up DAGs from the bucket and folder you used in the previous steps. Airflow will monitor that folder for changes.

  1. Choose Open Airflow UI to open a new tab accessing the Airflow UI, using the integrated IAM security to log you in.

If there's any issue with the DAG file you created, it will display an error at the top of the page indicating the lines affected. In that case, review the steps and upload it again. After a few seconds, it will parse the file and update or remove the error banner.

  1. On the Admin menu, choose Variables.
  2. Add three variables with the following keys and values:
    1. Key glue_job_dag.glue_connections with value MWAA-Glue-Blog-Subnet1,MWAA-Glue-Blog-Subnet2.
    2. Key glue_job_dag.glue_job_name with value AirflowBlogJob.
    3. Key glue_job_dag.strategy with value capacity.
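
The Airflow UI also lets you import variables from a JSON file on the same page; a sketch of an equivalent file, assuming the names used in this post:

    {
        "glue_job_dag.glue_connections": "MWAA-Glue-Blog-Subnet1,MWAA-Glue-Blog-Subnet2",
        "glue_job_dag.glue_job_name": "AirflowBlogJob",
        "glue_job_dag.strategy": "capacity"
    }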

Run the job with a dynamic subnet assignment

Now you're ready to run the workflow and see the strategy dynamically reordering the connections.

  1. On the Airflow UI, choose DAGs, and on the row glue_job_dag, choose the play icon.
  2. On the Browse menu, choose Task Instances.
  3. On the instances table, scroll right to display the Log Url column and choose the icon on it to open the log.

The log will update as the task runs; you can locate the line starting with "Running Glue job with the connection order:" and the previous lines showing details of the connection IPs and the category assigned. If an error occurs, you'll see the details in this log.

  1. On the AWS Glue console, choose ETL jobs in the navigation pane, then choose the job AirflowBlogJob.
  2. On the Runs tab, choose the run instance, then the Output logs link, which will open a new tab.
  3. On the new tab, use the log stream link to open it.

It will display the IP that the driver was assigned and which subnet it belongs to, which should match the connection indicated by Airflow (if the log is not displayed, choose Resume so it gets updated as soon as it's available).

  1. On the Airflow UI, edit the Airflow variable glue_job_dag.strategy to set it to random.
  2. Run the DAG multiple times and see how the ordering changes.

Clean up

If you no longer need the deployment, delete the resources to avoid any further charges:

  1. Delete the Python script you uploaded, so the S3 bucket can be automatically deleted in the next step.
  2. Delete the CloudFormation stack.
  3. Delete the AWS Glue job.
  4. Delete the script that the job saved in Amazon S3.
  5. Delete the connections you created as part of this post.
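
Steps 3 and 5 can also be scripted; a minimal Boto3 sketch, assuming you kept the names used in this post (the CloudFormation stack and the remaining S3 objects still need to be removed separately):

    import boto3
    
    glue = boto3.client("glue")
    
    # Remove the job and the two NETWORK connections created for this post
    glue.delete_job(JobName="AirflowBlogJob")
    for name in ["MWAA-Glue-Blog-Subnet1", "MWAA-Glue-Blog-Subnet2"]:
        glue.delete_connection(ConnectionName=name)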

Conclusion

In this post, we showed how AWS Glue and Amazon MWAA can work together to build more advanced custom workflows, while minimizing the operational and management overhead. This solution gives you more control over how your AWS Glue job runs to meet specific operational, network, or security requirements.

You can deploy your own Amazon MWAA environment in multiple ways, such as with the template used in this post, on the Amazon MWAA console, or using the AWS CLI. You can also implement your own strategies to orchestrate AWS Glue jobs, based on your network architecture and requirements (for instance, to run the job closer to the data when possible).


About the authors

Michael Greenshtein is an Analytics Specialist Solutions Architect for the Public Sector.

Gonzalo Herreros is a Senior Big Data Architect on the AWS Glue team.
