Why does Spark fail with "Failed to get broadcast_0_piece0 of broadcast_0" in local mode?
Solution 1
I just discovered why I was getting this exception: for some reason, my SparkContext object was being started and stopped several times between ScalaTest methods. Fixing that behaviour got Spark working the way I expected.
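A minimal sketch of one way to keep a single SparkContext alive for a whole suite, assuming ScalaTest's FlatSpec and BeforeAndAfterAll (both appear in the question's stack trace). The suite name, app name, and test bodies are hypothetical; on ScalaTest 3.1 or later the base class would be org.scalatest.flatspec.AnyFlatSpec instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

// Hypothetical suite: the context is created once before all tests
// and stopped once after all of them, never between test methods.
class SharedContextSpec extends FlatSpec with BeforeAndAfterAll {

  private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    super.beforeAll()
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("shared-context-spec") // hypothetical app name
    sc = new SparkContext(conf)
  }

  override def afterAll(): Unit = {
    // Stop the context only after every test in the suite has run.
    try if (sc != null) sc.stop()
    finally super.afterAll()
  }

  "the shared context" should "run a first job" in {
    assert(sc.parallelize(1 to 10).count() == 10L)
  }

  it should "still be usable in a second test" in {
    assert(sc.parallelize(Seq("a", "b")).count() == 2L)
  }
}
```

If several suites need Spark, the same idea can be moved into a shared trait, as long as only one context is running in the JVM at a time.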
Solution 2
I was getting this error as well. I hadn't seen any concrete code examples, so I'll share my solution. It cleared the error for me, though I suspect there is more than one solution to this problem. It is worth a try because it keeps everything within the code.
It looks as though the SparkContext was shutting down, which threw the error. I think the issue is that the SparkContext is created in one class that other classes then extend, and the extension causes it to shut down, which is a bit annoying. Below is the implementation I used to clear the error.
Spark Initialisation Class:
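import org.apache.spark.{SparkConf, SparkContext}

class Spark extends Serializable {
  // Built lazily, once per instance; @transient keeps them out of serialization.
  @transient lazy val conf: SparkConf =
    new SparkConf()
      .setMaster("local")
      .setAppName("test")

  @transient lazy val sc: SparkContext = {
    val context = new SparkContext(conf)
    context.setLogLevel("OFF")
    context
  }

  // Returns the same context on every call instead of creating a new one.
  def getContext: SparkContext = sc
}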
Main Class:
import org.apache.spark.rdd.RDD

object Test extends Spark {
  def main(args: Array[String]): Unit = {
    val sc = getContext
    val irisRDD: RDD[String] = sc.textFile("...")
    // ...
  }
}
Then just extend your other classes with the Spark class and it should all work out.
I was getting the error when running LogisticRegression models, so I assume this should also fix it for other machine learning libraries.
Solution 3
Related to the above answers, I encountered this issue when I inadvertently serialized a DataStax connector (i.e., Cassandra connection driver) query to a Spark worker. The query then spun up its own SparkContext, and within 4 seconds the entire application had crashed.
Solution 4
I was facing the same issue. After a lot of googling, I found that I had written a singleton class for SparkContext initialization. A singleton is only valid within a single JVM instance, but in Spark this singleton class is invoked from each worker node, each running in a separate JVM instance, which leads to multiple SparkContext objects.
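As a rough illustration of the alternative, the sketch below creates the SparkContext in the driver's main method and keeps the code that runs inside closures free of any reference to it. The object names, app name, and the square function are hypothetical and only serve the example.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper: pure functions only, no SparkContext inside,
// so nothing here tries to build a context on a worker JVM.
object Transformations {
  def square(x: Int): Int = x * x
}

object DriverApp {
  def main(args: Array[String]): Unit = {
    // The context is created exactly once, in driver code.
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("driver-owned-context") // hypothetical app name
    val sc = new SparkContext(conf)
    try {
      val squares = sc.parallelize(1 to 5).map(Transformations.square)
      println(squares.collect().mkString(", ")) // prints "1, 4, 9, 16, 25"
    } finally {
      sc.stop()
    }
  }
}
```

Functions that need the context on the driver can take it as a parameter, which keeps ownership of the context in one place.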
Solution 5
This helped me, because a SparkContext had already been created:
val sc = SparkContext.getOrCreate()
Before that, I had tried this:
val conf = new SparkConf()
  .setAppName("Testing")
  .setMaster("local")
  .set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(conf)
But it broke when I ran:
spark.createDataFrame(rdd, schema)
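A small sketch of why getOrCreate avoids the clash, assuming Spark 2.x or later with spark-sql on the classpath. The object name, app name, and the one-column schema are hypothetical. The point is that both SparkContext.getOrCreate and SparkSession.builder().getOrCreate() are expected to reuse an already running context rather than start a second one.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}

object GetOrCreateDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("get-or-create-demo") // hypothetical app name

    // Creates a context only if none is running yet.
    val first = SparkContext.getOrCreate(conf)
    try {
      // A second call hands back the running context.
      val second = SparkContext.getOrCreate()
      println(s"same SparkContext: ${first eq second}")

      // The session builder also picks up the running context.
      val spark = SparkSession.builder().getOrCreate()
      println(s"session shares it: ${spark.sparkContext eq first}")

      // Hypothetical schema and data, just to exercise createDataFrame.
      val schema = StructType(Seq(StructField("id", IntegerType, nullable = false)))
      val rdd = first.parallelize(Seq(Row(1), Row(2), Row(3)))
      println(s"rows: ${spark.createDataFrame(rdd, schema).count()}")
    } finally {
      first.stop()
    }
  }
}
```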
Saulo Ricci
Updated on July 25, 2022
Comments
-
Saulo Ricci almost 2 years
I'm running this snippet to sort an RDD of points, ordering the RDD and taking the K-nearest points from a given point:
def getKNN(sparkContext: SparkContext, k: Int, point2: Array[Double], pointsRDD: RDD[Array[Double]]): RDD[Array[Double]] = {
  val tuplePointDistanceRDD: RDD[(Double, Array[Double])] =
    pointsRDD.map(point => (DistanceUtils.euclidianDistance(point, point2), point))
  sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k))
}
I use just one SparkContext in my application and pass it as a parameter to my function, yet I get an
org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
error at the moment I call sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k)) to get the KNN points from point2.
I'm constructing sparkContext as in the snippet below:
var sparkContext = new SparkContext("local", "<app_name>")
What would be the possible causes of facing this kind of error?
This is the log from my standalone Spark environment, with the stack trace of the error:
15/12/24 11:55:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:55731]
15/12/24 11:55:29 INFO Utils: Successfully started service 'sparkDriver' on port 55731.
15/12/24 11:55:29 INFO SparkEnv: Registering MapOutputTracker
15/12/24 11:55:29 INFO SparkEnv: Registering BlockManagerMaster
15/12/24 11:55:29 INFO DiskBlockManager: Created local directory at /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/blockmgr-70e73cfe-683b-4297-aa5d-de38f98d02f1
15/12/24 11:55:29 INFO MemoryStore: MemoryStore started with capacity 491.7 MB
15/12/24 11:55:29 INFO HttpFileServer: HTTP File server directory is /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/spark-f7bc8b6f-7d4f-4c55-8dff-0fbc4f6c2532/httpd-fb502369-4c28-4585-a37e-f3645d1d55a3
15/12/24 11:55:29 INFO HttpServer: Starting HTTP Server
15/12/24 11:55:29 INFO Utils: Successfully started service 'HTTP file server' on port 55732.
15/12/24 11:55:29 INFO SparkEnv: Registering OutputCommitCoordinator
15/12/24 11:55:29 INFO Utils: Successfully started service 'SparkUI' on port 4040.
15/12/24 11:55:29 INFO SparkUI: Started SparkUI at http://localhost:4040
15/12/24 11:55:29 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
15/12/24 11:55:29 INFO Executor: Starting executor ID driver on host localhost
15/12/24 11:55:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55733.
15/12/24 11:55:29 INFO NettyBlockTransferService: Server created on 55733
15/12/24 11:55:29 INFO BlockManagerMaster: Trying to register BlockManager
15/12/24 11:55:29 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55733 with 491.7 MB RAM, BlockManagerId(driver, localhost, 55733)
15/12/24 11:55:29 INFO BlockManagerMaster: Registered BlockManager
15/12/24 11:55:30 INFO TorrentBroadcast: Started reading broadcast variable 0
org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:144)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.RDD.sortBy$default$3(RDD.scala:548)
    at LOF$.getKNN(LOF.scala:14)
    at LOF$.lof(LOF.scala:25)
    at BehaviourActivityScoreJudgeTest$$anonfun$1.apply$mcV$sp(BehaviourActivityScoreJudgeTest.scala:14)
    at BehaviourActivityScoreJudgeTest$$anonfun$1.apply(BehaviourActivityScoreJudgeTest.scala:11)
    at BehaviourActivityScoreJudgeTest$$anonfun$1.apply(BehaviourActivityScoreJudgeTest.scala:11)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647)
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
    at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
    at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644)
    at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
    at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)
    at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1683)
    at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
    at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1714)
    at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1683)
    at org.scalatest.Suite$class.run(Suite.scala:1424)
    at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1683)
    at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
    at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1760)
    at BehaviourActivityScoreJudgeTest.org$scalatest$BeforeAndAfterAll$$super$run(BehaviourActivityScoreJudgeTest.scala:4)
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
    at BehaviourActivityScoreJudgeTest.run(BehaviourActivityScoreJudgeTest.scala:4)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
    at org.scalatest.tools.Runner$.run(Runner.scala:883)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:137)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
    ... 94 more
15/12/24 11:55:30 INFO SparkUI: Stopped Spark web UI at http://localhost:4040
15/12/24 11:55:30 INFO DAGScheduler: Stopping DAGScheduler
15/12/24 11:55:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
15/12/24 11:55:30 INFO MemoryStore: MemoryStore cleared
15/12/24 11:55:30 INFO BlockManager: BlockManager stopped
15/12/24 11:55:30 INFO BlockManagerMaster: BlockManagerMaster stopped
15/12/24 11:55:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
15/12/24 11:55:30 INFO SparkContext: Successfully stopped SparkContext
15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
-
Shams about 8 years
How did you fix that start/stop behaviour of the SparkContext?
-
BushMinusZero about 7 years
Please elaborate on how you fixed the issue.
-
Vikas Singh about 7 years
I simply removed the SparkContext initialization from the singleton class and put it in the driver code.