Tuesday, July 2, 2024

Use AWS Glue ETL to perform merge, partition evolution, and schema evolution on Apache Iceberg

As enterprises collect increasing amounts of data from various sources, the structure and organization of that data often need to change over time to meet evolving analytical needs. However, altering schema and table partitions in traditional data lakes can be a disruptive and time-consuming task, requiring renaming or recreating entire tables and reprocessing large datasets. This hampers agility and time to insight.

Schema evolution enables adding, deleting, renaming, or modifying columns without needing to rewrite existing data. This is critical for fast-moving enterprises that need to augment data structures to support new use cases. For example, an ecommerce company may add new customer demographic attributes or order status flags to enrich analytics. Apache Iceberg manages these schema changes in a backward-compatible way through its metadata-based table evolution architecture.

Similarly, partition evolution allows seamless adding, dropping, or splitting of partitions. For instance, an ecommerce marketplace may initially partition order data by day. As orders accumulate and querying by day becomes inefficient, they may split to day and customer ID partitions. Table partitioning organizes big datasets efficiently for query performance. Iceberg gives enterprises the flexibility to incrementally adjust partitions rather than requiring tedious rebuild procedures. New partitions can be added in a fully compatible way without downtime or having to rewrite existing data files.

This post demonstrates how you can harness Iceberg, Amazon Simple Storage Service (Amazon S3), AWS Glue, AWS Lake Formation, and AWS Identity and Access Management (IAM) to implement a transactional data lake supporting seamless evolution. By allowing for painless schema and partition adjustments as data insights evolve, you can benefit from the future-proof flexibility needed for business success.

Overview of solution

For our example use case, a fictional large ecommerce company processes thousands of orders each day. When orders are received, updated, cancelled, shipped, delivered, or returned, the changes are made in their on-premises system, and those changes need to be replicated to an S3 data lake so that data analysts can run queries through Amazon Athena. The changes can contain schema updates as well. Due to the security requirements of different organizations, they need to manage fine-grained access control for the analysts through Lake Formation.

The following diagram illustrates the solution architecture.

The solution workflow includes the following key steps:

  1. Ingest data from on premises into a Dropzone location using a data ingestion pipeline.
  2. Merge the data from the Dropzone location into Iceberg using AWS Glue.
  3. Query the data using Athena.

Prerequisites

For this walkthrough, you should have the following prerequisites:

Set up the infrastructure with AWS CloudFormation

To create your infrastructure with an AWS CloudFormation template, complete the following steps:

  1. Log in as an administrator to your AWS account.
  2. Open the AWS CloudFormation console.
  3. Choose Launch Stack.
  4. For Stack name, enter a name (for this post, icebergdemo1).
  5. Choose Next.
  6. Provide information for the following parameters:
    1. DatalakeUserName
    2. DatalakeUserPassword
    3. DatabaseName
    4. TableName
    5. DatabaseLFTagKey
    6. DatabaseLFTagValue
    7. TableLFTagKey
    8. TableLFTagValue
  7. Choose Next.
  8. Choose Next again.
  9. In the Review section, review the values you entered.
  10. Select I acknowledge that AWS CloudFormation might create IAM resources with custom names and choose Submit.

In a few minutes, the stack status will change to CREATE_COMPLETE.

You can go to the Outputs tab of the stack to see all the resources it has provisioned. The resources are prefixed with the stack name you provided (for this post, icebergdemo1).

Create an Iceberg table using Lambda and grant access using Lake Formation

To create an Iceberg table and grant access on it, complete the following steps:

  1. Navigate to the Resources tab of the CloudFormation stack icebergdemo1 and search for the logical ID LambdaFunctionIceberg.
  2. Choose the hyperlink of the associated physical ID.

You’re redirected to the Lambda function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.

  3. On the Configuration tab, choose Environment variables in the left pane.
  4. On the Code tab, you can inspect the function code.

The function uses the AWS SDK for Python (Boto3) APIs to provision the resources. It assumes the provisioned data lake admin role to perform the following tasks:

  • Grant DATA_LOCATION_ACCESS access to the data lake admin role on the registered data lake location
  • Create Lake Formation Tags (LF-Tags)
  • Create a database in the AWS Glue Data Catalog using the AWS Glue create_database API
  • Assign LF-Tags to the database
  • Grant DESCRIBE access on the database using LF-Tags to the data lake IAM user and AWS Glue ETL IAM role
  • Create an Iceberg table using the AWS Glue create_table API:
response_create_table = glue_client.create_table(
    DatabaseName="icebergdb1",
    # Ask the Data Catalog to create the table in Iceberg format (version 2)
    OpenTableFormatInput={
        'IcebergInput': {
            'MetadataOperation': 'CREATE',
            'Version': '2'
        }
    },
    TableInput={
        'Name': 'ecomorders',
        'StorageDescriptor': {
            'Columns': [
                {'Name': 'ordernum', 'Type': 'int'},
                {'Name': 'sku', 'Type': 'string'},
                {'Name': 'quantity', 'Type': 'int'},
                {'Name': 'category', 'Type': 'string'},
                {'Name': 'status', 'Type': 'string'},
                {'Name': 'shipping_id', 'Type': 'string'}
            ],
            'Location': 's3://icebergdemo1-s3bucketiceberg-vthvwwblrwe8/iceberg/'
        },
        'TableType': 'EXTERNAL_TABLE'
    }
)

  • Assign LF-Tags to the table
  • Grant DESCRIBE and SELECT on the Iceberg table LF-Tags to the data lake IAM user
  • Grant ALL, DESCRIBE, SELECT, INSERT, DELETE, and ALTER access on the Iceberg table LF-Tags to the AWS Glue ETL IAM role
  5. On the Test tab, choose Test to run the function.

When the function is complete, you will see the message “Executing function: succeeded.”

Lake Formation helps you centrally manage, secure, and globally share data for analytics and machine learning. With Lake Formation, you can manage fine-grained access control for your data lake data on Amazon S3 and its metadata in the Data Catalog.

To add an Amazon S3 location as Iceberg storage in your data lake, register the location with Lake Formation. You can then use Lake Formation permissions for fine-grained access control to the Data Catalog objects that point to this location, and to the underlying data in the location.

The CloudFormation stack registered the data lake location.

Data location permissions in Lake Formation enable principals to create and alter Data Catalog resources that point to the designated registered Amazon S3 locations. Data location permissions work in addition to Lake Formation data permissions to secure information in your data lake.

Lake Formation tag-based access control (LF-TBAC) is an authorization strategy that defines permissions based on attributes. In Lake Formation, these attributes are called LF-Tags. You can attach LF-Tags to Data Catalog resources, Lake Formation principals, and table columns. You can assign and revoke permissions on Lake Formation resources using these LF-Tags. Lake Formation allows operations on those resources when the principal’s tag matches the resource tag.
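To make the tag-matching idea concrete, here is a minimal sketch in plain Python. It is a simplified illustration, not how Lake Formation evaluates permissions internally. The function name is_allowed, the dictionary layout, and the tag key and values (domain, ecommerce, finance) are assumptions made up for this example.

```python
def is_allowed(grant, resource_tags, principal, action):
    """Return True if a tag-based grant permits the action on a resource."""
    if grant["principal"] != principal or action not in grant["permissions"]:
        return False
    # Every tag key in the grant expression must be present on the resource
    # with one of the allowed values.
    return all(
        resource_tags.get(key) in allowed_values
        for key, allowed_values in grant["expression"].items()
    )


# Hypothetical grant: iceberguser1 may DESCRIBE and SELECT anything tagged domain=ecommerce
grant = {
    "principal": "iceberguser1",
    "expression": {"domain": {"ecommerce"}},
    "permissions": {"DESCRIBE", "SELECT"},
}
table_tags = {"domain": "ecommerce"}   # tags attached to the table
other_tags = {"domain": "finance"}     # tags attached to some other table

print(is_allowed(grant, table_tags, "iceberguser1", "SELECT"))
print(is_allowed(grant, other_tags, "iceberguser1", "SELECT"))
```

The point of the sketch is that permissions are attached to tag expressions rather than to individual tables, so tagging a new table is enough to bring it under an existing grant.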

Verify the Iceberg table from the Lake Formation console

To verify the Iceberg table, complete the following steps:

  1. On the Lake Formation console, choose Databases in the navigation pane.
  2. Open the details page for icebergdb1.

You can see the associated database LF-Tags.

  3. Choose Tables in the navigation pane.
  4. Open the details page for ecomorders.

In the Table details section, you can observe the following:

  • Table format shows as Apache Iceberg
  • Table management shows as Managed by Data Catalog
  • Location lists the data lake location of the Iceberg table

In the LF-Tags section, you can see the associated table LF-Tags.

In the Table details section, expand Advanced table properties to view the following:

  • metadata_location points to the location of the Iceberg table’s metadata file
  • table_type shows as ICEBERG

On the Schema tab, you can view the columns defined on the Iceberg table.

Integrate Iceberg with the AWS Glue Data Catalog and Amazon S3

Iceberg tracks individual data files in a table instead of directories. When there is an explicit commit on the table, Iceberg creates data files and adds them to the table. Iceberg maintains the table state in metadata files. Any change in table state creates a new metadata file that atomically replaces the older metadata. Metadata files track the table schema, partitioning configuration, and other properties.

Iceberg requires file systems that support the operations to be compatible with object stores like Amazon S3.

Iceberg creates snapshots for the table contents. Each snapshot is a complete set of data files in the table at a point in time. Data files in snapshots are stored in one or more manifest files that contain a row for each data file in the table, its partition data, and its metrics.

The following diagram illustrates this hierarchy.

When you create an Iceberg table, it creates the metadata folder first and a metadata file in the metadata folder. The data folder is created when you load data into the Iceberg table.

Contents of the Iceberg metadata file

The Iceberg metadata file contains a lot of information, including the following:

  • format-version – Version of the Iceberg table
  • location – Amazon S3 location of the table
  • schemas – Name and data type of all columns on the table
  • partition-specs – Partitioned columns
  • sort-orders – Sort order of columns
  • properties – Table properties
  • current-snapshot-id – Current snapshot
  • refs – Table references
  • snapshots – List of snapshots, each containing the following information:
    • sequence-number – Sequence number of snapshots in chronological order (the highest number represents the current snapshot, 1 for the first snapshot)
    • snapshot-id – Snapshot ID
    • timestamp-ms – Timestamp when the snapshot was committed
    • summary – Summary of changes committed
    • manifest-list – List of manifests; this file name starts with snap-<snapshot-id>
  • schema-id – Sequence number of the schema in chronological order (the highest number represents the current schema)
  • snapshot-log – List of snapshots in chronological order
  • metadata-log – List of metadata files in chronological order

The metadata file has all the historical changes to the table’s data and schema. Reviewing the contents of the metadata file directly can be a time-consuming task. Fortunately, you can query the Iceberg metadata using Athena.
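Because the metadata file is JSON, a few lines of Python are enough to see how the fields listed above relate to each other. The following sketch looks up the current snapshot in a heavily trimmed, hypothetical metadata document. The bucket name, snapshot IDs, timestamps, and manifest list paths are invented for illustration, and a real metadata file contains many more fields.

```python
import json

# Hypothetical, trimmed metadata document; values are made up for illustration.
sample_metadata = json.loads("""
{
  "format-version": 2,
  "location": "s3://example-bucket/iceberg/",
  "current-snapshot-id": 222,
  "snapshots": [
    {"sequence-number": 1, "snapshot-id": 111, "timestamp-ms": 1700000000000,
     "summary": {"operation": "append"},
     "manifest-list": "s3://example-bucket/iceberg/metadata/snap-111.avro"},
    {"sequence-number": 2, "snapshot-id": 222, "timestamp-ms": 1700000600000,
     "summary": {"operation": "overwrite"},
     "manifest-list": "s3://example-bucket/iceberg/metadata/snap-222.avro"}
  ]
}
""")


def current_snapshot(metadata):
    """Return the snapshot entry referenced by current-snapshot-id, or None."""
    current_id = metadata.get("current-snapshot-id")
    for snapshot in metadata.get("snapshots", []):
        if snapshot["snapshot-id"] == current_id:
            return snapshot
    return None


snap = current_snapshot(sample_metadata)
print(snap["sequence-number"], snap["summary"]["operation"], snap["manifest-list"])
```

To try this against a real table, you could download the file that metadata_location points to and pass its parsed contents to current_snapshot.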

Iceberg framework in AWS Glue

AWS Glue 4.0 supports Iceberg tables registered with Lake Formation. In the AWS Glue ETL jobs, you need the following code to enable the Iceberg framework:

import sys
import boto3
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.conf import SparkConf

# Look up the account ID, which is used as the Glue catalog ID below
aws_account_id = boto3.client('sts').get_caller_identity().get('Account')

args = getResolvedOptions(sys.argv, ['JOB_NAME', 'warehouse_path'])

# Set up configuration for AWS Glue to work with Apache Iceberg
conf = SparkConf()
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
conf.set("spark.sql.catalog.glue_catalog", "org.apache.iceberg.spark.SparkCatalog")
conf.set("spark.sql.catalog.glue_catalog.warehouse", args['warehouse_path'])
conf.set("spark.sql.catalog.glue_catalog.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
conf.set("spark.sql.catalog.glue_catalog.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
conf.set("spark.sql.catalog.glue_catalog.glue.lakeformation-enabled", "true")
conf.set("spark.sql.catalog.glue_catalog.glue.id", aws_account_id)

sc = SparkContext(conf=conf)
glueContext = GlueContext(sc)
spark = glueContext.spark_session
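In the configuration above, glue_catalog is a catalog name chosen by the job rather than a fixed keyword. Every spark.sql.catalog.glue_catalog.* key configures that catalog, and the same name is the first part of table identifiers in SQL, such as glue_catalog.icebergdb1.ecomorders. The following sketch makes that relationship explicit by building the same settings for an arbitrary catalog name. The helper name iceberg_catalog_conf and the sample warehouse path and account ID are assumptions for illustration.

```python
def iceberg_catalog_conf(catalog_name, warehouse_path, account_id):
    """Build the Spark settings used above for a Glue-backed Iceberg catalog."""
    prefix = f"spark.sql.catalog.{catalog_name}"
    return {
        "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
        prefix: "org.apache.iceberg.spark.SparkCatalog",
        f"{prefix}.warehouse": warehouse_path,
        f"{prefix}.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog",
        f"{prefix}.io-impl": "org.apache.iceberg.aws.s3.S3FileIO",
        f"{prefix}.glue.lakeformation-enabled": "true",
        f"{prefix}.glue.id": account_id,
    }


# Placeholder values; a real job would take these from its arguments and from STS
settings = iceberg_catalog_conf("glue_catalog", "s3://example-bucket/iceberg/", "123456789012")
for key, value in settings.items():
    print(f"{key} = {value}")
```

In a Glue job, each pair could then be applied with conf.set(key, value) before creating the SparkContext.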

For read/write access to the underlying data, in addition to Lake Formation permissions, the AWS Glue IAM role used to run the AWS Glue ETL jobs was granted the lakeformation:GetDataAccess IAM permission. With this permission, Lake Formation grants the request for temporary credentials to access the data.

The CloudFormation stack provisioned the four AWS Glue ETL jobs for you. The name of each job starts with your stack name (icebergdemo1). Complete the following steps to view the jobs:

  1. Log in as an administrator to your AWS account.
  2. On the AWS Glue console, choose ETL jobs in the navigation pane.
  3. Search for jobs with icebergdemo1 in the name.

Merge data from Dropzone into the Iceberg table

For our use case, the company ingests their ecommerce orders data daily from their on-premises location into an Amazon S3 Dropzone location. The CloudFormation stack loaded three files with sample orders for 3 days, as shown in the following figures. You see the data in the Dropzone location s3://icebergdemo1-s3bucketdropzone-kunftrcblhsk/data.

The AWS Glue ETL job icebergdemo1-GlueETL1-merge will run daily to merge the data into the Iceberg table. It has the following logic to add or update the data on Iceberg:

  • Create a Spark DataFrame from input data:
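from pyspark.sql.types import IntegerType

# Read the Dropzone files, treating the first line as the header
df = spark.read.format(dropzone_dataformat).option("header", True).load(dropzone_path)
# Cast the numeric columns so they match the Iceberg table schema
df = (
    df.withColumn("ordernum", df["ordernum"].cast(IntegerType()))
      .withColumn("quantity", df["quantity"].cast(IntegerType()))
)
df.createOrReplaceTempView("input_table")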

  • For a new order, add it to the table
  • If the table has a matching order, update the status and shipping_id:
stmt_merge = f"""
    MERGE INTO glue_catalog.{database_name}.{table_name} AS t
    USING input_table AS s 
    ON t.ordernum= s.ordernum
    WHEN MATCHED 
            THEN UPDATE SET 
                t.status = s.status,
                t.shipping_id = s.shipping_id
    WHEN NOT MATCHED THEN INSERT *
    """
spark.sql(stmt_merge)
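The effect of the MERGE statement can be pictured with plain Python dictionaries. The sketch below is an illustration of the matched and not-matched branches only. It does not use Spark or Iceberg, it does not model how Spark handles duplicate keys in the source, and the sample rows are invented.

```python
def merge_orders(table, incoming):
    """Apply MERGE-like semantics keyed on ordernum and return a new dict."""
    merged = {ordernum: dict(row) for ordernum, row in table.items()}
    for row in incoming:
        key = row["ordernum"]
        if key in merged:
            # WHEN MATCHED: update only status and shipping_id
            merged[key]["status"] = row["status"]
            merged[key]["shipping_id"] = row["shipping_id"]
        else:
            # WHEN NOT MATCHED: insert the whole row
            merged[key] = dict(row)
    return merged


# Invented sample data: one existing order, one update, and one new order
table = {
    1001: {"ordernum": 1001, "sku": "A1", "quantity": 2, "status": "received", "shipping_id": ""},
}
incoming = [
    {"ordernum": 1001, "sku": "A1", "quantity": 5, "status": "shipped", "shipping_id": "XYZ123"},
    {"ordernum": 2001, "sku": "B7", "quantity": 1, "status": "received", "shipping_id": ""},
]

result = merge_orders(table, incoming)
for ordernum in sorted(result):
    print(result[ordernum])
```

Note that for the matched order only status and shipping_id change; the quantity in the incoming row is ignored, just as the UPDATE SET clause lists only those two columns.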

Complete the following steps to run the AWS Glue merge job:

  1. On the AWS Glue console, choose ETL jobs in the navigation pane.
  2. Select the ETL job icebergdemo1-GlueETL1-merge.
  3. On the Actions dropdown menu, choose Run with parameters.
  4. On the Run parameters page, go to Job parameters.
  5. For the --dropzone_path parameter, provide the S3 location of the input data (icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge1).
  6. Run the job to add all the orders: 1001, 1002, 1003, and 1004.
  7. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge2.
  8. Run the job again to add orders 2001 and 2002, and update orders 1001, 1002, and 1003.
  9. For the --dropzone_path parameter, change the S3 location to icebergdemo1-s3bucketdropzone-kunftrcblhsk/data/merge3.
  10. Run the job again to add order 3001 and update orders 1001, 1003, 2001, and 2002.

Go to the data folder of the table to see the data files written by Iceberg when you merged the data into the table using the Glue ETL job icebergdemo1-GlueETL1-merge.
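If you prefer to script the three runs instead of using the console, a sketch along the following lines may help. It uses the Boto3 Glue start_job_run call with the same --dropzone_path parameter as the steps above. The helper names are hypothetical, the path layout is assumed from the walkthrough, and the job must be allowed to finish before the next run is started, which this sketch does not handle.

```python
def build_run_arguments(bucket, merge_folder):
    """Build the job arguments for one merge run (path layout assumed from the steps above)."""
    return {"--dropzone_path": f"{bucket}/data/{merge_folder}"}


def run_merge_job(job_name, bucket, merge_folder):
    """Start one run of the merge job; needs boto3 and AWS credentials."""
    import boto3  # imported lazily so build_run_arguments works without boto3

    glue = boto3.client("glue")
    response = glue.start_job_run(
        JobName=job_name,
        Arguments=build_run_arguments(bucket, merge_folder),
    )
    return response["JobRunId"]


# Show the arguments that would be sent for each of the three runs
for folder in ("merge1", "merge2", "merge3"):
    print(build_run_arguments("icebergdemo1-s3bucketdropzone-kunftrcblhsk", folder))
```

A call such as run_merge_job("icebergdemo1-GlueETL1-merge", "icebergdemo1-s3bucketdropzone-kunftrcblhsk", "merge1") would then start the first run.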

Query Iceberg using Athena

The CloudFormation stack created the IAM user iceberguser1, which has read access on the Iceberg table using LF-Tags. To query Iceberg using Athena via this user, complete the following steps:

  1. Log in as iceberguser1 to the AWS Management Console.
  2. On the Athena console, choose Workgroups in the navigation pane.
  3. Locate the workgroup that CloudFormation provisioned (icebergdemo1-workgroup).
  4. Verify that it uses Athena engine version 3.

Athena engine version 3 supports Iceberg file formats, including Parquet, ORC, and Avro.

  5. Go to the Athena query editor.
  6. Choose the workgroup icebergdemo1-workgroup on the dropdown menu.
  7. For Database, choose icebergdb1. You will see the table ecomorders.
  8. Run the following query to see the data in the Iceberg table:
    SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

  9. Run the following query to see the table’s current partitions:
    DESCRIBE icebergdb1.ecomorders ;

Partition-spec describes how the table is partitioned. In this example, there are no partitioned fields because you didn’t define any partitions on the table.

Iceberg partition evolution

You may need to change your partition structure; for example, due to changes in common query patterns in downstream analytics. A change of partition structure for traditional tables is a significant operation that requires an entire data copy.

Iceberg makes this straightforward. When you change the partition structure on Iceberg, it doesn’t require you to rewrite the data files. The old data written with earlier partitions remains unchanged. New data is written using the new specifications in a new layout. Metadata for each of the partition versions is kept separately.

Let’s add the partition field category to the Iceberg table using the AWS Glue ETL job icebergdemo1-GlueETL2-partition-evolution:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD PARTITION FIELD category ;

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL2-partition-evolution. When the job is complete, you can query partitions using Athena.

DESCRIBE icebergdb1.ecomorders ;

SELECT * FROM "icebergdb1"."ecomorders$partitions";

You can see the partition field category, but the partition values are null. There are no new data files in the data folder, because partition evolution is a metadata operation and doesn’t rewrite data files. When you add or update data, you will see the corresponding partition values populated.
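The following toy model in plain Python shows why no data files change when a partition field is added. It is a conceptual sketch only: the class ToyTable and its fields are invented for this illustration and do not mirror Iceberg's actual metadata layout or APIs.

```python
from dataclasses import dataclass, field


@dataclass
class ToyTable:
    # Spec 0 is the unpartitioned spec the table starts with
    partition_specs: list = field(default_factory=lambda: [[]])
    data_files: list = field(default_factory=list)

    def add_partition_field(self, column):
        # Metadata-only change: append a new spec, leave existing files untouched
        self.partition_specs.append(self.partition_specs[-1] + [column])

    def write(self, row):
        # New data is always written with the latest partition spec
        spec_id = len(self.partition_specs) - 1
        partition = {col: row[col] for col in self.partition_specs[spec_id]}
        self.data_files.append({"spec_id": spec_id, "partition": partition, "row": row})


table = ToyTable()
table.write({"ordernum": 1001, "category": "books"})   # written before the change
table.add_partition_field("category")                  # partition evolution
table.write({"ordernum": 3001, "category": "toys"})    # written after the change

for data_file in table.data_files:
    print(data_file["spec_id"], data_file["partition"])
```

The first file keeps its original spec and has no partition value, which corresponds to the null partition values seen right after the ALTER TABLE statement. Only data written afterward carries a category partition value.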

Iceberg schema evolution

Iceberg supports in-place table evolution. You can evolve a table schema just like SQL. Iceberg schema updates are metadata changes, so no data files need to be rewritten to perform the schema evolution.

To explore the Iceberg schema evolution, run the ETL job icebergdemo1-GlueETL3-schema-evolution via the AWS Glue console. The job runs the following SparkSQL statements:

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ADD COLUMNS (shipping_carrier string) ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    RENAME COLUMN shipping_id TO tracking_number ;

ALTER TABLE glue_catalog.icebergdb1.ecomorders
    ALTER COLUMN ordernum TYPE bigint ;

In the Athena query editor, run the following query:

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum asc ;

You can verify the schema changes to the Iceberg table:

  • A new column has been added called shipping_carrier
  • The column shipping_id has been renamed to tracking_number
  • The data type of the column ordernum has changed from int to bigint
    DESCRIBE icebergdb1.ecomorders;
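Schema changes can be metadata-only because Iceberg tracks each column by a unique field ID rather than by name. The sketch below imitates that idea with plain Python. The schema layout, the field IDs, the helper names, and the sample value are assumptions for illustration, and real Iceberg assigns IDs from its own table metadata.

```python
# Simplified schema: each column has a stable ID that never changes
schema = [
    {"id": 1, "name": "ordernum", "type": "int"},
    {"id": 6, "name": "shipping_id", "type": "string"},
]

# A stored row in an existing "data file", keyed by field ID, not by column name
stored_row = {1: 1001, 6: "UPS1Z999"}


def rename_column(schema, old_name, new_name):
    """Rename a column; the field ID stays the same."""
    return [dict(f, name=new_name) if f["name"] == old_name else dict(f) for f in schema]


def add_column(schema, name, type_):
    """Add a column with a fresh field ID."""
    next_id = max(f["id"] for f in schema) + 1
    return schema + [{"id": next_id, "name": name, "type": type_}]


def read_row(schema, stored):
    """Resolve stored values by field ID; columns missing from old files read as None."""
    return {f["name"]: stored.get(f["id"]) for f in schema}


evolved = add_column(rename_column(schema, "shipping_id", "tracking_number"),
                     "shipping_carrier", "string")
print(read_row(evolved, stored_row))
```

The stored row is never modified, yet it reads back under the new column name, and the added column shows up as null for rows written before the change.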

Positional update

The data in tracking_number contains the shipping carrier concatenated with the tracking number. Let’s assume that we want to split this data in order to keep the shipping carrier in the shipping_carrier field and the tracking number in the tracking_number field.

On the AWS Glue console, run the ETL job icebergdemo1-GlueETL4-update-table. The job runs the following SparkSQL statement to update the table:

UPDATE glue_catalog.icebergdb1.ecomorders
SET shipping_carrier = substring(tracking_number,1,3),
    tracking_number = substring(tracking_number,4,50)
WHERE tracking_number != '' ;
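The substring arithmetic in this statement is easy to get wrong because Spark SQL positions are 1-based. The following plain Python sketch reproduces the split for a single row. The helper names and the sample tracking value are invented for illustration.

```python
def spark_substring(value, pos, length):
    """Mimic Spark SQL substring(value, pos, length) for pos >= 1."""
    start = pos - 1  # Spark positions are 1-based, Python slices are 0-based
    return value[start:start + length]


def update_row(row):
    """Apply the UPDATE to one row; rows failing the WHERE clause are returned unchanged."""
    tracking = row.get("tracking_number")
    if tracking is None or tracking == "":
        return dict(row)
    updated = dict(row)
    # Both expressions read the original tracking_number, as in the SQL statement
    updated["shipping_carrier"] = spark_substring(tracking, 1, 3)
    updated["tracking_number"] = spark_substring(tracking, 4, 50)
    return updated


# Invented sample value: a three-letter carrier code followed by the tracking number
row = {"ordernum": 1001, "tracking_number": "UPS1Z999AA10123456784", "shipping_carrier": None}
print(update_row(row))
```

The first three characters become the carrier, and up to 50 characters starting at position 4 remain as the tracking number.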

Query the Iceberg table to verify the updated data on tracking_number and shipping_carrier.

SELECT * FROM "icebergdb1"."ecomorders" ORDER BY ordernum ;

Now that the data has been updated on the table, you should see the partition values populated for category:

SELECT * FROM "icebergdb1"."ecomorders$partitions"
ORDER BY partition;

Clean up

To avoid incurring future charges, clean up the resources you created:

  1. On the Lambda console, open the details page for the function icebergdemo1-Lambda-Create-Iceberg-and-Grant-access.
  2. In the Environment variables section, choose the key Task_To_Perform and update the value to CLEANUP.
  3. Run the function, which drops the database, table, and their associated LF-Tags.
  4. On the AWS CloudFormation console, delete the stack icebergdemo1.

Conclusion

In this post, you created an Iceberg table using the AWS Glue API and used Lake Formation to control access on the Iceberg table in a transactional data lake. With AWS Glue ETL jobs, you merged data into the Iceberg table, and performed schema evolution and partition evolution without rewriting or recreating the Iceberg table. With Athena, you queried the Iceberg data and metadata.

Based on the concepts and demonstrations from this post, you can now build a transactional data lake in an enterprise using Iceberg, AWS Glue, Lake Formation, and Amazon S3.


About the Author

Satya Adimula is a Senior Data Architect at AWS based in Boston. With over 20 years of experience in data and analytics, Satya helps organizations derive business insights from their data at scale.
