How to partition data by datetime in AWS Glue?
Solution 1
Convert Glue's DynamicFrame to a Spark DataFrame, add year/month/day columns, and repartition. Reducing the DataFrame to a single partition ensures that only one file is written per folder, but it may slow the job down because all the data is then written by a single task.
Here is python code:
from pyspark.sql.functions import col, year, month, dayofmonth, to_date, from_unixtime
from awsglue.dynamicframe import DynamicFrame
...
df = dynamicFrameSrc.toDF()
# Derive year/month/day from the epoch-seconds column, then collapse to one partition
repartitioned_with_new_columns_df = (
    df
    .withColumn("date_col", to_date(from_unixtime(col("unix_time_col"))))
    .withColumn("year", year(col("date_col")))
    .withColumn("month", month(col("date_col")))
    .withColumn("day", dayofmonth(col("date_col")))
    .drop(col("date_col"))
    .repartition(1)
)
dyf = DynamicFrame.fromDF(repartitioned_with_new_columns_df, glueContext, "enriched")
# Write parquet files into year=/month=/day= folders under the target path
datasink = glueContext.write_dynamic_frame.from_options(
    frame=dyf,
    connection_type="s3",
    connection_options={
        "path": "s3://yourbucket/data",
        "partitionKeys": ["year", "month", "day"]
    },
    format="parquet",
    transformation_ctx="datasink"
)
Note that the line from pyspark.sql.functions import col
can give a reference error; this shouldn't be a problem, as explained here.
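To see what the three derived columns turn into on S3, here is a minimal plain-Python sketch of the same epoch-to-date derivation. The helper name partition_path is hypothetical, and it assumes the timestamps are in seconds and interpreted as UTC (Spark's from_unixtime uses the session time zone, which may differ). It only mirrors the Hive-style year=/month=/day= folder names that partitionKeys produces; it is not part of the Glue job.

```python
from datetime import datetime, timezone


def partition_path(unix_time: int) -> str:
    """Return the Hive-style folder suffix for an epoch timestamp in seconds.

    Hypothetical helper for illustration; assumes UTC.
    """
    d = datetime.fromtimestamp(unix_time, tz=timezone.utc)
    # Integer partition values are written without zero padding
    return f"year={d.year}/month={d.month}/day={d.day}"


print(partition_path(1525305600))  # year=2018/month=5/day=3
```

All records whose timestamps fall on the same calendar day map to the same folder, which is why the partitions end up per day rather than per source file.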
Solution 2
I cannot comment so I am going to write as an answer.
I used Yuriy's code and a couple of things needed adjustment:
- missing brackets
df = dynamicFrameSrc.toDF()
- after toDF() I had to add
select("*")
otherwise schema was empty
df.select("*") \
    .withColumn("date_col", to_date(from_unixtime(col("unix_time_col"))))
Solution 3
To achieve this in AWS Glue Studio:
You will need to make a custom function to convert the datetime field to date. There is the extra step of converting it back to a DynamicFrameCollection.
In Python:
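def MyTransform(glueContext, dfc) -> DynamicFrameCollection:
    # Take the first (only) DynamicFrame in the collection and convert it to a Spark DataFrame
    df = dfc.select(list(dfc.keys())[0]).toDF()
    # Add a date column derived from the datetime column
    df_with_date = df.withColumn('date_field', df['datetime_field'].cast('date'))
    glue_df = DynamicFrame.fromDF(df_with_date, glueContext, "transform_date")
    # Glue Studio expects a DynamicFrameCollection back
    return DynamicFrameCollection({"CustomTransform0": glue_df}, glueContext)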
You would then have to edit the custom transformer schema to include that new date field you just created.
You can then use the "data target" node to write the data to disk and then select that new date field to use as a partition.
video step by step walkthrough
user2642287
Updated on June 06, 2022
Comments
-
user2642287 almost 2 years
The current set-up:
- S3 location with json files. All files are stored in the same location (no day/month/year structure).
- Glue Crawler reads the data into a catalog table
- Glue ETL job transforms and stores the data into parquet tables in s3
- Glue Crawler reads from s3 parquet tables and stores into a new table that gets queried by Athena
What I want to achieve is (1) the parquet tables partitioned by day and (2) the parquet data for one day stored in the same file. Currently there is a parquet table for each json file.
How would I go about it?
One thing to mention, there is a datetime column in the data, but it's a unix epoch timestamp. I would probably need to convert that to a 'year/month/day' format, otherwise I'm assuming it will create a partition for each file again.
Thanks a lot for your help!!
-
user2642287 over 4 years
Thanks for that. I tried this approach and played around with it, but I keep getting some sort of syntax/typing error. The one I'm struggling with at the moment is: TypeError: 'DynamicFrame' object is not subscriptable. Any idea why this would be the case?
-
Yuriy Bondaruk over 4 years
@user2642287 please try it now, I've updated the code to use the col function.
-
user2642287 over 4 years
Thanks again, that worked as expected. I'm trying to cast the year, month and day to integers. ApplyMapping doesn't seem to do the job; is there any other way to have partition columns as integers?
-
Yuriy Bondaruk over 4 years
The return values of the functions year, month and dayofmonth are integers (see spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/…)
-
user2642287 over 4 years
Yuriy, can you please take a look at the following script: pastebin.com/2KY2x6A7 ? I'm having the following issues: 1) year, month, date result in 'string' cols when I run the Glue crawler. 2) accuracy and subzone result in structs, instead of double and integer respectively. If I change the resulting table schema, I get the following error: HIVE_PARTITION_SCHEMA_MISMATCH: There is a mismatch between the table and partition schemas. Column 'subzone' in table '..' is declared as type 'int', but partition '..' declared column 'subzone' as type 'struct<int:int,string:string>'. Thanks a lot!
-
Yuriy Bondaruk over 4 years
1. Glue crawlers always discover partition keys as string types even though they are written as integers. 2. The schema mismatch error happens when the table and partition schemas don't match, so if you modify the schema you need to do it in all places (the table and all partitions; it's very annoying, I know).
-
MaltMaster over 4 years
@YuriyBondaruk why the call to repartition(1)?
-
Yuriy Bondaruk over 4 years
@MaltMaster to produce only one file per partition
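A possible alternative to repartition(1), sketched here as an assumption and not taken from the answer: repartitioning on the partition columns themselves. Spark's DataFrame.repartition accepts column names and hash-partitions on them, so all rows of a given day land in the same Spark partition and the writer should still emit one file per day folder, while different days can be written in parallel. The helper name repartition_by_day is hypothetical, and df is assumed to be the DataFrame from Solution 1 with the year, month and day columns already added.

```python
def repartition_by_day(df):
    """Shuffle rows so each (year, month, day) combination sits in one Spark partition.

    Hypothetical helper; `df` is assumed to be a pyspark.sql.DataFrame that
    already has the year, month and day columns.
    """
    # Hash-partitioning on the partition columns keeps every row of a given
    # day together, without forcing the whole dataset through a single task.
    return df.repartition("year", "month", "day")
```

In the Solution 1 script this would replace the .repartition(1) step, with the result passed to DynamicFrame.fromDF as before.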