Why does Spark fail with "Failed to get broadcast_0_piece0 of broadcast_0" in local mode?


Solution 1

I just discovered why I was getting this exception: for some reason my SparkContext object was being started and stopped several times between ScalaTest methods. Fixing that behaviour got Spark working the way I expected.
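One way to avoid that in a ScalaTest suite is to create the context once for the whole suite and stop it only after all tests have run. Below is a minimal sketch, assuming ScalaTest's BeforeAndAfterAll mixed into a FlatSpec (the framework that appears in the stack trace in the question); the class and test names are illustrative.

import org.apache.spark.{SparkConf, SparkContext}
import org.scalatest.{BeforeAndAfterAll, FlatSpec}

class SingleContextSpec extends FlatSpec with BeforeAndAfterAll {

  @transient private var sc: SparkContext = _

  override def beforeAll(): Unit = {
    // Create the context exactly once, before any test runs
    sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
  }

  override def afterAll(): Unit = {
    // Stop it exactly once, after every test has finished
    if (sc != null) sc.stop()
  }

  "the shared context" should "survive across test methods" in {
    assert(sc.parallelize(1 to 3).count() == 3)
  }
}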

Solution 2

I was getting this error as well. I haven't seen many concrete coding examples for it, so I will share my solution. This cleared the error for me, though I suspect there is more than one possible cause. Still, it is worth a try, since it keeps everything within the code.

It looks as though the SparkContext was shutting down, which throws the error. I think the issue is that the SparkContext is created in one class and then that class is extended by others; the extension causes it to shut down, which is a bit annoying. Below is the implementation I used to make the error go away.

Spark Initialisation Class:

import org.apache.spark.{SparkConf, SparkContext}

class Spark extends Serializable {
  def getContext: SparkContext = {
    @transient lazy val conf: SparkConf =
      new SparkConf()
        .setMaster("local")
        .setAppName("test")

    @transient lazy val sc: SparkContext = new SparkContext(conf)
    sc.setLogLevel("OFF")

    sc
  }
}

Main Class:

import org.apache.spark.rdd.RDD

object Test extends Spark {

  def main(args: Array[String]): Unit = {
    val sc = getContext
    val irisRDD: RDD[String] = sc.textFile("...")
    // ...
  }
}

Then just extend your other classes with the Spark class and it should all work out; a sketch of such an extension follows.
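For illustration only (the object name here is hypothetical), another entry point reusing the same initialisation class might look like this:

import org.apache.spark.rdd.RDD

object AnotherJob extends Spark {

  def main(args: Array[String]): Unit = {
    val sc = getContext                          // reuse the shared initialisation
    val lines: RDD[String] = sc.textFile("...")  // path elided, as above
    println(lines.count())
  }
}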

I was getting the error while running LogisticRegression models, so I would assume this fix applies to other machine learning libraries as well.

Solution 3

Related to the answers above, I encountered this issue when I inadvertently serialized a Datastax connector (i.e. Cassandra connection driver) query to a Spark slave. That spun off its own SparkContext, and within 4 seconds the entire application had crashed.
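For illustration, here is a minimal sketch of that kind of closure-capture mistake; the client type and method names are hypothetical, and the point is only that objects holding driver-side connection state should be created per partition on the executors rather than captured by a transformation's closure.

import org.apache.spark.rdd.RDD

// Hypothetical client type, used only for illustration
class QueryClient extends Serializable {
  def lookup(key: String): Int = key.length
}

object ClosureCapture {
  // Risky: `client` is captured by the closure and serialized into every task
  def broken(data: RDD[String], client: QueryClient): RDD[Int] =
    data.map(key => client.lookup(key))

  // Safer: build the client on the executor, once per partition
  def better(data: RDD[String]): RDD[Int] =
    data.mapPartitions { keys =>
      val client = new QueryClient()
      keys.map(client.lookup)
    }
}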

Solution 4

I was also facing the same issue. After a lot of googling I found that I had made a singleton class for SparkContext initialization. A singleton is only valid within a single JVM instance, but with Spark that class ends up being invoked from each worker node running in its own JVM instance, which leads to multiple SparkContext objects.
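A sketch of the two patterns, with illustrative names and local mode assumed (the fix mentioned in the comments below is simply to move the initialization into the driver's entry point):

import org.apache.spark.{SparkConf, SparkContext}

// Problematic pattern: a lazily-initialized singleton that worker-side code may also touch
object SparkSingleton {
  lazy val sc: SparkContext =
    new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
}

// Safer pattern: create the context once in the driver's entry point and pass it
// (or the RDDs derived from it) down to the code that needs it
object Driver {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
    try {
      println(sc.parallelize(1 to 10).reduce(_ + _))
    } finally {
      sc.stop()
    }
  }
}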

Solution 5

This helped me, because a SparkContext had already been created:

val sc = SparkContext.getOrCreate()

Before that I had tried this:

val conf = new SparkConf().setAppName("Testing").setMaster("local").set("spark.driver.allowMultipleContexts", "true")
val sc = new SparkContext(conf)

But it broke when I ran

 spark.createDataFrame(rdd, schema)
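A compact sketch of the same idea (assuming an active context may already exist in the JVM, for example one created by a test harness):

import org.apache.spark.{SparkConf, SparkContext}

object ContextReuse {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("Testing")
    // Reuses the active SparkContext if one exists, otherwise creates it from `conf`
    val sc = SparkContext.getOrCreate(conf)
    println(sc.parallelize(1 to 5).reduce(_ + _))
  }
}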

Comments

  • Saulo Ricci (almost 2 years ago)

    I'm running this snippet to sort an RDD of points, ordering the RDD and taking the K-nearest points from a given point:

    def getKNN(sparkContext: SparkContext, k: Int, point2: Array[Double], pointsRDD: RDD[Array[Double]]): RDD[Array[Double]] = {
      val tuplePointDistanceRDD: RDD[(Double, Array[Double])] = pointsRDD.map(point =>
        (DistanceUtils.euclidianDistance(point, point2), point))
      sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k))
    }

    Using just one SparkContext in my application and passing it as a parameter to my function, I'm getting a org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0 error at the moment I call sparkContext.parallelize(tuplePointDistanceRDD.sortBy(_._1).map(_._2).take(k)) to get the KNN points from point2.

    I'm constructing sparkContext as in the snippet below:

    var sparkContext = new SparkContext("local", "<app_name>")
    

    What would be the possible causes of facing this kind of error?

    Basically this is the LOG of my standalone spark environment with the stack trace of this error:

    15/12/24 11:55:29 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@localhost:55731]
    15/12/24 11:55:29 INFO Utils: Successfully started service 'sparkDriver' on port 55731.
    15/12/24 11:55:29 INFO SparkEnv: Registering MapOutputTracker
    15/12/24 11:55:29 INFO SparkEnv: Registering BlockManagerMaster
    15/12/24 11:55:29 INFO DiskBlockManager: Created local directory at /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/blockmgr-70e73cfe-683b-4297-aa5d-de38f98d02f1
    15/12/24 11:55:29 INFO MemoryStore: MemoryStore started with capacity 491.7 MB
    15/12/24 11:55:29 INFO HttpFileServer: HTTP File server directory is /private/var/folders/0r/3b6d3b6j45774_9616myw4440000gn/T/spark-f7bc8b6f-7d4f-4c55-8dff-0fbc4f6c2532/httpd-fb502369-4c28-4585-a37e-f3645d1d55a3
    15/12/24 11:55:29 INFO HttpServer: Starting HTTP Server
    15/12/24 11:55:29 INFO Utils: Successfully started service 'HTTP file server' on port 55732.
    15/12/24 11:55:29 INFO SparkEnv: Registering OutputCommitCoordinator
    15/12/24 11:55:29 INFO Utils: Successfully started service 'SparkUI' on port 4040.
    15/12/24 11:55:29 INFO SparkUI: Started SparkUI at http://localhost:4040
    15/12/24 11:55:29 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
    15/12/24 11:55:29 INFO Executor: Starting executor ID driver on host localhost
    15/12/24 11:55:29 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55733.
    15/12/24 11:55:29 INFO NettyBlockTransferService: Server created on 55733
    15/12/24 11:55:29 INFO BlockManagerMaster: Trying to register BlockManager
    15/12/24 11:55:29 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55733 with 491.7 MB RAM, BlockManagerId(driver, localhost, 55733)
    15/12/24 11:55:29 INFO BlockManagerMaster: Registered BlockManager
    15/12/24 11:55:30 INFO TorrentBroadcast: Started reading broadcast variable 0
    
    org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
    java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1178)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:165)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:88)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.rdd.HadoopRDD.getJobConf(HadoopRDD.scala:144)
    at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:200)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:239)
    at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:237)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.rdd.RDD.partitions(RDD.scala:237)
    at org.apache.spark.rdd.RDD.sortBy$default$3(RDD.scala:548)
    at LOF$.getKNN(LOF.scala:14)
    at LOF$.lof(LOF.scala:25)
    at BehaviourActivityScoreJudgeTest$$anonfun$1.apply$mcV$sp(BehaviourActivityScoreJudgeTest.scala:14)
    at BehaviourActivityScoreJudgeTest$$anonfun$1.apply(BehaviourActivityScoreJudgeTest.scala:11)
    at BehaviourActivityScoreJudgeTest$$anonfun$1.apply(BehaviourActivityScoreJudgeTest.scala:11)
    at org.scalatest.Transformer$$anonfun$apply$1.apply$mcV$sp(Transformer.scala:22)
    at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
    at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
    at org.scalatest.Transformer.apply(Transformer.scala:22)
    at org.scalatest.Transformer.apply(Transformer.scala:20)
    at org.scalatest.FlatSpecLike$$anon$1.apply(FlatSpecLike.scala:1647)
    at org.scalatest.Suite$class.withFixture(Suite.scala:1122)
    at org.scalatest.FlatSpec.withFixture(FlatSpec.scala:1683)
    at org.scalatest.FlatSpecLike$class.invokeWithFixture$1(FlatSpecLike.scala:1644)
    at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
    at org.scalatest.FlatSpecLike$$anonfun$runTest$1.apply(FlatSpecLike.scala:1656)
    at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
    at org.scalatest.FlatSpecLike$class.runTest(FlatSpecLike.scala:1656)
    at org.scalatest.FlatSpec.runTest(FlatSpec.scala:1683)
    at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
    at org.scalatest.FlatSpecLike$$anonfun$runTests$1.apply(FlatSpecLike.scala:1714)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:413)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:390)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:427)
    at org.scalatest.SuperEngine$$anonfun$traverseSubNodes$1$1.apply(Engine.scala:401)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
    at org.scalatest.SuperEngine.org$scalatest$SuperEngine$$runTestsInBranch(Engine.scala:396)
    at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:483)
    at org.scalatest.FlatSpecLike$class.runTests(FlatSpecLike.scala:1714)
    at org.scalatest.FlatSpec.runTests(FlatSpec.scala:1683)
    at org.scalatest.Suite$class.run(Suite.scala:1424)
    at org.scalatest.FlatSpec.org$scalatest$FlatSpecLike$$super$run(FlatSpec.scala:1683)
    at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
    at org.scalatest.FlatSpecLike$$anonfun$run$1.apply(FlatSpecLike.scala:1760)
    at org.scalatest.SuperEngine.runImpl(Engine.scala:545)
    at org.scalatest.FlatSpecLike$class.run(FlatSpecLike.scala:1760)
    at BehaviourActivityScoreJudgeTest.org$scalatest$BeforeAndAfterAll$$super$run(BehaviourActivityScoreJudgeTest.scala:4)
    at org.scalatest.BeforeAndAfterAll$class.liftedTree1$1(BeforeAndAfterAll.scala:257)
    at org.scalatest.BeforeAndAfterAll$class.run(BeforeAndAfterAll.scala:256)
    at BehaviourActivityScoreJudgeTest.run(BehaviourActivityScoreJudgeTest.scala:4)
    at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:55)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2563)
    at org.scalatest.tools.Runner$$anonfun$doRunRunRunDaDoRunRun$3.apply(Runner.scala:2557)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:2557)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1044)
    at org.scalatest.tools.Runner$$anonfun$runOptionallyWithPassFailReporter$2.apply(Runner.scala:1043)
    at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:2722)
    at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:1043)
    at org.scalatest.tools.Runner$.run(Runner.scala:883)
    at org.scalatest.tools.Runner.run(Runner.scala)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2(ScalaTestRunner.java:137)
    at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:28)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:134)
    Caused by: org.apache.spark.SparkException: Failed to get broadcast_0_piece0 of broadcast_0
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:138)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:120)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:175)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1175)
    ... 94 more
    
    15/12/24 11:55:30 INFO SparkUI: Stopped Spark web UI at http://localhost:4040
    15/12/24 11:55:30 INFO DAGScheduler: Stopping DAGScheduler
    15/12/24 11:55:30 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    15/12/24 11:55:30 INFO MemoryStore: MemoryStore cleared
    15/12/24 11:55:30 INFO BlockManager: BlockManager stopped
    15/12/24 11:55:30 INFO BlockManagerMaster: BlockManagerMaster stopped
    15/12/24 11:55:30 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    15/12/24 11:55:30 INFO SparkContext: Successfully stopped SparkContext
    15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
    15/12/24 11:55:30 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
    
  • Shams (about 8 years ago)
    How did you fix the start/stop behaviour of the SparkContext?
  • BushMinusZero (about 7 years ago)
    Please elaborate on how you fixed the issue.
  • Vikas Singh (about 7 years ago)
    Simply removed the SparkContext initialization from the singleton class and put it in the driver code.