GenericRowWithSchema exception in casting ArrayBuffer to HashSet in DataFrame to RDD from Hive table

15,156

What you actually get during map phase is not an ArrayBuffer[(Int, String)] but an ArrayBuffer[Row] hence the error. Ignoring other columns what you need is something like this:

import org.apache.spark.sql.Row

tempHiveQL.map((a: Row) =>
    a.getAs[Seq[Row]](4).map{case Row(k: Int, v: String) => (k, v)}.toSet)

It looks like this issue has been fixed in Spark 1.5.0.

Share:
15,156
Glenn Strycker
Author by

Glenn Strycker

Ph.D. Physics 2010 Univ of Michigan. Currently works at ValueClick/Dotomi as a Decision Sciences Analyst.

Updated on June 06, 2022

Comments

  • Glenn Strycker
    Glenn Strycker almost 2 years

    I have a Hive table in parquet format that was generated using

    create table myTable (var1 int, var2 string, var3 int, var4 string, var5 array<struct<a:int,b:string>>) stored as parquet;
    

    I am able to verify that it was filled -- here is a sample value

    [1, "abcdef", 2, "ghijkl", ArrayBuffer([1, "hello"])]
    

    I wish to put this into a Spark RDD of the form

    ((1,"abcdef"), ((2,"ghijkl"), Set((1,"hello"))))
    

    Now, using spark-shell (I get the same problem in spark-submit), I made a test RDD with these values

    scala> val tempRDD = sc.parallelize(Seq(((1,"abcdef"),((2,"ghijkl"), ArrayBuffer[(Int,String)]((1,"hello"))))))
    tempRDD: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[44] at parallelize at <console>:85
    

    using an iterator, I can cast the ArrayBuffer as a HashSet in the following new RDD:

    scala> val tempRDD2 = tempRDD.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
    tempRDD2: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[46] at map at <console>:87
    
    scala> tempRDD2.collect.foreach(println)
    ((1,abcdef),((2,ghijkl),Set((1,hello))))
    

    But when I attempt to do the EXACT SAME THING with a DataFrame with a HiveContext / SQLContext, I get the following error:

    scala> val hc = new HiveContext(sc)
    scala> import hc._
    scala> import hc.implicits._
    
    scala> val tempHiveQL = hc.sql("""select var1, var2, var3, var4, var5 from myTable""")
    
    scala> val tempRDDfromHive = tempHiveQL.map(a => ((a(0).toString.toInt, a(1).toString), ((a(2).toString.toInt, a(3).toString), a(4).asInstanceOf[ArrayBuffer[(Int,String)]] )))
    
    scala> val tempRDD3 = tempRDDfromHive.map(a => (a._1, (a._2._1, { var tempHashSet = new HashSet[(Int,String)]; a._2._2.foreach(a => tempHashSet = tempHashSet ++ HashSet(a)); tempHashSet } )))
    tempRDD3: org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.immutable.HashSet[(Int, String)]))] = MapPartitionsRDD[47] at map at <console>:91
    
    scala> tempRDD3.collect.foreach(println)
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 14.0 failed 1 times, most recent failure: Lost task 1.0 in stage 14.0 (TID 5211, localhost): java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to scala.Tuple2
           at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1$$anonfun$apply$1.apply(<console>:91)
           at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
           at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
           at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
           at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:91)
           at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
           at scala.collection.Iterator$class.foreach(Iterator.scala:727)
           at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
           at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
           at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
           at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
           at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
           at scala.collection.AbstractIterator.to(Iterator.scala:1157)
           at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
           at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
           at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
           at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
           at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
           at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:813)
           at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
           at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1503)
           at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
           at org.apache.spark.scheduler.Task.run(Task.scala:64)
           at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
           at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
           at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
           at java.lang.Thread.run(Thread.java:724)
    
    Driver stacktrace:
           at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1203)
           at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1192)
           at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1191)
           at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
           at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
           at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1191)
           at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
           at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:693)
           at scala.Option.foreach(Option.scala:236)
           at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:693)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1393)
           at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1354)
           at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    

    Note that I get this same error "GenericRowWithSchema cannot be cast to scala.Tuple2" when I run this in a compiled program using spark-submit. The program crashes at RUN TIME when it encounters the conversion step, and I had no compiler errors.

    It seems very strange to me that my artificially generated RDD "tempRDD" would work with the conversion, whereas the Hive query DataFrame->RDD did not. I checked, and both of the RDDs have the same form:

    scala> tempRDD
    org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = MapPartitionsRDD[21] at map at DataFrame.scala:776
    
    scala> tempRDDfromHive
    org.apache.spark.rdd.RDD[((Int, String), ((Int, String), scala.collection.mutable.ArrayBuffer[(Int, String)]))] = ParallelCollectionRDD[25] at parallelize at <console>:70
    

    the only difference is where their last step originated. I even tried persisting, checkpointing, and materializing these RDDs before running the steps for tempRDD2 and tempRDD3. All got the same error message.

    I also read though related stackoverflow questions and Apache Spark Jira issues, and from those I attempted casting the ArrayBuffer as an Iterator instead, but that also failed on the second step with the same error.

    Does anyone know how to properly convert ArrayBuffers to HashSets for DataFrames originating from Hive tables? Since the error seems to be only for the Hive table version, I'm tempted to think that this is an issue with Spark/Hive integration in SparkSQL.

    Any ideas?

    My Spark version is 1.3.0 CDH.

    Here are the printSchema results:

    scala> tempRDDfromHive.printSchema()
    root
     |-- var1: integer (nullable = true)
     |-- var2: string (nullable = true)
     |-- var3: integer (nullable = true)
     |-- var4: string (nullable = true)
     |-- var5: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- a: integer (nullable = true)
     |    |    |-- b: string (nullable = true)
    
  • Glenn Strycker
    Glenn Strycker over 8 years
    I will test this out tomorrow and see if it works (and reward your answer accordingly). Do you have a URL for the Jira ticket of the bug you're referencing that is fixed in 1.5? I need to document known issues so my company is encouraged to upgrade from 1.3.0. Thanks!
  • zero323
    zero323 over 8 years
    Sorry, I don't have any reference. I simply couldn't reproduce the problem on 1.5.
  • WestCoastProjects
    WestCoastProjects over 7 years
    Third answer of yours that has helped me - in past sixty minutes.
  • zero323
    zero323 over 7 years
    @javadba Call me "Apache Spark - The Missing Manual" :)
  • WestCoastProjects
    WestCoastProjects over 7 years
    AStMM.. got it.
  • Ishan Bhatt
    Ishan Bhatt almost 4 years
    Works like a charm, I had actually logged-in to upvote the answer.