How to partition data by datetime in AWS Glue?

11,060

Solution 1

Convert Glue's DynamicFrame into Spark's DataFrame to add year/month/day columns and repartition. Reducing partitions to one will ensure that only one file will be written into a folder but it may slow down job performance.

Here is python code:

from pyspark.sql.functions import col,year,month,dayofmonth,to_date,from_unixtime

...

df = dynamicFrameSrc.toDF()

repartitioned_with_new_columns_df = df
    .withColumn(“date_col”, to_date(from_unixtime(col(“unix_time_col”))))
    .withColumn(“year”, year(col(“date_col”)))
    .withColumn(“month”, month(col(“date_col”)))
    .withColumn(“day”, dayofmonth(col(“date_col”)))
    .drop(col(“date_col”))
    .repartition(1)

dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")

datasink = glueContext.write_dynamic_frame.from_options(
    frame = dyf, 
    connection_type = "s3", 
    connection_options = {
        "path": "s3://yourbucket/data”, 
        "partitionKeys": [“year”, “month”, “day”]
    }, 
    format = “parquet”, 
    transformation_ctx = "datasink"
)

Note that the from pyspark.qsl.functions import col can give a reference error, this shouldn't be a prblem as explained here.

Solution 2

I cannot comment so I am going to write as an answer.

I used Yuriy's code and a couple of things needed adjustment:

  • missing brackets

df = dynamicFrameSrc.toDF()

  • after toDF() I had to add select("*") otherwise schema was empty

df.select("*") .withColumn(“date_col”, to_date(from_unixtime(col(“unix_time_col”))))

Solution 3

To achieve this in AWS Glue Studio:

You will need to make a custom function to convert the datetime field to date. There is the extra step of converting it back to a DynamicFrameCollection.

In Python:

def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    df = dfc.select(list(dfc.keys())[0]).toDF()
    df_with_date = df.withColumn('date_field', df['datetime_field'].cast('date'))
    glue_df = DynamicFrame.fromDF(df_with_date, glueContext, "transform_date")
    return(DynamicFrameCollection({"CustomTransform0": glue_df}, glueContext))

You would then have to edit the custom transformer schema to include that new date field you just created.

You can then use the "data target" node to write the data to disk and then select that new date field to use as a partition.

video step by step walkthrough

Share:
11,060
user2642287
Author by

user2642287

Updated on June 06, 2022

Comments

  • user2642287
    user2642287 almost 2 years

    The current set-up:

    • S3 location with json files. All files stored in the same location (no day/month/year structure).

    • Glue Crawler reads the data in a catalog table

    • Glue ETL job transforms and stores the data into parquet tables in s3
    • Glue Crawler reads from s3 parquet tables and stores into a new table that gets queried by Athena

    What I want to achieve is the parquet tables to be partitioned by day (1) and the parquet tables for 1 day to be in the same file (2). Currently there is a parquet table for each json file.

    How would I go about it?

    One thing to mention, there is a datetime column in the data, but it's a unix epoch timestamp. I would probably need to convert that to a 'year/month/day' format, otherwise I'm assuming it will create a partition for each file again.

    Thanks a lot for your help!!

  • user2642287
    user2642287 over 4 years
    thanks for that. I tried this approach, played around with it, always getting some sort of syntax/typing errors. The one I'm struggling with atm is: TypeError: 'DynamicFrame' object is not subscriptable Any idea why would this be the case?
  • Yuriy Bondaruk
    Yuriy Bondaruk over 4 years
    @user2642287 please try it now, I've updated code to use col function
  • user2642287
    user2642287 over 4 years
    thanks again, that worked as expected. I'm trying to cast the year, month, day to integers. ApplyMapping doesn't seem to do the job, is there any other way to have partition columns as integers?
  • Yuriy Bondaruk
    Yuriy Bondaruk over 4 years
    Return values of functions year, month and dayofmonth are integers (see spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/…)
  • user2642287
    user2642287 over 4 years
    Yuriy, can you please take a look at the following script: pastebin.com/2KY2x6A7 ? having the following issues: 1) year, month, date result in 'string' cols when I run the Glue crawler. 2) accuracy and subzone result in structs, instead of double and integer respectively. If I change the resulting table schema, I get the following error: HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. Column 'subzone' in table '..' is declared as type 'int', but partition '..' declared column 'subzone' as type 'struct<int:int,string:string>'. thanks a lot!
  • Yuriy Bondaruk
    Yuriy Bondaruk over 4 years
    1. Glue crawlers always discover partition keys as string types even though they are written as integers. 2. Schema mismatch error happens when table and partition schemas don't match, so if you are modifying it you need to do it in all places (table and all partitions, it's very annoying I know).
  • MaltMaster
    MaltMaster over 4 years
    @YuriyBondaruk why the call to repartition(1)?
  • Yuriy Bondaruk
    Yuriy Bondaruk over 4 years
    @MaltMaster to produce only one file per partition