How to save latest offset that Spark consumed to ZK or Kafka and can read back after restart
Solution 1
One of the constructors of createDirectStream function can get a map that will hold the partition id as the key and the offset from which you are starting to consume as the value.
Just look at api here: http://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/streaming/kafka/KafkaUtils.html The map that I was talking about usually called: fromOffsets
You can insert data to the map:
startOffsetsMap.put(TopicAndPartition(topicName,partitionId), startOffset)
And use it when you create the direct stream:
KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, (String, String)](
streamingContext, kafkaParams, startOffsetsMap, messageHandler(_))
After each iteration you can get the processed offsets using:
rdd.asInstanceOf[HasOffsetRanges].offsetRanges
You would be able to use this data to construct the fromOffsets map in the next iteration.
You can see the full code and usage here: https://spark.apache.org/docs/latest/streaming-kafka-integration.html at the end of the page
Solution 2
To add to Michael Kopaniov's answer, if you really want to use ZK as the place you store and load your map of offsets from, you can.
However, because your results are not being output to ZK, you will not get reliable semantics unless your output operation is idempotent (which it sounds like it isn't).
If it's possible to store your results in the same document in mongo alongside the offsets in a single atomic action, that might be better for you.
For more detail, see https://www.youtube.com/watch?v=fXnNEq1v3VA
Solution 3
Here's some code you can use to store offsets in ZK http://geeks.aretotally.in/spark-streaming-kafka-direct-api-store-offsets-in-zk/
And here's some code you can use to use the offset when you call KafkaUtils.createDirectStream: http://geeks.aretotally.in/spark-streaming-direct-api-reusing-offset-from-zookeeper/
Related videos on Youtube
Comments
-
giaosudau over 1 year
I am using
Kafka 0.8.2
to receive data from AdExchange then I useSpark Streaming 1.4.1
to store data toMongoDB
.My problem is when I restart my
Spark Streaming
Job for instance like update new version, fix bug, add new features. It will continue read the latestoffset
ofkafka
at the time then I will lost data AdX push to kafka during restart the job.I try something like
auto.offset.reset -> smallest
but it will receive from 0 -> last then data was huge and duplicate in db.I also try to set specific
group.id
andconsumer.id
toSpark
but it the same.How to save the latest
offset
spark consumed tozookeeper
orkafka
then can read back from that to latestoffset
? -
giaosudau over 8 yearsBut How to save latest offset consumed to ZK or Kafka. I try to enable
kafkaParams ++= Map[String, String]("auto.commit.interval.ms" -> "1000") kafkaParams ++= Map[String, String]("zookeeper.sync.time.ms" -> "200") kafkaParams ++= Map[String, String]("zookeeper.session.timeout.ms" -> "400")
but it not work -
Michael Kopaniov over 8 yearsOne of the options is as I told you to use the .offsetRanges data structure. After you processed your stream in a given iteration you can do:
dStream.foreachRDD { rdd => val x = rdd.asInstanceOf[HasOffsetRanges].offsetRanges; // Do something with X (save it external FS for example) }
x will hold the last processed offset for every topic-partition combination of the RDD. If you need to have exactly once semantics, you would have to support it manually, but it is possible. -
Michael Kopaniov over 8 yearsCheckpointing is the right way to go incase you don't make any change to your StreamingContext since then you would be able to continue the processing from the right offset automatically (Spark will take care for that). If you want to add features / correct bugs (And apparently giaosudau want to do it) very often you are going to change the streaming context and therefore wan't be able to use the checkpoints directory. The last link that you provided explains it perfectly.
-
giaosudau over 8 yearsMy idea that I don't want to save in external storage because ZK and Kafka can handle this.
-
Michael Kopaniov over 8 yearsI believe they can't. Spark 1.3.1 change its approach about how to use Kafka as data source from Write Ahead Logs to direct streams. Direct stream uses Kafka SimpleConsumer to get messages from Kafka. And you can read here: cwiki.apache.org/confluence/display/KAFKA/… that one of the down sides of using SimpleConsumer is that you have to keep track yourself for the offsets that you already consumed. As long as Spark streaming uses simple consumer you won't find a solution from Kafka / ZK perspective. But Spark may add their own handling on top of Kafka.
-
giaosudau over 8 yearsso we need store offset in external storage? Is Redis suitable? Thanks.
-
Michael Kopaniov over 8 yearsAny reliable storage should do the work. I'm usually saving the data to HDFS because I think it is the most simple solution. I can't think of a reason why Redis won't be able to do the work as well.
-
Stephane Maarek over 7 years@MichaelKopaniov is there any way to checksum the context function and invalidate the previous context if the function has changed? In which case it would fall back to reading offsets from a store (fs, database)
-
Michael Kopaniov over 7 years@Stephane Few days passed since I dealt with this problem so I may be mistaken but as far as I remember in the old Spark streaming (<2.0) You either create a new StreamingContext or you read a StreamingContext that was previously defined from the checkpoint directory. You do not create a new StreamingContext for every iteration and just compare it with the context from the checkpoint directory, So if I understood your question correctly, you can't invalidate previously saved context.
-
Michael Kopaniov over 7 years@Stephane but what you can do, is to have some configurable parameter that indicates whether you want to use the streaming context from the checkpoint directory or you would like to create a new one of your own. If this parameter specifies that you want to create a new context, then you will create if from (fs, database) and override the previous context when checkpointing the data to the checkpoint directory.
-
ammills01 almost 7 yearsBoth of these links are now broken, which is why the community always suggests posting the solution as part of the answer along with the link, not just the link.
-
Reaper over 5 yearsNot the same - from the doc: "If you enable Spark checkpointing, offsets will be stored in the checkpoint. This is easy to enable, but there are drawbacks. Your output operation must be idempotent, since you will get repeated outputs; transactions are not an option. Furthermore, you cannot recover from a checkpoint if your application code has changed. For planned upgrades, you can mitigate this by running the new code at the same time as the old code (since outputs need to be idempotent anyway, they should not clash). But for unplanned failures that require code changes, you will lose data.."