Cannot call methods on a stopped SparkContext
Solution 1
These are a few things you should check:
1. Verify that the resources you specify in the Spark configuration are actually available.
2. Search your codebase for stop() calls and make sure none of them stops the SparkContext before you are done using it.
3. Spark has a Spark UI component where you can see which jobs ran and whether they failed or succeeded, along with their logs. That will tell you why it is failing.
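To illustrate the stop() check above, here is a minimal sketch, assuming Spark Streaming running with a local master, of how stopping a StreamingContext can also stop the SparkContext underneath it. The object name StoppedContextCheck and the helper demo are made up for illustration. stop(stopSparkContext = ...) and isStopped are Spark calls, but isStopped may not be public on older releases, so treat its availability as an assumption for your version.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical demo object, not part of Spark or of the question's code.
object StoppedContextCheck {

  // Returns (stopped after a streaming-only stop, stopped after a full stop).
  def demo(): (Boolean, Boolean) = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("stopped-context-check")
    val sc = new SparkContext(conf)

    // Stop only the streaming layer; the shared SparkContext is left alone.
    val first = new StreamingContext(sc, Seconds(1))
    first.stop(stopSparkContext = false)
    val stoppedAfterStreamingOnlyStop = sc.isStopped

    // Stop the streaming layer together with the SparkContext.
    val second = new StreamingContext(sc, Seconds(1))
    second.stop(stopSparkContext = true)
    val stoppedAfterFullStop = sc.isStopped

    (stoppedAfterStreamingOnlyStop, stoppedAfterFullStop)
  }
}
```

If the second flag comes back true, any later call such as sc.parallelize(...) on that context would fail with the error from the title, which is why a stop(stopSparkContext = true) hidden in a helper is worth searching for.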
Solution 2
"Cannot call methods on a stopped SparkContext" is a consequence of some error that happened earlier. Look at the logs in $SPARK_HOME/logs and $SPARK_HOME/work.
Klue
Updated on April 28, 2020
Comments
-
Klue about 4 years
When I run the following test, it throws "Cannot call methods on a stopped SparkContext". The likely problem is that I use TestSuiteBase and a Spark StreamingContext. At the line
val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals)
I need to use a SparkContext, which I access via ssc.sparkContext, and this is where I have the problem (see the warning and error messages below):

class StreamingTest extends TestSuiteBase with BeforeAndAfter {

  test("Test 1") {
    //...
    val gridEvals = for (initialWeights <- gridParams("initialWeights");
                         stepSize <- gridParams("stepSize");
                         numIterations <- gridParams("numIterations")) yield {
      val lr = new StreamingLinearRegressionWithSGD()
        .setInitialWeights(initialWeights.asInstanceOf[Vector])
        .setStepSize(stepSize.asInstanceOf[Double])
        .setNumIterations(numIterations.asInstanceOf[Int])

      ssc = setupStreams(inputData, (inputDStream: DStream[LabeledPoint]) => {
        lr.trainOn(inputDStream)
        lr.predictOnValues(inputDStream.map(x => (x.label, x.features)))
      })

      val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)

      val cvRMSE = calculateRMSE(output, nPoints)
      println(s"RMSE = $cvRMSE")
      (initialWeights, stepSize, numIterations, cvRMSE)
    }

    val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals)
  }
}
16/04/27 10:40:17 WARN StreamingContext: StreamingContext has already been stopped
16/04/27 10:40:17 INFO SparkContext: SparkContext already stopped.
Cannot call methods on a stopped SparkContext
UPDATE:
This is the base class TestSuiteBase:

trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {

  // Name of the framework for Spark context
  def framework: String = this.getClass.getSimpleName

  // Master for Spark context
  def master: String = "local[2]"

  // Batch duration
  def batchDuration: Duration = Seconds(1)

  // Directory where the checkpoint data will be saved
  lazy val checkpointDir: String = {
    val dir = Utils.createTempDir()
    logDebug(s"checkpointDir: $dir")
    dir.toString
  }

  // Number of partitions of the input parallel collections created for testing
  def numInputPartitions: Int = 2

  // Maximum time to wait before the test times out
  def maxWaitTimeMillis: Int = 10000

  // Whether to use manual clock or not
  def useManualClock: Boolean = true

  // Whether to actually wait in real time before changing manual clock
  def actuallyWait: Boolean = false

  // A SparkConf to use in tests. Can be modified before calling setupStreams to configure things.
  val conf = new SparkConf()
    .setMaster(master)
    .setAppName(framework)

  // Timeout for use in ScalaTest `eventually` blocks
  val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds))

  // Default before function for any streaming test suite. Override this
  // if you want to add your stuff to "before" (i.e., don't call before { } )
  def beforeFunction() {
    if (useManualClock) {
      logInfo("Using manual clock")
      conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
    } else {
      logInfo("Using real clock")
      conf.set("spark.streaming.clock", "org.apache.spark.util.SystemClock")
    }
  }

  // Default after function for any streaming test suite. Override this
  // if you want to add your stuff to "after" (i.e., don't call after { } )
  def afterFunction() {
    System.clearProperty("spark.streaming.clock")
  }

  before(beforeFunction)
  after(afterFunction)

  /**
   * Run a block of code with the given StreamingContext and automatically
   * stop the context when the block completes or when an exception is thrown.
   */
  def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): R = {
    try {
      block(ssc)
    } finally {
      try {
        ssc.stop(stopSparkContext = true)
      } catch {
        case e: Exception =>
          logError("Error stopping StreamingContext", e)
      }
    }
  }

  /**
   * Run a block of code with the given TestServer and automatically
   * stop the server when the block completes or when an exception is thrown.
   */
  def withTestServer[R](testServer: TestServer)(block: TestServer => R): R = {
    try {
      block(testServer)
    } finally {
      try {
        testServer.stop()
      } catch {
        case e: Exception =>
          logError("Error stopping TestServer", e)
      }
    }
  }

  /**
   * Set up required DStreams to test the DStream operation using the two sequences
   * of input collections.
   */
  def setupStreams[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      numPartitions: Int = numInputPartitions
    ): StreamingContext = {
    // Create StreamingContext
    val ssc = new StreamingContext(conf, batchDuration)
    if (checkpointDir != null) {
      ssc.checkpoint(checkpointDir)
    }

    // Setup the stream computation
    val inputStream = new TestInputStream(ssc, input, numPartitions)
    val operatedStream = operation(inputStream)
    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
      new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
    outputStream.register()
    ssc
  }

  /**
   * Set up required DStreams to test the binary operation using the sequence
   * of input collections.
   */
  def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag](
      input1: Seq[Seq[U]],
      input2: Seq[Seq[V]],
      operation: (DStream[U], DStream[V]) => DStream[W]
    ): StreamingContext = {
    // Create StreamingContext
    val ssc = new StreamingContext(conf, batchDuration)
    if (checkpointDir != null) {
      ssc.checkpoint(checkpointDir)
    }

    // Setup the stream computation
    val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions)
    val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions)
    val operatedStream = operation(inputStream1, inputStream2)
    val outputStream = new TestOutputStreamWithPartitions(operatedStream,
      new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
    outputStream.register()
    ssc
  }

  /**
   * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
   * returns the collected output. It will wait until `numExpectedOutput` number of
   * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
   *
   * Returns a sequence of items for each RDD.
   */
  def runStreams[V: ClassTag](
      ssc: StreamingContext,
      numBatches: Int,
      numExpectedOutput: Int
    ): Seq[Seq[V]] = {
    // Flatten each RDD into a single Seq
    runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq)
  }

  /**
   * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
   * returns the collected output. It will wait until `numExpectedOutput` number of
   * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
   *
   * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
   * representing one partition.
   */
  def runStreamsWithPartitions[V: ClassTag](
      ssc: StreamingContext,
      numBatches: Int,
      numExpectedOutput: Int
    ): Seq[Seq[Seq[V]]] = {
    assert(numBatches > 0, "Number of batches to run stream computation is zero")
    assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
    logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)

    // Get the output buffer
    val outputStream = ssc.graph.getOutputStreams.
      filter(_.isInstanceOf[TestOutputStreamWithPartitions[_]]).
      head.asInstanceOf[TestOutputStreamWithPartitions[V]]
    val output = outputStream.output

    try {
      // Start computation
      ssc.start()

      // Advance manual clock
      val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
      logInfo("Manual clock before advancing = " + clock.getTimeMillis())
      if (actuallyWait) {
        for (i <- 1 to numBatches) {
          logInfo("Actually waiting for " + batchDuration)
          clock.advance(batchDuration.milliseconds)
          Thread.sleep(batchDuration.milliseconds)
        }
      } else {
        clock.advance(numBatches * batchDuration.milliseconds)
      }
      logInfo("Manual clock after advancing = " + clock.getTimeMillis())

      // Wait until expected number of output items have been generated
      val startTime = System.currentTimeMillis()
      while (output.size < numExpectedOutput &&
        System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
        logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
        ssc.awaitTerminationOrTimeout(50)
      }
      val timeTaken = System.currentTimeMillis() - startTime
      logInfo("Output generated in " + timeTaken + " milliseconds")
      output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
      assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
      assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")

      Thread.sleep(100) // Give some time for the forgetting old RDDs to complete
    } finally {
      ssc.stop(stopSparkContext = true)
    }
    output
  }

  /**
   * Verify whether the output values after running a DStream operation
   * is same as the expected output values, by comparing the output
   * collections either as lists (order matters) or sets (order does not matter)
   */
  def verifyOutput[V: ClassTag](
      output: Seq[Seq[V]],
      expectedOutput: Seq[Seq[V]],
      useSet: Boolean
    ) {
    logInfo("--------------------------------")
    logInfo("output.size = " + output.size)
    logInfo("output")
    output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    logInfo("expected output.size = " + expectedOutput.size)
    logInfo("expected output")
    expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
    logInfo("--------------------------------")

    // Match the output with the expected output
    for (i <- 0 until output.size) {
      if (useSet) {
        assert(
          output(i).toSet === expectedOutput(i).toSet,
          s"Set comparison failed\n" +
            s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
            s"Generated output (${output.size} items): ${output.mkString("\n")}"
        )
      } else {
        assert(
          output(i).toList === expectedOutput(i).toList,
          s"Ordered list comparison failed\n" +
            s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
            s"Generated output (${output.size} items): ${output.mkString("\n")}"
        )
      }
    }
    logInfo("Output verified successfully")
  }

  /**
   * Test unary DStream operation with a list of inputs, with number of
   * batches to run same as the number of expected output values
   */
  def testOperation[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      expectedOutput: Seq[Seq[V]],
      useSet: Boolean = false
    ) {
    testOperation[U, V](input, operation, expectedOutput, -1, useSet)
  }

  /**
   * Test unary DStream operation with a list of inputs
   * @param input Sequence of input collections
   * @param operation Binary DStream operation to be applied to the 2 inputs
   * @param expectedOutput Sequence of expected output collections
   * @param numBatches Number of batches to run the operation for
   * @param useSet Compare the output values with the expected output values
   *               as sets (order matters) or as lists (order does not matter)
   */
  def testOperation[U: ClassTag, V: ClassTag](
      input: Seq[Seq[U]],
      operation: DStream[U] => DStream[V],
      expectedOutput: Seq[Seq[V]],
      numBatches: Int,
      useSet: Boolean
    ) {
    val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
    withStreamingContext(setupStreams[U, V](input, operation)) { ssc =>
      val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
      verifyOutput[V](output, expectedOutput, useSet)
    }
  }

  /**
   * Test binary DStream operation with two lists of inputs, with number of
   * batches to run same as the number of expected output values
   */
  def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
      input1: Seq[Seq[U]],
      input2: Seq[Seq[V]],
      operation: (DStream[U], DStream[V]) => DStream[W],
      expectedOutput: Seq[Seq[W]],
      useSet: Boolean
    ) {
    testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet)
  }

  /**
   * Test binary DStream operation with two lists of inputs
   * @param input1 First sequence of input collections
   * @param input2 Second sequence of input collections
   * @param operation Binary DStream operation to be applied to the 2 inputs
   * @param expectedOutput Sequence of expected output collections
   * @param numBatches Number of batches to run the operation for
   * @param useSet Compare the output values with the expected output values
   *               as sets (order matters) or as lists (order does not matter)
   */
  def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
      input1: Seq[Seq[U]],
      input2: Seq[Seq[V]],
      operation: (DStream[U], DStream[V]) => DStream[W],
      expectedOutput: Seq[Seq[W]],
      numBatches: Int,
      useSet: Boolean
    ) {
    val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
    withStreamingContext(setupStreams[U, V, W](input1, input2, operation)) { ssc =>
      val output = runStreams[W](ssc, numBatches_, expectedOutput.size)
      verifyOutput[W](output, expectedOutput, useSet)
    }
  }
}
-
Klue about 8 years
I cannot find logs in the Spark home directory. Do I need to execute my code with Logging?
-
Hlib about 8 years
OK. Where did you find this: 16/04/27 10:40:17 WARN StreamingContext....? Just look above it (if you are limited by the terminal window, you can redirect the output to a file).
-
Klue about 8 years
I am using IntelliJ IDEA. How do I save logs to a file? It's quite obvious if I run a program from the terminal (> log), but what should I do in IntelliJ IDEA? Sorry for the beginner's questions...
-
Hlib about 8 years
stackoverflow.com/questions/4736020/… Anyway, you can try to run it from the terminal. And again, are you sure that there are no logs in the 'logs' and 'work' directories?
-
Klue about 8 years
OK, it says java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext and points to the last line of the code, val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals). Nothing else is mentioned about this issue.
-
Klue about 8 years
I think point 2 is the real cause of the issue. I checked TestSuiteBase (see my update) and it only specifies a StreamingContext. I tried to add sc.stop(), but then I would have to pass sc as a parameter to the function runStreams(...), and I'm not sure that this is the right way. Could you please elaborate a little on this point in your answer?
-
tesnik03 about 8 years
Based on your updated TestSuiteBase, I would suggest testing this by commenting out the places where it calls stop. If it works fine, re-enable them one by one. You should be able to figure out which one is the problem. The rest of the code looks fine to me.
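
Reading the TestSuiteBase posted in the update, runStreamsWithPartitions (which runStreams delegates to) ends with ssc.stop(stopSparkContext = true) in its finally block, so after the last grid iteration ssc.sparkContext is already stopped. One possible workaround, sketched here under assumptions, is to create a fresh SparkContext for the final parallelize call. The names GridEvalsAfterStreaming, GridEval, withFreshContext and countEvals are hypothetical, the tuple shape is simplified, and the local[2] master simply mirrors the one in TestSuiteBase.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical helper object; not part of Spark or of the question's code.
object GridEvalsAfterStreaming {

  // Simplified stand-in for one grid result: (stepSize, numIterations, cvRMSE).
  type GridEval = (Double, Int, Double)

  // Creates a new SparkContext, runs the body, and always stops the context.
  def withFreshContext[R](conf: SparkConf)(body: SparkContext => R): R = {
    val sc = new SparkContext(conf)
    try body(sc) finally sc.stop()
  }

  // Parallelizes the collected results on the fresh context and counts them.
  def countEvals(gridEvals: Seq[GridEval]): Long = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("grid-evals")
    withFreshContext(conf) { sc =>
      val gridEvalsRDD = sc.parallelize(gridEvals)
      gridEvalsRDD.count()
    }
  }
}
```

Inside the test itself, the conf value from TestSuiteBase could presumably be passed to withFreshContext instead of building a new SparkConf. This relies on the previous SparkContext having been stopped already, since only one active SparkContext per JVM is normally allowed.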