org.apache.spark.SparkException: Task not serializable

Solution 1

Since you're defining your map function using an anonymous inner class, the containing class must also be Serializable. Define your map function as a separate class or make it a static inner class. From the Java documentation (http://docs.oracle.com/javase/8/docs/platform/serialization/spec/serial-arch.html):

Note - Serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged for several reasons. Because inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, serializing such an inner class instance will result in serialization of its associated outer class instance as well.
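As an illustrative sketch (the class names StreamingJob and TupleToValue are placeholders, not from the original post), the function can live in a static nested class that implements Spark's Function interface, so no enclosing instance is captured when Spark serializes it:

import org.apache.spark.api.java.function.Function;
import scala.Tuple2;

public class StreamingJob {

    // A static nested class carries no implicit reference to an enclosing
    // StreamingJob instance, so only this small, stateless object gets
    // serialized and shipped to the executors.
    static class TupleToValue implements Function<Tuple2<String, String>, String> {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    }
}

With that in place, the mapping call becomes messages.map(new TupleToValue()).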

Solution 2

Here is a code sample:

JavaDStream<String> lines = messages.map(mapFunc);

Declare the anonymous inner class instance as a static field:

static Function<Tuple2<String, String>, String> mapFunc = new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
};
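If you are on Java 8, a lambda is another way to avoid the problem, since a lambda captures no reference to the enclosing class and Spark's Function interface itself extends java.io.Serializable (a minimal sketch, assuming the same messages stream as in the question):

// Java 8 lambda: nothing from the enclosing class is captured, so only the
// trivially serializable function itself is shipped to the executors.
JavaDStream<String> lines = messages.map(tuple2 -> tuple2._2());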

Comments

  • xiaolong li almost 2 years

    This is a working code example:

    JavaPairDStream<String, String> messages = KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap);
    messages.print();
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    });
    

    I get the below error:

    ERROR:
    org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
        at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140)
        at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46)
    
  • InPursuit about 9 years
    Glad it helped! Please accept the answer if it did
  • Johan almost 8 years
    Spark is trying to serialize the object passed to map but can't serialize it because it doesn't implement Serializable? Why does Spark do serialization at all? And if we define the map function as a separate class, do we need to make it Serializable as well?