How to use AWS Glue / Spark to convert CSVs partitioned and split in S3 to partitioned and split Parquet


Since the source CSV files are not necessarily under the right date partition, you could add collection date/time information to each record (or use any date field that is already available):

{"collectDateTime": {
    "timestamp": 1518091828,
    "timestampMs": 1518091828116,
    "day": 8,
    "month": 2,
    "year": 2018
}}
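If your records only carry an epoch timestamp, a small helper along these lines could derive those fields (an untested sketch; build_collect_date_time is a hypothetical name, not part of the original pipeline):

import datetime

def build_collect_date_time(timestamp_ms):
    # Derive the partition fields from an epoch timestamp in milliseconds.
    dt = datetime.datetime.utcfromtimestamp(timestamp_ms / 1000.0)
    return {
        "timestamp": timestamp_ms // 1000,
        "timestampMs": timestamp_ms,
        "day": dt.day,
        "month": dt.month,
        "year": dt.year
    }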

Then your job could carry this information through to the output DynamicFrame and ultimately use these fields as partitions. Here is some sample code of how to achieve this:

from awsglue.transforms import *
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql.types import *

import sys
import datetime

###
# CREATE THE NEW SIMPLIFIED LINE
##
def create_simplified_line(event_dict):

    # collect date time
    collect_date_time_dict = event_dict["collectDateTime"]

    new_line = {
        # TODO: COPY YOUR DATA HERE
        "myData": event_dict["myData"],
        "someOtherData": event_dict["someOtherData"],
        "timestamp": collect_date_time_dict["timestamp"],
        "timestampmilliseconds": long(collect_date_time_dict["timestamp"]) * 1000,
        "year": collect_date_time_dict["year"],
        "month": collect_date_time_dict["month"],
        "day": collect_date_time_dict["day"]
    }

    return new_line


###
# MAIN FUNCTION
##

# context
glueContext = GlueContext(SparkContext.getOrCreate())

# fetch from previous day source bucket
previous_date = datetime.datetime.utcnow() - datetime.timedelta(days=1)

# build s3 paths
s3_path = "s3://source-bucket/path/year={}/month={}/day={}/".format(previous_date.year, previous_date.month, previous_date.day)

# create dynamic_frame
dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [s3_path]}, format="json", format_options={}, transformation_ctx="dynamic_frame")

# resolve choices (optional)
dynamic_frame_resolved = ResolveChoice.apply(frame=dynamic_frame,choice="project:double",transformation_ctx="dynamic_frame_resolved")

# transform the source dynamic frame into a simplified version
result_frame = Map.apply(frame=dynamic_frame_resolved, f=create_simplified_line)

# write to simple storage service in parquet format
glueContext.write_dynamic_frame.from_options(frame=result_frame, connection_type="s3", connection_options={"path":"s3://target-bucket/path/","partitionKeys":["year", "month", "day"]}, format="parquet")

I did not test it, but the script is just a sample of how to achieve this and is fairly straightforward.

UPDATE

1) As for having specific file sizes/numbers in output partitions,

Spark's coalesce and repartition features are not yet implemented in Glue's Python API (only in Scala).

You can convert your dynamic frame into a data frame and leverage Spark's partition capabilities.

Convert your DynamicFrame (datasource0 here) to a DataFrame and repartition it; repartition(1) collapses everything into a single partition, and you can also repartition by a column:

partitioned_dataframe = datasource0.toDF().repartition(1)

Convert back to a DynamicFrame for further processing:

from awsglue.dynamicframe import DynamicFrame

partitioned_dynamicframe = DynamicFrame.fromDF(partitioned_dataframe, glueContext, "partitioned_df")
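If you want a handful of files per date partition rather than a single one, a variation I have not tested is to salt the rows before repartitioning by the partition columns (result_frame is the mapped frame from the sample above; files_per_day is an assumed target, not something from the original question):

from awsglue.dynamicframe import DynamicFrame
from pyspark.sql import functions as F

files_per_day = 10  # assumed target; tune for your data volumes

df = result_frame.toDF()

# Salt each row so that every (year, month, day) group is spread over at most
# `files_per_day` Spark partitions; combined with partitionKeys on write, this
# yields roughly that many Parquet files per date partition.
df = (df
      .withColumn("salt", (F.rand() * files_per_day).cast("int"))
      .repartition("year", "month", "day", "salt")
      .drop("salt"))

partitioned_dynamicframe = DynamicFrame.fromDF(df, glueContext, "partitioned_df")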

The good news is that Glue has an interesting feature: if you have more than 50,000 input files per partition, it will automatically group them for you.

If you want to enable this behavior explicitly regardless of the number of input files (your case), you may set the following connection_options when creating a dynamic frame from options:

dynamic_frame = glueContext.create_dynamic_frame.from_options(connection_type="s3", connection_options={"paths": [s3_path], 'groupFiles': 'inPartition', 'groupSize': 1024 * 1024}, format="json", format_options={}, transformation_ctx="dynamic_frame")

In the previous example, it would attempt to group files into 1MB groups.

It is worth mentioning that this is not the same as coalesce, but it may help if your goal is to reduce the number of files per partition.
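For input partitions in the tens of gigabytes, a larger group size is usually more practical than 1MB. For example (an untested variation of the read above, with groupSize expressed in bytes as a string, as the Glue documentation shows):

dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options={
        "paths": [s3_path],
        "groupFiles": "inPartition",
        # group the input files into chunks of roughly 128 MB each
        "groupSize": str(128 * 1024 * 1024)
    },
    format="json",
    format_options={},
    transformation_ctx="dynamic_frame")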

2) If files already exist in the destination, will it just safely add to them (not overwrite or delete)?

Glue's default SaveMode for write_dynamic_frame.from_options is to append.

When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
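If you want to be explicit about the save mode instead of relying on the default, one option (an untested sketch, writing through the Spark DataFrame API rather than write_dynamic_frame, with result_frame taken from the sample above) is:

# Sketch: the DataFrame writer exposes the save mode directly
# ("append", "overwrite", "ignore", "error").
(result_frame.toDF()
    .write
    .mode("append")
    .partitionBy("year", "month", "day")
    .parquet("s3://target-bucket/path/"))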

3) Given each source partition may be 30-100GB, what's a guideline for # of DPUs

I'm afraid I won't be able to answer that. It depends on how fast it'll load your input files (size/number), your script's transformations, etc.

Comments

  • debugme, almost 2 years ago:

    In AWS Glue's catalog, I have an external table defined with partitions that looks roughly like this in S3; partitions for new dates are added daily:

    s3://my-data-lake/test-table/
        2017/01/01/
            part-0000-blah.csv.gz
            .
            .
            part-8000-blah.csv.gz
        2017/01/02/
            part-0000-blah.csv.gz
            .
            .
            part-7666-blah.csv.gz
    

    How could I use Glue/Spark to convert this to Parquet that is also partitioned by date and split across n files per day? The examples don't cover partitioning, splitting, or provisioning (how many nodes and how big). Each day contains a couple hundred GB.

    Because the source CSVs are not necessarily in the right partitions (wrong date) and are inconsistent in size, I'm hoping to write to partitioned Parquet with the right partitions and a more consistent size.

  • debugme, about 6 years ago:
    Sorry, it's a little hard for me to follow how to use your suggestion. Assuming the source is read into a dynamic frame, this doesn't really address how to partition and split files. @leovrf helps with getting things into the right partition by specifying partition_keys as part of connection_options. However, if I have 3 csv.gz files in a source partition with 3 days of data in each one, I end up with 3 Parquet partitions with 3 files in each directory.