KafkaConsumer is not safe for multi-threaded access


Solution 1

Spark 2.2.0 has a workaround that avoids the consumer cache: just set spark.streaming.kafka.consumer.cache.enabled to false. Take a look at this pull request.
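
For example, a minimal sketch of setting the flag before the streaming context is created (the app name and batch interval are placeholders, not from the question):

    import org.apache.spark.SparkConf;
    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    // Disable the cached Kafka consumers so each task builds its own
    // KafkaConsumer instead of sharing one instance across threads.
    SparkConf conf = new SparkConf()
            .setAppName("kafka-streaming-app") // placeholder name
            .set("spark.streaming.kafka.consumer.cache.enabled", "false");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));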

Solution 2

This is the same underlying problem as java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access: you have more than one thread using the same consumer, and KafkaConsumer does not support multi-threaded access. Also make sure you are not running with spark.speculation=true, as speculative task retries will trigger the same error.
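
As a sketch, the same SparkConf can carry both settings (spark.speculation already defaults to false; setting it explicitly only guards against a cluster-wide override):

    import org.apache.spark.SparkConf;

    // A speculative retry runs a second copy of a task, and two copies
    // reading the same Kafka partition collide on the cached consumer.
    SparkConf conf = new SparkConf()
            .set("spark.streaming.kafka.consumer.cache.enabled", "false")
            .set("spark.speculation", "false");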


Comments

  • Amanpreet Khurana less than a minute ago

    I am using the code below to read from a Kafka topic and process the data.

    // 'records' is declared as a field of the enclosing class so the
    // anonymous Function can reassign it; it unions every batch into one
    // ever-growing RDD, keeping the Kafka lineage alive across batches.
    JavaRDD<Row> records = ss.emptyDataFrame().toJavaRDD();

    JavaDStream<Row> transformedMessages = messages
            .flatMap(record -> processData(record))
            .transform(new Function<JavaRDD<Row>, JavaRDD<Row>>() {
                public JavaRDD<Row> call(JavaRDD<Row> rdd) throws Exception {
                    records = rdd.union(records);
                    return rdd;
                }
            });

    transformedMessages.foreachRDD(rdd -> {
        StructType schema = DataTypes.createStructType(fields);
        Dataset<Row> ds = ss.createDataFrame(records, schema);
        ds.createOrReplaceTempView("trades");
        System.out.println(ds.count());
        ds.show();
    });
    

    While running the code, I am getting the exception below:

    Caused by: java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access
        at org.apache.kafka.clients.consumer.KafkaConsumer.acquire(KafkaConsumer.java:1624)
        at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1197)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.seek(CachedKafkaConsumer.scala:95)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:69)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:228)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:194)
        at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
        at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
    

    Given that I only have one DStream, I am not sure why I am getting this exception. I am reading from 3 partitions of a Kafka topic, and I assume that "createDirectStream" creates 3 consumers to read the data.
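
    For reference, a sketch of how such a stream is typically created with the kafka010 API, assuming a JavaStreamingContext jssc (the topic name, group id, and bootstrap servers are placeholders, not the actual values from the question):

    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.kafka010.ConsumerStrategies;
    import org.apache.spark.streaming.kafka010.KafkaUtils;
    import org.apache.spark.streaming.kafka010.LocationStrategies;

    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092"); // placeholder
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("group.id", "trades-consumer");         // placeholder

    // One direct stream over a 3-partition topic: Spark schedules one
    // task per Kafka partition, each holding its own cached consumer.
    JavaInputDStream<ConsumerRecord<String, String>> messages =
            KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(
                            Collections.singletonList("trades"), kafkaParams));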

    Below is the code for the KafkaConsumer acquire method, which throws the exception whenever a thread other than the current owner tries to use the consumer:

    private void acquire() {
        this.ensureNotClosed();
        long threadId = Thread.currentThread().getId();
        // CAS on the owner thread id: only the first thread to arrive (or
        // the same thread re-entering) may proceed; any other thread gets
        // the ConcurrentModificationException seen in the stack trace.
        if (threadId != this.currentThread.get() && !this.currentThread.compareAndSet(-1L, threadId)) {
            throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
        } else {
            this.refcount.incrementAndGet();
        }
    }
    
  • Danny Varod over 4 years ago
    Note that this has to be set on the SparkConf (passed to the SparkSession builder).
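
    A sketch of what that looks like with the SparkSession builder (the configuration key is the one from Solution 1; the app name is a placeholder):

    import org.apache.spark.sql.SparkSession;

    // The flag must live on the conf backing the session; setting it on a
    // context that is already running has no effect.
    SparkSession ss = SparkSession.builder()
            .appName("kafka-streaming-app")
            .config("spark.streaming.kafka.consumer.cache.enabled", "false")
            .getOrCreate();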