What is the correct way to start/stop Spark Streaming jobs in YARN?


Solution 1

  1. You can close the spark-submit console. The job is already running in the background once it reports the RUNNING state.
  2. Logs are visible only after the application completes. During runtime, all logs are accessible directly on the worker nodes (you can browse them through the YARN ResourceManager web UI) and are aggregated to HDFS once the job finishes.
  3. yarn application -kill is probably the best way to stop a Spark Streaming application, but it is not perfect. It would be better to perform a graceful shutdown that stops all stream receivers and then the streaming context, but I personally don't know how to do it.
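
For what it is worth, a graceful stop ultimately comes down to calling ssc.stop(stopSparkContext = true, stopGracefully = true) from somewhere outside the batch processing. Below is a minimal sketch that does this from a JVM shutdown hook; the class name, batch interval, and socket source are placeholders, and the comments further down report that the hook-based approach stopped working around Spark 1.5, which is one reason Solution 2 wires up a socket server instead.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object GracefulStopSketch {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("graceful-stop-sketch")
        val ssc = new StreamingContext(conf, Seconds(10)) // batch interval is a placeholder

        // Placeholder source and output; replace with the real pipeline.
        ssc.socketTextStream("localhost", 9999).count().print()

        // When the JVM gets a shutdown signal, stop the receivers, let in-flight
        // batches finish, then stop the SparkContext.
        sys.addShutdownHook {
          ssc.stop(stopSparkContext = true, stopGracefully = true)
        }

        ssc.start()
        ssc.awaitTermination()
      }
    }

Whether the hook actually fires on yarn application -kill depends on how YARN delivers the shutdown signal to the driver container, so treat it as best-effort rather than guaranteed.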

Solution 2

I finally figured out a way to safely close a Spark Streaming job.

  1. Write a socket server thread that waits for a command to stop the streaming context.
    package xxx.xxx.xxx

    import java.io.{BufferedReader, InputStreamReader}
    import java.net.{ServerSocket, Socket}

    import org.apache.spark.streaming.StreamingContext

    object KillServer {

      // Listens on the given port and hands every connection to a Handler.
      class NetworkService(port: Int, ssc: StreamingContext) extends Runnable {
        val serverSocket = new ServerSocket(port)

        def run(): Unit = {
          Thread.currentThread().setName("Zhuangdy | Waiting for graceful stop at port " + port)
          while (true) {
            val socket = serverSocket.accept()
            new Handler(socket, ssc).run()
          }
        }
      }

      // Reads one line from the connection; the command "kill" triggers a graceful stop.
      class Handler(socket: Socket, ssc: StreamingContext) extends Runnable {
        def run(): Unit = {
          val reader = new InputStreamReader(socket.getInputStream)
          val br = new BufferedReader(reader)
          if (br.readLine() == "kill") {
            // Stop the receivers, finish in-flight batches, then stop the SparkContext.
            ssc.stop(stopSparkContext = true, stopGracefully = true)
          }
          br.close()
          socket.close()
        }
      }

      def run(port: Int, ssc: StreamingContext): Unit = {
        // Run the listener on a daemon thread so the caller can proceed to
        // ssc.awaitTermination() and the driver can exit once the context stops.
        val listener = new Thread(new NetworkService(port, ssc))
        listener.setDaemon(true)
        listener.start()
      }
    }
  2. In your main method, where you start the streaming context, add the following code:

    ssc.start()
    KillServer.run(11212, ssc)
    ssc.awaitTermination()
  3. Write a spark-submit script that submits the job to YARN and redirects its output to a file, which you will use later:

    spark-submit --class "com.Mainclass" \
            --conf "spark.streaming.stopGracefullyOnShutdown=true" \
            --master yarn-cluster --queue "root" \
            --deploy-mode cluster \
            --executor-cores 4 --num-executors 8 --executor-memory 3G \
            hdfs:///xxx.jar > output 2>&1 &

  4. Finally, shut down the Spark Streaming job safely, without losing data or leaving computed results unpersisted. (The socket server used to stop the streaming context gracefully runs on the driver, so grep the output of step 3 for the driver address and use echo and nc to send the kill command; a small Scala alternative to nc is sketched after the script.)

    #!/bin/bash
    # Get the driver (ApplicationMaster) address from the spark-submit output of step 3.
    driver=`grep ApplicationMaster output | grep -Po '\d+\.\d+\.\d+\.\d+'`
    # Ask the KillServer running on the driver to stop the streaming context gracefully.
    echo "kill" | nc $driver 11212
    # Then remove the application from YARN ('ad.Stat' filters on this particular application's name).
    appid=`yarn application -list 2>&1 | grep ad.Stat | grep -Po 'application_\d+_\d+'`
    yarn application -kill $appid
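
If nc is not available where you run the stop script, the same one-line protocol can be sent by any small client; below is a minimal Scala sketch (SendKill is a made-up name, and the default port must match the one passed to KillServer.run):

    import java.io.PrintWriter
    import java.net.Socket

    object SendKill {
      def main(args: Array[String]): Unit = {
        val host = args(0)                                       // driver address from the spark-submit output
        val port = if (args.length > 1) args(1).toInt else 11212 // must match KillServer's port

        val socket = new Socket(host, port)
        val out = new PrintWriter(socket.getOutputStream, true)  // auto-flush on println
        out.println("kill")                                      // the command Handler checks for
        out.close()
        socket.close()
      }
    }

Run it with the driver address grepped from the spark-submit output, exactly as the shell script above does.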

Solution 3

  1. What is your data source? If it is reliable, such as a Kafka direct receiver, the yarn kill shutdown should be fine. When your application restarts, it will read from the offset of the last complete batch. If the data source is not reliable, or if you want to handle a graceful shutdown yourself, you have to implement some kind of external hook on the streaming context (one possible shape is sketched below). I faced the same problem and ended up implementing a small hack that adds a new tab to the web UI and acts as a stop button.
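
One common shape for such an external hook is a stop-marker file that a background thread on the driver polls; the sketch below is only an illustration, and the helper name, the ten-second polling interval, and the use of the local filesystem rather than HDFS are all assumptions:

    import java.nio.file.{Files, Paths}

    import org.apache.spark.streaming.StreamingContext

    // Hypothetical helper: call StopFileWatcher.watch(ssc, "/tmp/stop-my-streaming-job")
    // right after ssc.start(); when the marker file appears, the context is stopped gracefully.
    object StopFileWatcher {
      def watch(ssc: StreamingContext, markerPath: String, intervalMs: Long = 10000L): Unit = {
        val t = new Thread(new Runnable {
          override def run(): Unit = {
            while (!Files.exists(Paths.get(markerPath))) {
              Thread.sleep(intervalMs)
            }
            // Marker found: stop receivers, drain in-flight batches, stop the SparkContext.
            ssc.stop(stopSparkContext = true, stopGracefully = true)
          }
        })
        t.setDaemon(true)
        t.start()
      }
    }

Stopping the job then amounts to creating the marker file on the driver host (or swapping in the Hadoop FileSystem API and touching a path on HDFS) and waiting for the in-flight batch to finish.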

Author: Kevin Pauli

Updated on November 14, 2020

Comments

  • Kevin Pauli over 3 years

    I have been experimenting and googling for many hours, with no luck.

    I have a Spark Streaming app that runs fine in a local Spark cluster. Now I need to deploy it on Cloudera 5.4.4. I need to be able to start it, have it run continually in the background, and be able to stop it.

    I tried this:

    $ spark-submit --master yarn-cluster --class MyMain my.jar myArgs
    

    But it just prints these lines endlessly.

    15/07/28 17:58:18 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
    15/07/28 17:58:19 INFO Client: Application report for application_1438092860895_0012 (state: RUNNING)
    

    Question number 1: since it is a streaming app, it needs to run continuously. So how do I run it in a "background" mode? All the examples I can find of submitting Spark jobs on YARN seem to assume that the application will do some work and terminate, and therefore that you would want to run it in the foreground. But that is not the case for streaming.

    Next up... at this point the app does not seem to be functioning. I figure it could be a bug or misconfiguration on my part, so I tried to look in the logs to see what's happening:

    $ yarn logs -applicationId application_1438092860895_012
    

    But it tells me:

    /tmp/logs/hdfs/logs/application_1438092860895_0012 does not have any log files.
    

    So question number 2: If the application is RUNNING, why does it have no log files?

    So eventually I just had to kill it:

    $ yarn application -kill application_1438092860895_012
    

    That brings up question number 3: assuming I can eventually get the app launched and running in the background, is "yarn application -kill" the preferred way of stopping it?

  • vutran about 8 years
    I have the same question as Kevin; however, your answer no. 1 doesn't seem to work for me. I have a Python streaming application. When I submit it to my standalone Spark, it prints out info logs and does print 'app-20160403171906-0003/0 is now RUNNING', but I can't exit the submission.
  • Kevin Pauli over 7 years
    While that may work, I learned later that "yarn application -kill" sends a shutdown signal to your application, which you can handle to shut things down gracefully, e.g. in Scala: sys.ShutdownHookThread { LOGGER.info("Stopping spark context...") ssc.stop(stopSparkContext = true, stopGracefully = true) LOGGER.info("Stopped") }
  • user3005501 over 7 years
    It looks like the sys.ShutdownHookThread method stopped working around Spark 1.5. I can verify that it doesn't work in Spark 1.6.1.