How do I add a current timestamp (extra column) in a Glue job so that the output data has an extra column?


Solution 1

We do the following, and it works great without converting with toDF():

datasource0 = glueContext.create_dynamic_frame.from_catalog(...)

from datetime import datetime
from awsglue.transforms import Map

def AddProcessedTime(r):
    # Timestamp of when this record was processed by the job.
    r["jobProcessedDateTime"] = datetime.today()
    return r

mapped_dyF = Map.apply(frame = datasource0, f = AddProcessedTime)
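
Note that datetime.today() above is evaluated once per record, so rows can end up with slightly different values. If every row should carry the same job-level timestamp, one option is to capture the time once and close over it. The following is a minimal sketch, not a Glue API: make_timestamp_adder is a hypothetical helper name, and plain dicts stand in for Glue records so the snippet can be run outside a job.

```python
from datetime import datetime, timezone

def make_timestamp_adder(column_name, run_time=None):
    """Return a record-mapping function that stamps every record with one shared time."""
    if run_time is None:
        # Captured once, when the adder is built; stored as a naive UTC datetime.
        run_time = datetime.now(timezone.utc).replace(tzinfo=None)

    def add_processed_time(record):
        record[column_name] = run_time
        return record

    return add_processed_time

# Plain dicts stand in for Glue records here (an assumption for illustration).
add_processed_time = make_timestamp_adder("jobProcessedDateTime")
rows = [{"Col1": 1, "Col2": "a"}, {"Col1": 2, "Col2": "b"}]
stamped = [add_processed_time(dict(r)) for r in rows]
```

In a Glue job the returned function would presumably be passed the same way as above, for example Map.apply(frame = datasource0, f = add_processed_time).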

Solution 2

I'm not sure if there's a Glue-native way to do this with the DynamicFrame, but you can easily convert to a Spark DataFrame and then use the withColumn method. You will need the lit function to put a literal value into the new column, as below.

from datetime import datetime
from pyspark.sql.functions import lit

glue_df = glueContext.create_dynamic_frame.from_catalog(...)
spark_df = glue_df.toDF()
spark_df = spark_df.withColumn('some_date', lit(datetime.now()))


Solution 3

In my experience, Glue jobs run in GMT, but my timezone is CDT. To get the time in CDT, I convert it inside the SparkContext. In this case the goal is to add a last_load_date column to the target/sink.

So I created a function.

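from datetime import datetime as dt
from pyspark.sql import SQLContext
from pyspark.sql.functions import from_utc_timestamp

def convert_timezone(sc):
    sqlContext = SQLContext(sc)
    # Current time on the Glue worker (assumed to be UTC), as a string.
    local_time = dt.now().strftime('%Y-%m-%d %H:%M:%S')
    local_time_df = sqlContext.createDataFrame([(local_time,)], ['time'])
    # Shift the UTC time into the CST6CDT zone.
    CDT_time_df = local_time_df.select(from_utc_timestamp(local_time_df['time'], 'CST6CDT').alias('cdt_time'))
    CDT_time = [i['cdt_time'].strftime('%Y-%m-%d %H:%M:%S') for i in CDT_time_df.collect()][0]
    return CDT_time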

And then call the function like this:

from pyspark.sql.functions import lit

job_run_time = date_config.convert_timezone(sc)

datasourceDF0 = datasource0.toDF()
datasourceDF1 = datasourceDF0.withColumn('last_updated_date', lit(job_run_time))
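
If the only goal is a formatted job run time in a given zone, the conversion may not need Spark at all. Here is a minimal sketch using only the Python standard library, assuming Python 3.9+ (for zoneinfo) and that time zone data is available on the worker. job_run_time_in_zone is a hypothetical helper name, and "America/Chicago" is used as the Central zone in place of 'CST6CDT'.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo  # standard library in Python 3.9+

def job_run_time_in_zone(zone_name="America/Chicago", now_utc=None):
    """Format a UTC instant in the given zone as 'YYYY-MM-DD HH:MM:SS'."""
    if now_utc is None:
        # Timezone-aware "now", so the result does not depend on the worker's clock zone.
        now_utc = datetime.now(timezone.utc)
    return now_utc.astimezone(ZoneInfo(zone_name)).strftime("%Y-%m-%d %H:%M:%S")

# Fixed instants make the daylight-saving behaviour visible.
summer = datetime(2022, 7, 1, 12, 0, 0, tzinfo=timezone.utc)
winter = datetime(2022, 1, 1, 12, 0, 0, tzinfo=timezone.utc)
print(job_run_time_in_zone(now_utc=summer))  # 2022-07-01 07:00:00 (CDT, UTC-5)
print(job_run_time_in_zone(now_utc=winter))  # 2022-01-01 06:00:00 (CST, UTC-6)
```

The resulting string could then be passed to lit(...) in the same way as job_run_time above.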

Solution 4

As I have not seen a proper answer to this issue, I will try to explain my solution to the problem.

First, to clarify: the withColumn function is a good way to do this, but it belongs to the Spark DataFrame, not to the Glue DynamicFrame, which is AWS Glue's own library. So you need to convert between the two frame types.

The first step is to get the Spark DataFrame from the DynamicFrame. The Glue library does this with the toDF() function. Once you have the Spark DataFrame, you can add the column and do whatever other manipulation you require.

Glue then expects its own frame type, so we need to transform the Spark DataFrame back into Glue's proprietary frame. To do so, you can use the apply function of DynamicFrame, which requires importing the object:

import com.amazonaws.services.glue.DynamicFrame

and passing it the glueContext, which you should already have, like this:

DynamicFrame(sparkDataFrame, glueContext)

In summary, the code should look like this:

import org.apache.spark.sql.functions._
import com.amazonaws.services.glue.DynamicFrame

...

val sparkDataFrame = datasourceToModify.toDF().withColumn("created_date", current_date())
val finalDataFrameForGlue = DynamicFrame(sparkDataFrame, glueContext)

...

Note: the import of org.apache.spark.sql.functions._ brings in the current_date() function used to add the column with the date.

Hope this helps....

Solution 5

Use Spark's current_timestamp() function:

import org.apache.spark.sql.functions._

...

val timestampedDf = source.toDF().withColumn("Update_Date", current_timestamp())
val timestamped = DynamicFrame(timestampedDf, glueContext)
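
For Python jobs, a rough PySpark counterpart of the Scala snippet above might look like the sketch below. add_timestamp_column and the frame name "timestamped" are hypothetical names chosen for illustration. The imports sit inside the function only so that the file can be loaded outside a Glue job; inside a job script they would normally go at the top.

```python
def add_timestamp_column(dynamic_frame, glue_context, column_name="Update_Date"):
    """Return a new DynamicFrame with a current-timestamp column appended."""
    # Imported here so this file can be loaded where Glue and Spark are not installed.
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.sql.functions import current_timestamp

    # DynamicFrame -> Spark DataFrame, add the column, then convert back for Glue sinks.
    spark_df = dynamic_frame.toDF().withColumn(column_name, current_timestamp())
    return DynamicFrame.fromDF(spark_df, glue_context, "timestamped")
```

Usage inside a job would be along the lines of timestamped = add_timestamp_column(datasource0, glueContext), with datasource0 and glueContext as in the earlier solutions.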
Author by

Kishore Bharathy

Data Engineer at Verisk Analytics

Updated on June 04, 2022

Comments

  • Kishore Bharathy
    Kishore Bharathy almost 2 years

    How do I add a current timestamp (extra column) in the Glue job so that the output data has an extra column? In this case:

    Schema Source Table: Col1, Col2

    After Glue job.

    Schema of Destination: Col1, Col2, Update_Date(Current Timestamp)