How to fix "java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord" in Spark Streaming Kafka Consumer?

Solution 1

The ConsumerRecord objects arrive through the DStream. Calling print() on the stream makes Spark send the records from the executors to the driver, and that fails because ConsumerRecord is not serializable. Instead, extract the values you need from each ConsumerRecord and print those.

Instead of stream.print(), do:

stream.map(record => record.value().toString).print()

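This should solve your problem. Note that map returns a new DStream rather than modifying stream, so print() must be called on the result of map. The code in the question calls stream.map(...), discards the result, and then prints the original stream.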
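
The failure can be reproduced without Spark or Kafka. The sketch below is only an illustration: FakeRecord is a hypothetical stand-in for ConsumerRecord (a class that does not extend java.io.Serializable), and javaSerializable is a helper invented here to try plain Java serialization, which is the serializer that appears in the question's stack trace.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationDemo {
  // Hypothetical stand-in for ConsumerRecord: a plain class that does NOT
  // extend java.io.Serializable.
  final class FakeRecord(val key: String, val value: String)

  // Returns true if Java serialization accepts the object, false if it
  // throws NotSerializableException.
  def javaSerializable(obj: AnyRef): Boolean = {
    val out = new ObjectOutputStream(new ByteArrayOutputStream())
    try {
      out.writeObject(obj)
      true
    } catch {
      case _: NotSerializableException => false
    }
  }

  def main(args: Array[String]): Unit = {
    val records = Array(new FakeRecord(null, "a"), new FakeRecord("k", "b"))
    // An array of non-serializable records cannot be serialized.
    println(javaSerializable(records)) // prints false

    // Extracting plain strings first yields tuples, which are serializable.
    val pairs = records.map(r => (r.key, r.value))
    println(javaSerializable(pairs)) // prints true
  }
}
```

This mirrors what map does in the streaming job: once each record has been turned into a tuple of strings, there is nothing non-serializable left to ship to the driver.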

GOTCHA

For anyone else seeing this exception: any call to checkpoint also calls persist with storageLevel = MEMORY_ONLY_SER, which serializes the records. So don't call checkpoint until after you have called map.

Solution 2

KafkaUtils.createDirectStream returns an org.apache.spark.streaming.dstream.DStream, not an RDD. Spark Streaming creates RDDs for each batch as it runs. To get at them, use stream.foreachRDD to obtain each RDD and then rdd.foreach to visit each object in it. Those objects are Kafka ConsumerRecords, whose value() method returns the message read from the Kafka topic:

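stream.foreachRDD { rdd =>
    rdd.foreach { record =>
        // Runs where the record lives, so the record itself is never serialized.
        val key = record.key()
        val value = record.value()
        println(key + ": " + value)
    }
}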

Solution 3

ConsumerRecord does not implement java.io.Serializable, so operations that need to serialize records, such as persist, window, or print, fail. To avoid the error, switch to the Kryo serializer and register ConsumerRecord with it:

    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sparkConf.registerKryoClasses(new Class<?>[]{ConsumerRecord.class});
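
Since the question's code is in Scala, the same configuration might look like the sketch below. The object name KryoConf and the build helper are hypothetical. It assumes the Spark and Kafka client libraries are on the classpath.

```scala
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.SparkConf

object KryoConf {
  // Builds a SparkConf that uses Kryo and registers ConsumerRecord with it.
  def build(appName: String): SparkConf =
    new SparkConf()
      .setAppName(appName)
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .registerKryoClasses(Array[Class[_]](classOf[ConsumerRecord[_, _]]))
}
```

The resulting conf would then be passed to the StreamingContext in place of the one built in the question.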

Author: Chenghao

Updated on June 04, 2022

Comments

  • Chenghao
    Chenghao over 1 year
    • Spark 2.0.0
    • Apache Kafka 0.10.1.0
    • scala 2.11.8

    When I use spark streaming and kafka integration with kafka broker version 0.10.1.0 with the following Scala code it fails with the following exception:

    16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
    Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    

    Why? How to fix it?


    Code :

    import org.apache.kafka.clients.consumer.ConsumerRecord
    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010._
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
    import org.apache.spark._
    import org.apache.commons.codec.StringDecoder
    import org.apache.spark.streaming._
    
    object KafkaConsumer_spark_test {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("KafkaConsumer_spark_test").setMaster("local[4]")
        val ssc = new StreamingContext(conf, Seconds(1))
        ssc.checkpoint("./checkpoint")
        val kafkaParams =Map[String, Object](
          "bootstrap.servers" -> "localhost:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "example",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
    
        val topics = Array("local1")
        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          PreferConsistent,
          Subscribe[String, String](topics, kafkaParams)
        )
        stream.map(record => (record.key, record.value))
        stream.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    Exception:

    16/11/13 12:55:20 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
    java.io.NotSerializableException: org.apache.kafka.clients.consumer.ConsumerRecord
    Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
        at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
        at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
        at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:313)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    16/11/13 12:55:20 ERROR TaskSetManager: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
    Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11); not retrying
    16/11/13 12:55:20 ERROR JobScheduler: Error running job streaming job 1479012920000 ms.0
    org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
    Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
        at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
        at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0.0 in stage 0.0 (TID 0) had a not serializable result: org.apache.kafka.clients.consumer.ConsumerRecord
    Serialization stack:
        - object not serializable (class: org.apache.kafka.clients.consumer.ConsumerRecord, value: ConsumerRecord(topic = local1, partition = 0, offset = 10000, CreateTime = 1479012919187, checksum = 1713832959, serialized key size = -1, serialized value size = 1, key = null, value = a))
        - element of array (index: 0)
        - array (class [Lorg.apache.kafka.clients.consumer.ConsumerRecord;, size 11)
        at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1450)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1438)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1437)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1437)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
        at scala.Option.foreach(Option.scala:257)
        at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1659)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1618)
        at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1607)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
        at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1871)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1884)
        at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:122)
        at org.apache.spark.streaming.kafka010.KafkaRDD.take(KafkaRDD.scala:50)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:734)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$print$2$$anonfun$foreachFunc$3$1.apply(DStream.scala:733)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:51)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:415)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:50)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:245)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:58)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:244)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    
  • Chenghao
    Chenghao almost 7 years
    THANKS! Also, I found my mistake here. Actually, what I want to do is stream.map(record => (record.key, record.value)).print
  • HansHarhoff
    HansHarhoff over 5 years
    Did you mean: stream.foreachRDD { rdd => rdd.foreach { record => val value = record.value(); val key = record.key(); println(key + ": " + value) } }
  • sujithramanathan
    sujithramanathan almost 3 years
    @Hansharhoff Yes
  • AlbusMPiroglu
    AlbusMPiroglu about 2 years
    I think "serializer.KryoSerialize" should be "serializer.KryoSerializer". From: spark.apache.org/docs/latest/tuning.html