org.apache.spark.SparkException: Task not serializable
Solution 1
Since you're defining your map function as an anonymous inner class, the containing class must also be Serializable. Instead, define your map function as a separate top-level class or make it a static nested class. From the Java documentation (http://docs.oracle.com/javase/8/docs/platform/serialization/spec/serial-arch.html):
Note - Serialization of inner classes (i.e., nested classes that are not static member classes), including local and anonymous classes, is strongly discouraged for several reasons. Because inner classes declared in non-static contexts contain implicit non-transient references to enclosing class instances, serializing such an inner class instance will result in serialization of its associated outer class instance as well.
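The pitfall can be reproduced without Spark at all. Here is a minimal, hypothetical sketch (the names SerializationDemo, StaticMapper, and InnerMapper are made up for illustration) showing that a static nested class serializes on its own, while a non-static inner class fails because it drags the non-serializable enclosing instance along:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {
    // Deliberately NOT Serializable, like a typical driver class.

    // Static nested class: no hidden reference to SerializationDemo.
    static class StaticMapper implements Serializable {
        public String call(String s) { return s.toUpperCase(); }
    }

    // Non-static inner class: carries an implicit this$0 reference
    // to the enclosing SerializationDemo instance.
    class InnerMapper implements Serializable {
        public String call(String s) { return s.toUpperCase(); }
    }

    // Returns true if the object can be written by Java serialization.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException for the dragged-in outer instance
            return false;
        }
    }

    public static void main(String[] args) {
        SerializationDemo outer = new SerializationDemo();
        System.out.println(serializes(new StaticMapper()));      // prints "true"
        System.out.println(serializes(outer.new InnerMapper())); // prints "false"
    }
}
```

This is exactly what happens inside Spark's ClosureCleaner: the anonymous function is serializable in principle, but the outer class it implicitly references is not.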
Solution 2
Given this code sample:
JavaDStream<String> lines = messages.map(mapFunc);
declare the function as a static member, so the anonymous class is created in a static context and captures no enclosing instance:
static Function<Tuple2<String, String>, String> mapFunc =
    new Function<Tuple2<String, String>, String>() {
        @Override
        public String call(Tuple2<String, String> tuple2) {
            return tuple2._2();
        }
    };
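Why this works: an anonymous class created in a static context has no implicit reference to an enclosing instance, so only the function object itself gets serialized. Here is a minimal self-contained sketch of that idea, using a made-up MapFunc interface as a stand-in for Spark's Function (which also extends Serializable):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class StaticFieldDemo {
    // Stand-in for Spark's Function interface, which extends Serializable.
    interface MapFunc extends Serializable {
        String call(String input);
    }

    // Created in a static initializer: the anonymous class has no hidden
    // this$0 field pointing at a StaticFieldDemo instance.
    static MapFunc mapFunc = new MapFunc() {
        @Override
        public String call(String input) {
            return input.toUpperCase();
        }
    };

    // Returns true if the object can be written by Java serialization.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(mapFunc)); // prints "true"
    }
}
```

Note that the Java serialization spec still discourages serializing anonymous classes in general; a named static nested class (Solution 1) is the more robust fix.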
xiaolong li
Updated on June 05, 2022

Comments
-
xiaolong li almost 2 years
This is a working code example:
JavaPairDStream<String, String> messages =
    KafkaUtils.createStream(javaStreamingContext, zkQuorum, group, topicMap);
messages.print();
JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
    @Override
    public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
    }
});
I get the below error:
ERROR: org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1435)
    at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:438)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$class.map(JavaDStreamLike.scala:140)
    at org.apache.spark.streaming.api.java.JavaPairDStream.map(JavaPairDStream.scala:46)
-
InPursuit about 9 years: Glad it helped! Please accept the answer if it did.
-
Johan almost 8 years: Spark is trying to serialize the object passed to map, but can't serialize it because it doesn't implement Serializable? Why is Spark doing serialization at all? And if we define the map function as a separate class, do we need to make it Serializable as well?