Thursday, July 4, 2024

Optimize information structure by bucketing with Amazon Athena and AWS Glue to speed up downstream queries

Within the period of knowledge, organizations are more and more utilizing information lakes to retailer and analyze huge quantities of structured and unstructured information. Knowledge lakes present a centralized repository for information from numerous sources, enabling organizations to unlock helpful insights and drive data-driven decision-making. Nevertheless, as information volumes proceed to develop, optimizing information structure and group turns into essential for environment friendly querying and evaluation.

One of many key challenges in information lakes is the potential for gradual question efficiency, particularly when coping with massive datasets. This may be attributed to components comparable to inefficient information structure, leading to extreme information scanning and inefficient use of compute sources. To deal with this problem, widespread practices like partitioning and bucketing can considerably enhance question efficiency and cut back computation prices.

Partitioning is a way that divides a big dataset into smaller, extra manageable components primarily based on particular standards, comparable to date, area, or product class. By partitioning information, downstream analytical queries can skip irrelevant partitions, lowering the quantity of knowledge that must be scanned and processed. You need to use partition columns within the WHERE clause in queries to scan solely the particular partitions that your question wants. This could result in quicker question runtimes and extra environment friendly useful resource utilization. It particularly works nicely when columns with low cardinality are chosen as the important thing.

What in case you have a excessive cardinality column that you simply typically must filter by VIP clients? Every buyer is often recognized with an ID, which might be hundreds of thousands. Partitioning isn’t appropriate for such excessive cardinality columns as a result of you find yourself with small recordsdata, gradual partition filtering, and excessive Amazon Easy Storage Service (Amazon S3) API value (one S3 prefix is created per worth of partition column). Though you should utilize partitioning with a pure key comparable to metropolis or state to slender down your dataset to some extent, it’s nonetheless crucial to question throughout date-based partitions in case your information is time collection.

That is the place bucketing comes into play. Bucketing makes certain that every one rows with the identical values of a number of columns find yourself in the identical file. As a substitute of 1 file per worth, like partitioning, a hash operate is used to distribute values evenly throughout a set variety of recordsdata. By organizing information this fashion, you’ll be able to carry out environment friendly filtering, as a result of solely the related buckets have to be processed, additional lowering computational overhead.

There are a number of choices for implementing bucketing on AWS. One method is to make use of the Amazon Athena CREATE TABLE AS SELECT (CTAS) assertion, which lets you create a bucketed desk straight from a question. Alternatively, you should utilize AWS Glue for Apache Spark, which gives built-in assist for bucketing configurations in the course of the information transformation course of. AWS Glue permits you to outline bucketing parameters, such because the variety of buckets and the columns to bucket on, offering an optimized information structure for environment friendly querying with Athena.

On this put up, we talk about how you can implement bucketing on AWS information lakes, together with utilizing Athena CTAS assertion and AWS Glue for Apache Spark. We additionally cowl bucketing for Apache Iceberg tables.

Instance use case

On this put up, you employ a public dataset, the NOAA Built-in Floor Database. Knowledge analysts run one-time queries for information in the course of the previous 5 years by way of Athena. A lot of the queries are for particular stations with particular report varieties. The queries want to finish in 10 seconds, and the fee must be optimized fastidiously. On this situation, you’re an information engineer liable for optimizing question efficiency and price.

For instance, if an analyst needs to retrieve information for a selected station (for instance, station ID 123456) with a selected report sort (for instance, CRN01), the question would possibly appear to be the next question:

SELECT station, report_type, columnA, columnB, ...
FROM table_name
WHERE
report_type="CRN01"
AND station = '123456'

Within the case of the NOAA Built-in Floor Database, the station_id column is more likely to have a excessive cardinality, with quite a few distinctive station identifiers. Then again, the report_type column might have a comparatively low cardinality, with a restricted set of report varieties. Given this situation, it will be a good suggestion to partition the info by report_type and bucket it by station_id.

With this partitioning and bucketing technique, Athena can first remove partitions for irrelevant report varieties, after which scan solely the buckets throughout the related partition that match the desired station ID, considerably lowering the quantity of knowledge processed and accelerating question runtimes. This method not solely meets the question efficiency requirement, but in addition helps optimize prices by minimizing the quantity of knowledge scanned and billed for every question.

On this put up, we look at how question efficiency is affected by information structure, specifically, bucketing. We additionally examine three other ways to attain bucketing. The next desk represents situations for the tables to be created.

. noaa_remote_original athena_non_bucketed athena_bucketed glue_bucketed athena_bucketed_iceberg
Format CSV Parquet Parquet Parquet Parquet
Compression n/a Snappy Snappy Snappy Snappy
Created by way of n/a Athena CTAS Athena CTAS Glue ETL Athena CTAS with Iceberg
Engine n/a Trino Trino Apache Spark Apache Iceberg
Is partitioned? Sure however with completely different manner Sure Sure Sure Sure
Is bucketed? No No Sure Sure Sure

noaa_remote_original is partitioned by the yr column, however not by the report_type column. This row represents if the desk is partitioned by the precise columns which might be used within the queries.

Baseline desk

For this put up, you create a number of tables with completely different situations: some with out bucketing and a few with bucketing, to showcase the efficiency traits of bucketing. First, let’s create an unique desk utilizing the NOAA information. In subsequent steps, you ingest information from this desk to create check tables.

There are a number of methods to outline a desk definition: working DDL, an AWS Glue crawler, the AWS Glue Knowledge Catalog API, and so forth. On this step, you run DDL by way of the Athena console.

Full the next steps to create the "bucketing_blog"."noaa_remote_original" desk within the Knowledge Catalog:

  1. Open the Athena console.
  2. Within the question editor, run the next DDL to create a brand new AWS Glue database:
    -- Create Glue database
    CREATE DATABASE bucketing_blog;

  3. For Database underneath Knowledge, select bucketing_blog to set the present database.
  4. Run the next DDL to create the unique desk:
    -- Create unique desk
    CREATE EXTERNAL TABLE `bucketing_blog`.`noaa_remote_original`(
      `station` STRING, 
      `date` STRING, 
      `supply` STRING, 
      `latitude` STRING, 
      `longitude` STRING, 
      `elevation` STRING, 
      `identify` STRING, 
      `report_type` STRING, 
      `call_sign` STRING, 
      `quality_control` STRING, 
      `wnd` STRING, 
      `cig` STRING, 
      `vis` STRING, 
      `tmp` STRING, 
      `dew` STRING, 
      `slp` STRING, 
      `aj1` STRING, 
      `gf1` STRING, 
      `mw1` STRING)
    PARTITIONED BY (
        yr STRING)
    ROW FORMAT SERDE 
      'org.apache.hadoop.hive.serde2.OpenCSVSerde' 
    WITH SERDEPROPERTIES ( 
      'escapeChar'='',
      'quoteChar'='"',
      'separatorChar'=',') 
    STORED AS INPUTFORMAT 
      'org.apache.hadoop.mapred.TextInputFormat' 
    OUTPUTFORMAT 
      'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
    LOCATION
      's3://noaa-global-hourly-pds/'
    TBLPROPERTIES (
      'skip.header.line.depend'='1'
    )

As a result of the supply information has quoted fields, we use OpenCSVSerde as an alternative of the default LazySimpleSerde.

These CSV recordsdata have a header row, which we inform Athena to skip by including skip.header.line.depend and setting the worth to 1.

For extra particulars, check with OpenCSVSerDe for processing CSV.

  1. Run the next DDL so as to add partitions. We add partitions just for 5 years out of 124 years primarily based on the use case requirement:
    -- Load partitions
    ALTER TABLE `bucketing_blog`.`noaa_remote_original` ADD
      PARTITION (yr="2024") LOCATION 's3://noaa-global-hourly-pds/2024/'
      PARTITION (yr="2023") LOCATION 's3://noaa-global-hourly-pds/2023/'
      PARTITION (yr="2022") LOCATION 's3://noaa-global-hourly-pds/2022/'
      PARTITION (yr="2021") LOCATION 's3://noaa-global-hourly-pds/2021/'
      PARTITION (yr="2020") LOCATION 's3://noaa-global-hourly-pds/2020/';

  2. Run the next DML to confirm in case you can efficiently question the info:
    -- Examine information 
    SELECT * FROM "bucketing_blog"."noaa_remote_original" LIMIT 10;

Now you’re prepared to start out querying the unique desk to look at the baseline efficiency.

  1. Run a question towards the unique desk to guage the question efficiency as a baseline. The next question selects information for 5 particular stations with report sort CRN05:
    -- Baseline
    SELECT station, report_type, date, supply, latitude, longitude, elevation, identify, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."noaa_remote_original"
    WHERE
        report_type="CRN05"
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );

We ran this question 10 occasions. The common question runtime for 10 queries is 27.6 seconds, which is much longer than our goal of 10 seconds, and 155.75 GB information is scanned to return 1.65 million information. That is the baseline efficiency of the unique uncooked desk. It’s time to start out optimizing information structure from this baseline.

Subsequent, you create tables with completely different situations from the unique: one with out bucketing and one with bucketing, and examine them.

Optimize information structure utilizing Athena CTAS

On this part, we use an Athena CTAS question to optimize information structure and its format.

First, let’s create a desk with partitioning however with out bucketing. The brand new desk is partitioned by the column report_type as a result of most of anticipated queries use this column within the WHERE clause, and objects are saved as Parquet with Snappy compression.

  1. Open the Athena question editor.
  2. Run the next question, offering your individual S3 bucket and prefix:
    --CTAS, non-bucketed
    CREATE TABLE "bucketing_blog"."athena_non_bucketed"
    WITH (
        external_location = 's3://<your-s3-location>/athena-non-bucketed/',
        partitioned_by = ARRAY['report_type'],
        format="PARQUET",
        write_compression = 'SNAPPY'
    )
    AS
    SELECT
        station, date, supply, latitude, longitude, elevation, identify, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

Your information ought to appear to be the next screenshots.


There are 30 recordsdata underneath the partition.

Subsequent, you create a desk with Hive type bucketing. The variety of buckets must be fastidiously tuned by way of experiments in your personal use case. Typically talking, the extra buckets you might have, the smaller the granularity, which could lead to higher efficiency. Then again, too many small recordsdata might introduce inefficiency in question planning and processing. Additionally, bucketing solely works in case you are querying just a few values of the bucketing key. The extra values you add to your question, the extra seemingly that you’ll find yourself studying all buckets.

The next is the baseline question to optimize:

-- Baseline
SELECT station, report_type, date, supply, latitude, longitude, elevation, identify, call_sign, quality_control, wnd, cig, tmp
FROM "bucketing_blog"."noaa_remote_original"
WHERE
    report_type="CRN05"
    AND ( station = '99999904237'
        OR station = '99999953132'
        OR station = '99999903061'
        OR station = '99999963856'
        OR station = '99999994644'
    );

On this instance, the desk goes to be bucketed into 16 buckets by a high-cardinality column (station), which is meant for use for the WHERE clause within the question. All different situations stay the identical. The baseline question has 5 values within the station ID, and also you count on queries to have round that quantity at most, which is much less sufficient than the variety of buckets, so 16 ought to work nicely. It’s potential to specify a bigger variety of buckets, however CTAS can’t be used if the full variety of partitions exceeds 100.

  1. Run the next question:
    -- CTAS, Hive-bucketed
    CREATE TABLE "bucketing_blog"."athena_bucketed"
    WITH (
        external_location = 's3://<your-s3-location>/athena-bucketed/',
        partitioned_by = ARRAY['report_type'],
        bucketed_by = ARRAY['station'],
        bucket_count = 16,
        format="PARQUET",
        write_compression = 'SNAPPY'
    )
    AS
    SELECT
        station, date, supply, latitude, longitude, elevation, identify, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

The question creates S3 objects organized as proven within the following screenshots.


The table-level structure appears to be like precisely the identical between athena_non_bucketed and athena_bucketed: there are 13 partitions in every desk. The distinction is the variety of objects underneath the partitions. There are 16 objects (buckets) per partition, of roughly 10–25 MB every on this case. The variety of buckets is fixed on the specified worth whatever the quantity of knowledge, however the bucket dimension will depend on the quantity of knowledge.

Now you’re prepared to question towards every desk to guage question efficiency. The question will choose information with 5 particular stations and report sort CRN05 for the previous 5 years. Though you’ll be able to’t see which information of a selected station is positioned by which bucket, it has been calculated and positioned accurately by Athena.

  1. Question the non-bucketed desk with the next assertion:
    -- No bucketing 
    SELECT station, report_type, date, supply, latitude, longitude, elevation, identify, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_non_bucketed"
    WHERE
        report_type="CRN05"
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran this question 10 occasions. The common runtime of the ten queries is 10.95 seconds, and 358 MB of knowledge is scanned to return 2.21 million information. Each the runtime and scan dimension have been considerably decreased since you’ve partitioned the info, and might now learn just one partition the place 12 partitions of 13 are skipped. As well as, the quantity of knowledge scanned has gone down from 206 GB to 360 MB, which is a discount of 99.8%. This isn’t simply because of the partitioning, but in addition because of the change of its format to Parquet and compression with Snappy.

  1. Question the bucketed desk with the next assertion:
    -- Hive bucketing
    SELECT station, report_type, date, supply, latitude, longitude, elevation, identify, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_bucketed"
    WHERE
        report_type="CRN05"
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran this question 10 occasions. The common runtime of the ten queries is 7.82 seconds, and 69 MB of knowledge is scanned to return 2.21 million information. This implies a discount of common runtime from 10.95 to 7.82 seconds (-29%), and a dramatic discount of knowledge scanned from 358 MB to 69 MB (-81%) to return the identical variety of information in contrast with the non-bucketed desk. On this case, each runtime and information scanned had been improved by bucketing. This implies bucketing contributed not solely to efficiency but in addition to value discount.

Issues

As acknowledged earlier, dimension your bucket fastidiously to maximise efficiency of your question. Bucketing solely works in case you are querying just a few values of the bucketing key. Think about creating extra buckets than the variety of values anticipated within the precise question.

Moreover, an Athena CTAS question is restricted to create as much as 100 partitions at one time. In case you want numerous partitions, chances are you’ll wish to use AWS Glue extract, remodel, and cargo (ETL), though there’s a workaround to separate into a number of SQL statements.

Optimize information structure utilizing AWS Glue ETL

Apache Spark is an open supply distributed processing framework that permits versatile ETL with PySpark, Scala, and Spark SQL. It permits you to partition and bucket your information primarily based in your necessities. Spark has a number of tuning choices to speed up jobs. You may effortlessly automate and monitor Spark jobs. On this part, we use AWS Glue ETL jobs to run Spark code to optimize information structure.

In contrast to Athena bucketing, AWS Glue ETL makes use of Spark-based bucketing as a bucketing algorithm. All you could do is add the next desk property onto the desk: bucketing_format="spark". For particulars about this desk property, see Partitioning and bucketing in Athena.

Full the next steps to create a desk with bucketing by way of AWS Glue ETL:

  1. On the AWS Glue console, select ETL jobs within the navigation pane.
  2. Select Create job and select Visible ETL.
  3. Beneath Add nodes, select AWS Glue Knowledge Catalog for Sources.
  4. For Database, select bucketing_blog.
  5. For Desk, select noaa_remote_original.
  6. Beneath Add nodes, select Change Schema for Transforms.
  7. Beneath Add nodes, select Customized Remodel for Transforms.
  8. For Title, enter ToS3WithBucketing.
  9. For Node mother and father, select Change Schema.
  10. For Code block, enter the next code snippet:
    def ToS3WithBucketing (glueContext, dfc) -> DynamicFrameCollection:
        # Convert DynamicFrame to DataFrame
        df = dfc.choose(listing(dfc.keys())[0]).toDF()
        
        # Write to S3 with bucketing and partitioning
        df.repartition(1, "report_type") 
            .write.choice("path", "s3://<your-s3-location>/glue-bucketed/") 
            .mode("overwrite") 
            .partitionBy("report_type") 
            .bucketBy(16, "station") 
            .format("parquet") 
            .choice("compression", "snappy") 
            .saveAsTable("bucketing_blog.glue_bucketed")

The next screenshot exhibits the job created utilizing AWS Glue Studio to generate a desk and information.

Every node represents the next:

  • The AWS Glue Knowledge Catalog node hundreds the noaa_remote_original desk from the Knowledge Catalog
  • The Change Schema node makes certain that it hundreds columns registered within the Knowledge Catalog
  • The ToS3WithBucketing node writes information to Amazon S3 with each partitioning and Spark-based bucketing

The job has been efficiently authored within the visible editor.

  1. Beneath Job particulars, for IAM Position, select your AWS Identification and Entry Administration (IAM) position for this job.
  2. For Employee sort, select G.8X.
  3. For Requested variety of staff, enter 5.
  4. Select Save, then select Run.

After these steps, the desk glue_bucketed. has been created.

  1. Select Tables within the navigation pane, and select the desk glue_bucketed.
  2. On the Actions menu, select Edit desk underneath Handle.
  3. Within the Desk properties part, select Add.
  4. Add a key pair with key bucketing_format and worth spark.
  5. Select Save.

Now it’s time to question the tables.

  1. Question the bucketed desk with the next assertion:
    -- Spark bucketing
    SELECT station, report_type, date, supply, latitude, longitude, elevation, identify, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."glue_bucketed"
    WHERE
        report_type="CRN05"
        AND ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


We ran the question 10 occasions. The common runtime of the ten queries is 7.09 seconds, and 88 MB of knowledge is scanned to return 2.21 million information. On this case, each the runtime and information scanned had been improved by bucketing. This implies bucketing contributed not solely to efficiency but in addition to value discount.

The explanation for the bigger bytes scanned in comparison with the Athena CTAS instance is that the values had been distributed otherwise on this desk. Within the AWS Glue bucketed desk, the values had been distributed over 5 recordsdata. Within the Athena CTAS bucketed desk, the values had been distributed over 4 recordsdata. Keep in mind that rows are distributed into buckets utilizing a hash operate. The Spark bucketing algorithm makes use of a distinct hash operate than Hive, and on this case, it resulted in a distinct distribution throughout the recordsdata.

Issues

Glue DynamicFrame doesn’t assist bucketing natively. You could use Spark DataFrame as an alternative of DynamicFrame to bucket tables.

For details about fine-tuning AWS Glue ETL efficiency, check with Finest practices for efficiency tuning AWS Glue for Apache Spark jobs.

Optimize Iceberg information structure with hidden partitioning

Apache Iceberg is a high-performance open desk format for big analytic tables, bringing the reliability and ease of SQL tables to huge information. Just lately, there was an enormous demand to make use of Apache Iceberg tables to attain superior capabilities like ACID transaction, time journey question, and extra.

In Iceberg, bucketing works otherwise than the Hive desk technique we’ve seen up to now. In Iceberg, bucketing is a subset of partitioning, and might be utilized utilizing the bucket partition remodel. The way in which you employ it and the top result’s much like bucketing in Hive tables. For extra particulars about Iceberg bucket transforms, check with Bucket Remodel Particulars.

Full the next steps:

  1. Open the Athena question editor.
  2. Run the next question to create an Iceberg desk with hidden partitioning together with bucketing:
    -- CTAS, Iceberg-bucketed
    CREATE TABLE "bucketing_blog"."athena_bucketed_iceberg"
    WITH (table_type="ICEBERG",
          location = 's3://<your-s3-location>/athena-bucketed-iceberg/', 
          is_external = false,
          partitioning = ARRAY['report_type', 'bucket(station, 16)'],
          format="PARQUET",
          write_compression = 'SNAPPY'
    ) 
    AS
    SELECT
        station, date, supply, latitude, longitude, elevation, identify, call_sign, quality_control, wnd, cig, vis, tmp, dew, slp, aj1, gf1, mw1, report_type
    FROM "bucketing_blog"."noaa_remote_original"
    ;

Your information ought to appear to be the next screenshot.

There are two folders: information and metadata. Drill right down to information.

You see random prefixes underneath the information folder. Select the primary one to view its particulars.

You see the top-level partition primarily based on the report_type column. Drill right down to the following degree.

You see the second-level partition, bucketed with the station column.

The Parquet information recordsdata exist underneath these folders.

  1. Question the bucketed desk with the next assertion:
    -- Iceberg bucketing
    SELECT station, report_type, date, supply, latitude, longitude, elevation, identify, call_sign, quality_control, wnd, cig, tmp
    FROM "bucketing_blog"."athena_bucketed_iceberg"
    WHERE
        report_type="CRN05"
        AND
        ( station = '99999904237'
            OR station = '99999953132'
            OR station = '99999903061'
            OR station = '99999963856'
            OR station = '99999994644'
        );


With the Iceberg-bucketed desk, the typical runtime of the ten queries is 8.03 seconds, and 148 MB of knowledge is scanned to return 2.21 million information. That is much less environment friendly than bucketing with AWS Glue or Athena, however contemplating the advantages of Iceberg’s numerous options, it’s inside a suitable vary.

Outcomes

The next desk summarizes all the outcomes.

. noaa_remote_original athena_non_bucketed athena_bucketed glue_bucketed athena_bucketed_iceberg
Format CSV Parquet Parquet Parquet Iceberg (Parquet)
Compression n/a Snappy Snappy Snappy Snappy
Created by way of n/a Athena CTAS Athena CTAS Glue ETL Athena CTAS with Iceberg
Engine n/a Trino Trino Apache Spark Apache Iceberg
Desk dimension (GB) 155.8 5.0 5.0 5.8 5.0
The variety of S3 Objects 53360 376 192 192 195
Is partitioned? Sure however with completely different manner Sure Sure Sure Sure
Is bucketed? No No Sure Sure Sure
Bucketing format n/a n/a Hive Spark Iceberg
Variety of buckets n/a n/a 16 16 16
Common runtime (sec) 29.178 10.950 7.815 7.089 8.030
Scanned dimension (MB) 206640.0 358.6 69.1 87.8 147.7

With athena_bucketed, glue_bucketed, and athena_bucketed_iceberg, you had been capable of meet the latency purpose of 10 seconds. With bucketing, you noticed a 25–40% discount in runtime and a 60–85% discount in scan dimension, which might contribute to each latency and price optimization.

As you’ll be able to see from the outcome, though partitioning contributes considerably to scale back each runtime and scan dimension, bucketing may also contribute to scale back them additional.

Athena CTAS is simple and quick sufficient to finish the bucketing course of. AWS Glue ETL is extra versatile and scalable to attain superior use circumstances. You may select both technique primarily based in your requirement and use case, as a result of you’ll be able to make the most of bucketing by way of both choice.

Conclusion

On this put up, we demonstrated how you can optimize your desk information structure with partitioning and bucketing by way of Athena CTAS and AWS Glue ETL. We confirmed that bucketing contributes to accelerating question latency and lowering scan dimension to additional optimize prices. We additionally mentioned bucketing for Iceberg tables by way of hidden partitioning.

Bucketing only one approach to optimize information structure by lowering information scan. For optimizing your whole information structure, we suggest contemplating different choices like partitioning, utilizing columnar file format, and compression together with bucketing. This could allow your information to additional improve question efficiency.

Pleased bucketing!


Concerning the Authors

Takeshi Nakatani is a Principal Huge Knowledge Guide on the Skilled Companies group in Tokyo. He has 26 years of expertise within the IT business, with experience in architecting information infrastructure. On his days off, he could be a rock drummer or a motorcyclist.

Noritaka Sekiyama is a Principal Huge Knowledge Architect on the AWS Glue group. He’s liable for constructing software program artifacts to assist clients. In his spare time, he enjoys biking together with his highway bike.

Related Articles

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Latest Articles