Cannot call methods on a stopped SparkContext

33,108

Solution 1

Check the following:

  1. Verify that the resources you request in the Spark configuration are actually available.

  2. Search your codebase for calls to stop() and make sure none of them stops the SparkContext before you are done with it.

  3. Open the Spark UI, which shows which jobs ran, whether each one succeeded or failed, and the associated logs. That should tell you why the job is failing.
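Point 2 can be illustrated with a minimal sketch. It assumes a local master and a Spark version where SparkContext.isStopped is public; the object name StopSemanticsSketch and the app name are made up for illustration. It shows that stopping a StreamingContext with stopSparkContext = false leaves the shared SparkContext usable, whereas any call made after the SparkContext itself is stopped would fail with the error from the title.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical demo object, not part of the original question.
object StopSemanticsSketch {

  // Returns (SparkContext stopped after ssc.stop(false)?, element count,
  //          SparkContext stopped after sc.stop()?)
  def run(): (Boolean, Long, Boolean) = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("stop-semantics-sketch")
    val sc = new SparkContext(conf)

    // A StreamingContext built on an existing SparkContext shares it.
    val ssc = new StreamingContext(sc, Seconds(1))

    // Stop only the streaming side; the SparkContext keeps running.
    ssc.stop(stopSparkContext = false)
    val stoppedAfterStreamingStop = sc.isStopped

    // Still legal, because the SparkContext has not been stopped.
    val count = sc.parallelize(1 to 3).count()

    // From here on, calls such as sc.parallelize(...) would throw
    // IllegalStateException: Cannot call methods on a stopped SparkContext.
    sc.stop()
    (stoppedAfterStreamingStop, count, sc.isStopped)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With stopSparkContext = true instead, the parallelize call in the middle would be the one that fails.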

Solution 2

"Cannot call methods on a stopped SparkContext" is a consequence of an error that happened earlier. Look at the logs in $SPARK_HOME/logs and $SPARK_HOME/work.

Author by

Klue

Updated on April 28, 2020

Comments

  • Klue
    Klue about 4 years

    When I run the following test, it throws "Cannot call methods on a stopped SparkContext". The likely cause is that I use TestSuiteBase and a StreamingContext. At the line val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals) I need a SparkContext, which I access via ssc.sparkContext, and this is where the problem occurs (see the warning and error messages below).

    class StreamingTest extends TestSuiteBase with BeforeAndAfter {
    
    test("Test 1") {
    //...
        val gridEvals = for (initialWeights <- gridParams("initialWeights");
                             stepSize <- gridParams("stepSize");
                             numIterations <- gridParams("numIterations")) yield {
          val lr = new StreamingLinearRegressionWithSGD()
            .setInitialWeights(initialWeights.asInstanceOf[Vector])
            .setStepSize(stepSize.asInstanceOf[Double])
            .setNumIterations(numIterations.asInstanceOf[Int])
    
          ssc = setupStreams(inputData, (inputDStream: DStream[LabeledPoint]) => {
            lr.trainOn(inputDStream)
            lr.predictOnValues(inputDStream.map(x => (x.label, x.features)))
          })
    
          val output: Seq[Seq[(Double, Double)]] = runStreams(ssc, numBatches, numBatches)
          val cvRMSE = calculateRMSE(output, nPoints)
          println(s"RMSE = $cvRMSE")
          (initialWeights, stepSize, numIterations, cvRMSE)
    
        }
    
         val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals)
    
    }
    
    }
    

    16/04/27 10:40:17 WARN StreamingContext: StreamingContext has already been stopped
    16/04/27 10:40:17 INFO SparkContext: SparkContext already stopped.

    Cannot call methods on a stopped SparkContext

    UPDATE:

    This is the base class TestSuiteBase:

    trait TestSuiteBase extends SparkFunSuite with BeforeAndAfter with Logging {
    
      // Name of the framework for Spark context
      def framework: String = this.getClass.getSimpleName
    
      // Master for Spark context
      def master: String = "local[2]"
    
      // Batch duration
      def batchDuration: Duration = Seconds(1)
    
      // Directory where the checkpoint data will be saved
      lazy val checkpointDir: String = {
        val dir = Utils.createTempDir()
        logDebug(s"checkpointDir: $dir")
        dir.toString
      }
    
      // Number of partitions of the input parallel collections created for testing
      def numInputPartitions: Int = 2
    
      // Maximum time to wait before the test times out
      def maxWaitTimeMillis: Int = 10000
    
      // Whether to use manual clock or not
      def useManualClock: Boolean = true
    
      // Whether to actually wait in real time before changing manual clock
      def actuallyWait: Boolean = false
    
      // A SparkConf to use in tests. Can be modified before calling setupStreams to configure things.
      val conf = new SparkConf()
        .setMaster(master)
        .setAppName(framework)
    
      // Timeout for use in ScalaTest `eventually` blocks
      val eventuallyTimeout: PatienceConfiguration.Timeout = timeout(Span(10, ScalaTestSeconds))
    
      // Default before function for any streaming test suite. Override this
      // if you want to add your stuff to "before" (i.e., don't call before { } )
      def beforeFunction() {
        if (useManualClock) {
          logInfo("Using manual clock")
          conf.set("spark.streaming.clock", "org.apache.spark.util.ManualClock")
        } else {
          logInfo("Using real clock")
          conf.set("spark.streaming.clock", "org.apache.spark.util.SystemClock")
        }
      }
    
      // Default after function for any streaming test suite. Override this
      // if you want to add your stuff to "after" (i.e., don't call after { } )
      def afterFunction() {
        System.clearProperty("spark.streaming.clock")
      }
    
      before(beforeFunction)
      after(afterFunction)
    
      /**
       * Run a block of code with the given StreamingContext and automatically
       * stop the context when the block completes or when an exception is thrown.
       */
      def withStreamingContext[R](ssc: StreamingContext)(block: StreamingContext => R): R = {
        try {
          block(ssc)
        } finally {
          try {
            ssc.stop(stopSparkContext = true)
          } catch {
            case e: Exception =>
              logError("Error stopping StreamingContext", e)
          }
        }
      }
    
      /**
       * Run a block of code with the given TestServer and automatically
       * stop the server when the block completes or when an exception is thrown.
       */
      def withTestServer[R](testServer: TestServer)(block: TestServer => R): R = {
        try {
          block(testServer)
        } finally {
          try {
            testServer.stop()
          } catch {
            case e: Exception =>
              logError("Error stopping TestServer", e)
          }
        }
      }
    
      /**
       * Set up required DStreams to test the DStream operation using the two sequences
       * of input collections.
       */
      def setupStreams[U: ClassTag, V: ClassTag](
          input: Seq[Seq[U]],
          operation: DStream[U] => DStream[V],
          numPartitions: Int = numInputPartitions
        ): StreamingContext = {
        // Create StreamingContext
        val ssc = new StreamingContext(conf, batchDuration)
        if (checkpointDir != null) {
          ssc.checkpoint(checkpointDir)
        }
    
        // Setup the stream computation
        val inputStream = new TestInputStream(ssc, input, numPartitions)
        val operatedStream = operation(inputStream)
        val outputStream = new TestOutputStreamWithPartitions(operatedStream,
          new ArrayBuffer[Seq[Seq[V]]] with SynchronizedBuffer[Seq[Seq[V]]])
        outputStream.register()
        ssc
      }
    
      /**
       * Set up required DStreams to test the binary operation using the sequence
       * of input collections.
       */
      def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag](
          input1: Seq[Seq[U]],
          input2: Seq[Seq[V]],
          operation: (DStream[U], DStream[V]) => DStream[W]
        ): StreamingContext = {
        // Create StreamingContext
        val ssc = new StreamingContext(conf, batchDuration)
        if (checkpointDir != null) {
          ssc.checkpoint(checkpointDir)
        }
    
        // Setup the stream computation
        val inputStream1 = new TestInputStream(ssc, input1, numInputPartitions)
        val inputStream2 = new TestInputStream(ssc, input2, numInputPartitions)
        val operatedStream = operation(inputStream1, inputStream2)
        val outputStream = new TestOutputStreamWithPartitions(operatedStream,
          new ArrayBuffer[Seq[Seq[W]]] with SynchronizedBuffer[Seq[Seq[W]]])
        outputStream.register()
        ssc
      }
    
      /**
       * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
       * returns the collected output. It will wait until `numExpectedOutput` number of
       * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
       *
       * Returns a sequence of items for each RDD.
       */
      def runStreams[V: ClassTag](
          ssc: StreamingContext,
          numBatches: Int,
          numExpectedOutput: Int
        ): Seq[Seq[V]] = {
        // Flatten each RDD into a single Seq
        runStreamsWithPartitions(ssc, numBatches, numExpectedOutput).map(_.flatten.toSeq)
      }
    
      /**
       * Runs the streams set up in `ssc` on manual clock for `numBatches` batches and
       * returns the collected output. It will wait until `numExpectedOutput` number of
       * output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
       *
       * Returns a sequence of RDD's. Each RDD is represented as several sequences of items, each
       * representing one partition.
       */
      def runStreamsWithPartitions[V: ClassTag](
          ssc: StreamingContext,
          numBatches: Int,
          numExpectedOutput: Int
        ): Seq[Seq[Seq[V]]] = {
        assert(numBatches > 0, "Number of batches to run stream computation is zero")
        assert(numExpectedOutput > 0, "Number of expected outputs after " + numBatches + " is zero")
        logInfo("numBatches = " + numBatches + ", numExpectedOutput = " + numExpectedOutput)
    
        // Get the output buffer
        val outputStream = ssc.graph.getOutputStreams.
          filter(_.isInstanceOf[TestOutputStreamWithPartitions[_]]).
          head.asInstanceOf[TestOutputStreamWithPartitions[V]]
        val output = outputStream.output
    
        try {
          // Start computation
          ssc.start()
    
          // Advance manual clock
          val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
          logInfo("Manual clock before advancing = " + clock.getTimeMillis())
          if (actuallyWait) {
            for (i <- 1 to numBatches) {
              logInfo("Actually waiting for " + batchDuration)
              clock.advance(batchDuration.milliseconds)
              Thread.sleep(batchDuration.milliseconds)
            }
          } else {
            clock.advance(numBatches * batchDuration.milliseconds)
          }
          logInfo("Manual clock after advancing = " + clock.getTimeMillis())
    
          // Wait until expected number of output items have been generated
          val startTime = System.currentTimeMillis()
          while (output.size < numExpectedOutput &&
            System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
            logInfo("output.size = " + output.size + ", numExpectedOutput = " + numExpectedOutput)
            ssc.awaitTerminationOrTimeout(50)
          }
          val timeTaken = System.currentTimeMillis() - startTime
          logInfo("Output generated in " + timeTaken + " milliseconds")
          output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
          assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
          assert(output.size === numExpectedOutput, "Unexpected number of outputs generated")
    
          Thread.sleep(100) // Give some time for the forgetting old RDDs to complete
        } finally {
          ssc.stop(stopSparkContext = true)
        }
        output
      }
    
      /**
       * Verify whether the output values after running a DStream operation
       * is same as the expected output values, by comparing the output
       * collections either as lists (order matters) or sets (order does not matter)
       */
      def verifyOutput[V: ClassTag](
          output: Seq[Seq[V]],
          expectedOutput: Seq[Seq[V]],
          useSet: Boolean
        ) {
        logInfo("--------------------------------")
        logInfo("output.size = " + output.size)
        logInfo("output")
        output.foreach(x => logInfo("[" + x.mkString(",") + "]"))
        logInfo("expected output.size = " + expectedOutput.size)
        logInfo("expected output")
        expectedOutput.foreach(x => logInfo("[" + x.mkString(",") + "]"))
        logInfo("--------------------------------")
    
        // Match the output with the expected output
        for (i <- 0 until output.size) {
          if (useSet) {
            assert(
              output(i).toSet === expectedOutput(i).toSet,
              s"Set comparison failed\n" +
                s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
                s"Generated output (${output.size} items): ${output.mkString("\n")}"
            )
          } else {
            assert(
              output(i).toList === expectedOutput(i).toList,
              s"Ordered list comparison failed\n" +
                s"Expected output (${expectedOutput.size} items):\n${expectedOutput.mkString("\n")}\n" +
                s"Generated output (${output.size} items): ${output.mkString("\n")}"
            )
          }
        }
        logInfo("Output verified successfully")
      }
    
      /**
       * Test unary DStream operation with a list of inputs, with number of
       * batches to run same as the number of expected output values
       */
      def testOperation[U: ClassTag, V: ClassTag](
          input: Seq[Seq[U]],
          operation: DStream[U] => DStream[V],
          expectedOutput: Seq[Seq[V]],
          useSet: Boolean = false
        ) {
        testOperation[U, V](input, operation, expectedOutput, -1, useSet)
      }
    
      /**
       * Test unary DStream operation with a list of inputs
       * @param input      Sequence of input collections
       * @param operation  Unary DStream operation to be applied to the input
       * @param expectedOutput Sequence of expected output collections
       * @param numBatches Number of batches to run the operation for
       * @param useSet     Compare the output values with the expected output values
       *                   as sets (order does not matter) or as lists (order matters)
       */
      def testOperation[U: ClassTag, V: ClassTag](
          input: Seq[Seq[U]],
          operation: DStream[U] => DStream[V],
          expectedOutput: Seq[Seq[V]],
          numBatches: Int,
          useSet: Boolean
        ) {
        val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
        withStreamingContext(setupStreams[U, V](input, operation)) { ssc =>
          val output = runStreams[V](ssc, numBatches_, expectedOutput.size)
          verifyOutput[V](output, expectedOutput, useSet)
        }
      }
    
      /**
       * Test binary DStream operation with two lists of inputs, with number of
       * batches to run same as the number of expected output values
       */
      def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
          input1: Seq[Seq[U]],
          input2: Seq[Seq[V]],
          operation: (DStream[U], DStream[V]) => DStream[W],
          expectedOutput: Seq[Seq[W]],
          useSet: Boolean
        ) {
        testOperation[U, V, W](input1, input2, operation, expectedOutput, -1, useSet)
      }
    
      /**
       * Test binary DStream operation with two lists of inputs
       * @param input1     First sequence of input collections
       * @param input2     Second sequence of input collections
       * @param operation  Binary DStream operation to be applied to the 2 inputs
       * @param expectedOutput Sequence of expected output collections
       * @param numBatches Number of batches to run the operation for
       * @param useSet     Compare the output values with the expected output values
       *                   as sets (order does not matter) or as lists (order matters)
       */
      def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
          input1: Seq[Seq[U]],
          input2: Seq[Seq[V]],
          operation: (DStream[U], DStream[V]) => DStream[W],
          expectedOutput: Seq[Seq[W]],
          numBatches: Int,
          useSet: Boolean
        ) {
        val numBatches_ = if (numBatches > 0) numBatches else expectedOutput.size
        withStreamingContext(setupStreams[U, V, W](input1, input2, operation)) { ssc =>
          val output = runStreams[W](ssc, numBatches_, expectedOutput.size)
          verifyOutput[W](output, expectedOutput, useSet)
        }
      }
    }
    
  • Klue
    Klue about 8 years
    I cannot find logs in the home directory of Spark. Do I need to execute my code with Logging?
  • Hlib
    Hlib about 8 years
    OK. Where did you find this: 16/04/27 10:40:17 WARN StreamingContext....? Just look above it (if you are limited by the terminal window, you can redirect the output to a file).
  • Klue
    Klue about 8 years
    I am using IntelliJ IDEA. How do I save logs to a file? It's quite obvious if I run a program from the terminal (> log), but what should I do in IntelliJ IDEA? Sorry for the beginner's questions...
  • Hlib
    Hlib about 8 years
    stackoverflow.com/questions/4736020/… Anyway, you can try running it from the terminal. And again, are you sure that there are no logs in the 'logs' and 'work' directories?
  • Klue
    Klue about 8 years
    Ok, it says java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext and points to the last line of the code val gridEvalsRDD = ssc.sparkContext.parallelize(gridEvals). Nothing else is mentioned on this issue.
  • Klue
    Klue about 8 years
    I think point 2 is the actual cause of the issue. I checked TestSuiteBase (see my update) and it only specifies a StreamingContext. I tried to add sc.stop(), but then I would have to pass sc as a parameter to runStreams(...), and I'm not sure that this is the right way. Could you please elaborate a little on this point in your answer?
  • tesnik03
    tesnik03 about 8 years
    Based on your updated TestSuiteBase, I would suggest testing this by commenting out the places where stop is called. If it works fine, then restore them one by one. You should be able to figure out which one is the problem. The rest of the code looks fine to me.
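For reference, runStreamsWithPartitions in the posted TestSuiteBase calls ssc.stop(stopSparkContext = true) in its finally block, so ssc.sparkContext is already stopped by the time the grid-search loop finishes. One possible workaround, sketched below under assumptions, is to leave the helper untouched and create a fresh SparkContext from the same SparkConf once all streaming runs are done. The object name FreshContextSketch, the step sizes, and the placeholder metric are hypothetical; the real test would call setupStreams and runStreams inside the loop instead.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Hypothetical stand-in for the grid-search test in the question.
object FreshContextSketch {

  def run(): Seq[(Double, Double)] = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("fresh-context-sketch")

    val stepSizes = Seq(0.01, 0.1) // made-up grid values

    // Each iteration creates its own StreamingContext and stops it together
    // with its SparkContext, mirroring what runStreams does in its finally block.
    val gridEvals = stepSizes.map { stepSize =>
      val ssc = new StreamingContext(conf, Seconds(1))
      try {
        // Placeholder for the streaming run and the RMSE computation.
        (stepSize, stepSize * 2)
      } finally {
        ssc.stop(stopSparkContext = true)
      }
    }

    // Every earlier context is stopped, so a new SparkContext can be created
    // for post-processing instead of reusing ssc.sparkContext.
    val sc = new SparkContext(conf)
    try {
      sc.parallelize(gridEvals).collect().toSeq
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

If the grid results are small, keeping them in a plain Seq and skipping parallelize altogether may be simpler than starting another context.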