Wednesday, November 6, 2024

Build an end-to-end serverless streaming pipeline with Apache Kafka on Amazon MSK using Python

The volume of data generated globally continues to surge, from gaming, retail, and finance, to manufacturing, healthcare, and travel. Organizations are looking for more ways to quickly use the constant inflow of data to innovate for their businesses and customers. They have to reliably capture, process, analyze, and load the data into a myriad of data stores, all in real time.

Apache Kafka is a popular choice for these real-time streaming needs. However, it can be challenging to set up a Kafka cluster along with other data processing components that scale automatically depending on your application’s needs. You risk under-provisioning for peak traffic, which can lead to downtime, or over-provisioning for base load, leading to wastage. AWS offers multiple serverless services like Amazon Managed Streaming for Apache Kafka (Amazon MSK), Amazon Data Firehose, Amazon DynamoDB, and AWS Lambda that scale automatically depending on your needs.

In this post, we explain how you can use some of these services, including MSK Serverless, to build a serverless data platform to meet your real-time needs.

Solution overview

Let’s imagine a scenario. You’re responsible for managing thousands of modems for an internet service provider deployed across multiple geographies. You want to monitor the modem connectivity quality that has a significant impact on customer productivity and satisfaction. Your deployment includes different modems that need to be monitored and maintained to ensure minimal downtime. Each device transmits thousands of 1 KB records every second, such as CPU usage, memory usage, alarm, and connection status. You want real-time access to this data so you can monitor performance in real time, and detect and mitigate issues quickly. You also need longer-term access to this data for machine learning (ML) models to run predictive maintenance assessments, find optimization opportunities, and forecast demand.

Your clients that gather the data onsite are written in Python, and they can send all the data as Apache Kafka topics to Amazon MSK. For your application’s low-latency and real-time data access, you can use Lambda and DynamoDB. For longer-term data storage, you can use the managed serverless connector service Amazon Data Firehose to send data to your data lake.

The following diagram shows how you can build this end-to-end serverless application.

end-to-end serverless application

Let’s follow the steps in the following sections to implement this architecture.

Create a serverless Kafka cluster on Amazon MSK

We use Amazon MSK to ingest real-time telemetry data from modems. Creating a serverless Kafka cluster is straightforward on Amazon MSK. It only takes a few minutes using the AWS Management Console or AWS SDK. To use the console, refer to Getting started using MSK Serverless clusters. You create a serverless cluster, AWS Identity and Access Management (IAM) role, and client machine.
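If you prefer the SDK route mentioned above, the following is a minimal sketch of what the request might look like with boto3. The cluster name, subnet IDs, and security group ID are placeholders and not values from this post, and the helper names are hypothetical. The request builder is pure Python; the actual CreateClusterV2 call only runs if you invoke create_cluster yourself with valid AWS credentials.

```python
def build_serverless_cluster_request(cluster_name, subnet_ids, security_group_ids):
    """Build keyword arguments for the MSK CreateClusterV2 API (serverless cluster)."""
    return {
        "ClusterName": cluster_name,
        "Serverless": {
            "VpcConfigs": [
                {
                    "SubnetIds": list(subnet_ids),
                    "SecurityGroupIds": list(security_group_ids),
                }
            ],
            # IAM access control, matching the IAM-based auth used by the Python clients
            "ClientAuthentication": {"Sasl": {"Iam": {"Enabled": True}}},
        },
    }


def create_cluster(region, request):
    """Send the request to Amazon MSK and return the new cluster ARN."""
    import boto3  # imported lazily so the builder above works without boto3 installed

    client = boto3.client("kafka", region_name=region)
    response = client.create_cluster_v2(**request)
    return response["ClusterArn"]


# Placeholder values for illustration only
request = build_serverless_cluster_request(
    "myCluster", ["subnet-aaaa1111", "subnet-bbbb2222"], ["sg-cccc3333"]
)
print(request)
```

The console walkthrough also creates the IAM role and client machine, which this sketch does not cover.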

Create a Kafka topic using Python

When your cluster and client machine are ready, SSH to your client machine and install Kafka Python and the MSK IAM library for Python.

  • Run the following commands to install Kafka Python and the MSK IAM library:
pip install kafka-python

pip install aws-msk-iam-sasl-signer-python

  • Create a new file called createTopic.py.
  • Copy the following code into this file, replacing the bootstrap_servers and region information with the details for your cluster. For instructions on retrieving the bootstrap_servers information for your MSK cluster, see Getting the bootstrap brokers for an Amazon MSK cluster.
from kafka.admin import KafkaAdminClient, NewTopic
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider

# AWS region where MSK cluster is located
region= '<UPDATE_AWS_REGION_NAME_HERE>'

# Class to provide MSK authentication token
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

# Create an instance of MSKTokenProvider class
tp = MSKTokenProvider()

# Initialize KafkaAdminClient with required configurations
admin_client = KafkaAdminClient(
    bootstrap_servers="<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>",
    security_protocol="SASL_SSL",
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,
    client_id='client1',
)

# Create the topic if it does not already exist
topic_name="mytopic"
topic_list =[NewTopic(name=topic_name, num_partitions=1, replication_factor=2)]
existing_topics = admin_client.list_topics()
if(topic_name not in existing_topics):
    admin_client.create_topics(topic_list)
    print("Topic has been created")
else:
    print("Topic already exists! List of topics: " + str(existing_topics))

  • Run the createTopic.py script to create a new Kafka topic called mytopic on your serverless cluster:
python createTopic.py

Produce records using Python

Let’s generate some sample modem telemetry data.

  • Create a new file called kafkaDataGen.py.
  • Copy the following code into this file, updating the BROKERS and region information with the details for your cluster:
from kafka import KafkaProducer
from aws_msk_iam_sasl_signer import MSKAuthTokenProvider
import json
import random
from datetime import datetime
topicname="mytopic"

BROKERS = '<UPDATE_BOOTSTRAP_SERVER_STRING_HERE>'
region= '<UPDATE_AWS_REGION_NAME_HERE>'
class MSKTokenProvider():
    def token(self):
        token, _ = MSKAuthTokenProvider.generate_auth_token(region)
        return token

tp = MSKTokenProvider()

producer = KafkaProducer(
    bootstrap_servers=BROKERS,
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    retry_backoff_ms=500,
    request_timeout_ms=20000,
    security_protocol="SASL_SSL",
    sasl_mechanism='OAUTHBEARER',
    sasl_oauth_token_provider=tp,)

# Method to get a random model name
def getModel():
    products=["Ultra WiFi Modem", "Ultra WiFi Booster", "EVG2000", "Sagemcom 5366 TN", "ASUS AX5400"]
    randomnum = random.randint(0, 4)
    return (products[randomnum])

# Method to get a random interface status (mostly "connected", occasionally "down")
def getInterfaceStatus():
    status=["connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "connected", "down", "down"]
    randomnum = random.randint(0, 13)
    return (status[randomnum])

# Method to get a random CPU usage
def getCPU():
    i = random.randint(50, 100)
    return (str(i))

# Method to get a random memory usage
def getMemory():
    i = random.randint(1000, 1500)
    return (str(i))
    
# Method to generate sample data
def generateData():
    
    model=getModel()
    deviceid='dvc' + str(random.randint(1000, 10000))
    interface="eth4.1"
    interfacestatus=getInterfaceStatus()
    cpuusage=getCPU()
    memoryusage=getMemory()
    now = datetime.now()
    event_time = now.strftime("%Y-%m-%d %H:%M:%S")
    
    modem_data={}
    modem_data["model"]=model
    modem_data["deviceid"]=deviceid
    modem_data["interface"]=interface
    modem_data["interfacestatus"]=interfacestatus
    modem_data["cpuusage"]=cpuusage
    modem_data["memoryusage"]=memoryusage
    modem_data["event_time"]=event_time
    return modem_data

# Continuously generate and send data
while True:
    data =generateData()
    print(data)
    try:
        future = producer.send(topicname, value=data)
        producer.flush()
        # Block until the broker acknowledges the record (or the timeout expires)
        record_metadata = future.get(timeout=10)
        
    except Exception as e:
        print(f"Failed to send record: {e!r}")

  • Run the kafkaDataGen.py script to continuously generate random data and publish it to the specified Kafka topic:
python kafkaDataGen.py
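The generator sends records without a key, which is fine for the single-partition topic created earlier. If you later add partitions and want all records from one modem to land on the same partition, one option is to key each record by its device ID. The sketch below shows only the encoding step; encode_record is a hypothetical helper, and the commented usage assumes the producer and topicname objects from kafkaDataGen.py.

```python
import json


def encode_record(record):
    """Return (key_bytes, value_bytes) for a modem telemetry record."""
    key = record["deviceid"].encode("utf-8")
    value = json.dumps(record).encode("utf-8")
    return key, value


# Illustrative record with a subset of the fields produced by generateData()
sample = {"deviceid": "dvc1234", "interfacestatus": "connected", "cpuusage": "73"}
key, value = encode_record(sample)
print(key, len(value), "bytes")

# Usage with the producer from kafkaDataGen.py. That producer already has a
# value_serializer, so pass the dict as the value and only the key as bytes:
# producer.send(topicname, key=key, value=sample)
```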

Store events in Amazon S3

Now you store all the raw event data in an Amazon Simple Storage Service (Amazon S3) data lake for analytics. You can use the same data to train ML models. The integration with Amazon Data Firehose allows Amazon MSK to seamlessly load data from your Apache Kafka clusters into an S3 data lake. Complete the following steps to continuously stream data from Kafka to Amazon S3, eliminating the need to build or manage your own connector applications:

  • On the Amazon S3 console, create a new bucket. You can also use an existing bucket.
  • Create a new folder in your S3 bucket called streamingDataLake.
  • On the Amazon MSK console, choose your MSK Serverless cluster.
  • On the Actions menu, choose Edit cluster policy.

cluster policy

  • Select Include Firehose service principal and choose Save changes.

firehose service principal

  • On the S3 delivery tab, choose Create delivery stream.

delivery stream

  • For Source, choose Amazon MSK.
  • For Destination, choose Amazon S3.

source and destination

  • For Amazon MSK cluster connectivity, select Private bootstrap brokers.
  • For Topic, enter a topic name (for this post, mytopic).

source settings

  • For S3 bucket, select Browse and select your S3 bucket.
  • Enter streamingDataLake as your S3 bucket prefix.
  • Enter streamingDataLakeErr as your S3 bucket error output prefix.

destination settings

  • Choose Create delivery stream.

create delivery stream

You can verify that the data was written to your S3 bucket. You should see that the streamingDataLake directory was created and the files are stored in partitions.

amazon s3
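To check the delivery from the client machine instead of the console, a short script can list the objects under the prefix and count them per partition folder. This is a sketch under stated assumptions: the helper names are hypothetical, the example keys are made up for illustration (actual object names depend on your Firehose configuration), and list_keys needs boto3 plus credentials with permission to list the bucket.

```python
from collections import Counter


def count_objects_per_folder(keys):
    """Count S3 object keys per parent folder (everything before the last slash)."""
    return Counter(key.rsplit("/", 1)[0] if "/" in key else "" for key in keys)


def list_keys(bucket, prefix):
    """Return all object keys under a prefix, following pagination."""
    import boto3  # imported lazily; only needed when talking to S3

    s3 = boto3.client("s3")
    paginator = s3.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys


# Made-up keys, shaped like time-partitioned output, for illustration only
example_keys = [
    "streamingDataLake/2024/11/06/14/object-a",
    "streamingDataLake/2024/11/06/14/object-b",
    "streamingDataLake/2024/11/06/15/object-c",
]
for folder, count in sorted(count_objects_per_folder(example_keys).items()):
    print(folder, count)

# Against a real bucket (placeholder name):
# print(count_objects_per_folder(list_keys("<YOUR_BUCKET_NAME>", "streamingDataLake")))
```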

Store events in DynamoDB

For the last step, you store the most recent modem data in DynamoDB. This allows the client application to access the modem status and interact with the modem remotely from anywhere, with low latency and high availability. Lambda seamlessly works with Amazon MSK. Lambda internally polls for new messages from the event source and then synchronously invokes the target Lambda function. Lambda reads the messages in batches and provides these to your function as an event payload.
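To make the event payload concrete, here is a small sketch of decoding one locally. Lambda groups Kafka records under keys of the form topic-partition and base64-encodes each record value. The sample event below is trimmed to the fields used here and its values are invented; decode_msk_event is a hypothetical helper, not part of any AWS library.

```python
import base64
import json


def decode_msk_event(event):
    """Yield (topic_partition, record_dict) for every record in an MSK event."""
    for topic_partition, records in event.get("records", {}).items():
        for rec in records:
            payload = base64.b64decode(rec["value"]).decode("utf-8")
            yield topic_partition, json.loads(payload)


# Build a trimmed sample event the way Lambda would deliver it
sample_value = base64.b64encode(
    json.dumps({"deviceid": "dvc1234"}).encode("utf-8")
).decode("ascii")
sample_event = {
    "eventSource": "aws:kafka",
    "records": {
        "mytopic-0": [
            {"topic": "mytopic", "partition": 0, "offset": 15, "value": sample_value}
        ]
    },
}

for topic_partition, item in decode_msk_event(sample_event):
    print(topic_partition, item)
```

Iterating over every key in records, rather than reading a single hard-coded partition, keeps the handler working if the topic later gains partitions.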

Let’s first create a table in DynamoDB. Refer to DynamoDB API permissions: Actions, resources, and conditions reference to verify that your client machine has the necessary permissions.

  • Create a new file called createTable.py.
  • Copy the following code into the file, updating the region information:
import boto3
region='<UPDATE_AWS_REGION_NAME_HERE>'
dynamodb = boto3.client('dynamodb', region_name=region)
table_name="device_status"
# deviceid is the partition (HASH) key, stored as a string
key_schema = [
    {
        'AttributeName': 'deviceid',
        'KeyType': 'HASH'
    }
]
attribute_definitions = [
    {
        'AttributeName': 'deviceid',
        'AttributeType': 'S'
    }
]
# Create the table with on-demand capacity mode
dynamodb.create_table(
    TableName=table_name,
    KeySchema=key_schema,
    AttributeDefinitions=attribute_definitions,
    BillingMode="PAY_PER_REQUEST"
)
print(f"Table '{table_name}' created with on-demand capacity mode.")

  • Run the createTable.py script to create a table called device_status in DynamoDB:
python createTable.py
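Table creation is asynchronous, so the table may still be in the CREATING state when the script returns. If you want to script a wait before writing to it, a polling loop like the sketch below is one option. wait_until_active is a hypothetical helper, and the delay and attempt limits are arbitrary choices; boto3 also ships a built-in table_exists waiter that serves the same purpose.

```python
import time


def wait_until_active(client, table_name, delay_seconds=2, max_attempts=30):
    """Poll DescribeTable until the table is ACTIVE; return False if it never is."""
    for _ in range(max_attempts):
        status = client.describe_table(TableName=table_name)["Table"]["TableStatus"]
        if status == "ACTIVE":
            return True
        time.sleep(delay_seconds)
    return False


# Usage, assuming valid AWS credentials:
# import boto3
# client = boto3.client("dynamodb", region_name="<UPDATE_AWS_REGION_NAME_HERE>")
# print(wait_until_active(client, "device_status"))
#
# Equivalent with the built-in waiter:
# client.get_waiter("table_exists").wait(TableName="device_status")
```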

Now let’s configure the Lambda function.

  • On the Lambda console, choose Functions in the navigation pane.
  • Choose Create function.
  • Select Author from scratch.
  • For Function name, enter a name (for example, my-notification-kafka).
  • For Runtime, choose Python 3.11.
  • For Permissions, select Use an existing role and choose a role with permissions to read from your cluster.
  • Create the function.

On the Lambda function configuration page, you can now configure sources, destinations, and your application code.

  • Choose Add trigger.
  • For Trigger configuration, enter MSK to configure Amazon MSK as a trigger for the Lambda source function.
  • For MSK cluster, enter myCluster.
  • Deselect Activate trigger, because you haven’t configured your Lambda function yet.
  • For Batch size, enter 100.
  • For Starting position, choose Latest.
  • For Topic name, enter a name (for example, mytopic).
  • Choose Add.
  • On the Lambda function details page, on the Code tab, enter the following code:
import base64
import boto3
import json
import os
import random

def convertjson(payload):
    # Return the parsed JSON, or 'err' if the payload cannot be parsed
    try:
        aa=json.loads(payload)
        return aa
    except Exception:
        return 'err'

def lambda_handler(event, context):
    # Records arrive grouped by "<topic>-<partition>"; mytopic has a single partition
    base64records = event['records']['mytopic-0']
    
    # Record values are base64-encoded JSON strings
    raw_records = [base64.b64decode(x["value"]).decode('utf-8') for x in base64records]
    
    dynamodb = boto3.client('dynamodb')
    table_name="device_status"
    
    for record in raw_records:
        item = json.loads(record)
        deviceid=item['deviceid']
        interface=item['interface']
        interfacestatus=item['interfacestatus']
        cpuusage=item['cpuusage']
        memoryusage=item['memoryusage']
        event_time=item['event_time']
        
        item = {
            'deviceid': {'S': deviceid},  
            'interface': {'S': interface},
            'interfacestatus': {'S': interfacestatus},
            'cpuusage': {'S': cpuusage},          
            'memoryusage': {'S': memoryusage},
            'event_time': {'S': event_time},
        }
        
        # Write the item to the DynamoDB table
        response = dynamodb.put_item(
            TableName=table_name,
            Item=item
        )
        
        print("Item written to DynamoDB")

  • Deploy the Lambda function.
  • On the Configuration tab, choose Edit to edit the trigger.

edit trigger

  • Select the trigger, then choose Save.
  • On the DynamoDB console, choose Explore items in the navigation pane.
  • Select the table device_status.

You will see that Lambda is writing events generated in the Kafka topic to DynamoDB.

ddb table
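A client application can read the latest status for a modem with a single key lookup. The following is a minimal sketch, assuming the device_status table and string attributes written by the Lambda function; unwrap_item and get_device_status are hypothetical helpers, and the device ID in the commented usage is a placeholder.

```python
def unwrap_item(item):
    """Convert {'attr': {'S': 'value'}} into {'attr': 'value'} for string attributes."""
    return {name: typed["S"] for name, typed in item.items() if "S" in typed}


def get_device_status(client, deviceid, table_name="device_status"):
    """Fetch one modem's latest record by its partition key, or None if absent."""
    response = client.get_item(
        TableName=table_name,
        Key={"deviceid": {"S": deviceid}},
    )
    item = response.get("Item")
    return unwrap_item(item) if item else None


# Usage, assuming valid AWS credentials:
# import boto3
# client = boto3.client("dynamodb", region_name="<UPDATE_AWS_REGION_NAME_HERE>")
# print(get_device_status(client, "dvc1234"))
```

Because deviceid is the table's only key, each new record for a device overwrites the previous one, so this lookup returns the most recent status.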

Summary

Streaming data pipelines are critical for building real-time applications. However, setting up and managing the infrastructure can be daunting. In this post, we walked through how to build a serverless streaming pipeline on AWS using Amazon MSK, Lambda, DynamoDB, Amazon Data Firehose, and other services. The key benefits are no servers to manage, automatic scalability of the infrastructure, and a pay-as-you-go model using fully managed services.

Ready to build your own real-time pipeline? Get started today with a free AWS account. With the power of serverless, you can focus on your application logic while AWS handles the undifferentiated heavy lifting. Let’s build something awesome on AWS!


About the Authors

Masudur Rahaman Sayem is a Streaming Data Architect at AWS. He works with AWS customers globally to design and build data streaming architectures to solve real-world business problems. He specializes in optimizing solutions that use streaming data services and NoSQL. Sayem is very passionate about distributed computing.

Michael Oguike is a Product Manager for Amazon MSK. He’s passionate about using data to uncover insights that drive action. He enjoys helping customers from a wide range of industries improve their businesses using data streaming. Michael also loves learning about behavioral science and psychology from books and podcasts.
