Spark SQL between timestamp on where clause?

Because you are comparing against a timestamp value in your where clause, you need to convert the LocalDateTime to a java.sql.Timestamp. Also note that the first parameter of between is the lower bound, so in your case LocalDateTime.now().minusHours(1) should come before LocalDateTime.now(). Then you can do:

import java.time.LocalDateTime
import java.sql.Timestamp

df.where(
    unix_timestamp($"date", "yyyy-MM-dd HH:mm:ss.S")
      .cast("timestamp")
      .between(
        Timestamp.valueOf(LocalDateTime.now().minusHours(1)),
        Timestamp.valueOf(LocalDateTime.now())
      ))
  .show()

You will get a filtered DataFrame like:

+-----+--------------------+
|color|                date|
+-----+--------------------+
|  red|2016-11-29 10:58:...|
+-----+--------------------+
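The two fixes above can be checked without a Spark session at all: java.sql.Timestamp.valueOf converts a LocalDateTime directly, and the instant one hour ago must be the lower bound. A minimal standalone sketch:

```scala
import java.time.LocalDateTime
import java.sql.Timestamp

val now = LocalDateTime.now()

// Convert LocalDateTime to java.sql.Timestamp, which Spark accepts as a literal
val lower = Timestamp.valueOf(now.minusHours(1)) // lower bound: one hour ago
val upper = Timestamp.valueOf(now)               // upper bound: now

// between(lowerBound, upperBound) only matches when lower <= value <= upper,
// so the earlier instant must be passed first
println(lower.before(upper)) // prints: true
```

If the bounds are swapped, between never matches anything, so the DataFrame would come back empty rather than fail.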
Author: Shankar

Updated on July 11, 2022

Comments

  • Shankar, over 1 year ago:

    I'm trying to return the rows between two timestamps using DataFrame API.

    Sample code is:

    val df = Seq(
        ("red", "2016-11-29 07:10:10.234"),
        ("green", "2016-11-29 07:10:10.234"),
        ("blue", "2016-11-29 07:10:10.234")).toDF("color", "date")
    
      df.where(unix_timestamp($"date", "yyyy-MM-dd HH:mm:ss.S").cast("timestamp").between(LocalDateTime.now(), LocalDateTime.now().minusHours(1))).show()
    

    But it throws an Unsupported literal type class java.time.LocalDateTime error:

    Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class java.time.LocalDateTime 2016-11-29T07:32:12.084
        at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:57)
        at org.apache.spark.sql.functions$.lit(functions.scala:101)
        at org.apache.spark.sql.Column.$greater$eq(Column.scala:438)
        at org.apache.spark.sql.Column.between(Column.scala:542)
        at com.sankar.SparkSQLTimestampDifference$.delayedEndpoint$com$sankar$SparkSQLTimestampDifference$1(SparkSQLTimestampDifference.scala:23)
        at com.sankar.SparkSQLTimestampDifference$delayedInit$body.apply(SparkSQLTimestampDifference.scala:7)
        at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
        at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.App$$anonfun$main$1.apply(App.scala:76)
        at scala.collection.immutable.List.foreach(List.scala:381)
        at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
        at scala.App$class.main(App.scala:76)
        at com.sankar.SparkSQLTimestampDifference$.main(SparkSQLTimestampDifference.scala:7)
        at com.sankar.SparkSQLTimestampDifference.main(SparkSQLTimestampDifference.scala)