How to use a multi-threaded consumer in Kafka 0.9.0?


Solution 1

The Kafka consumer is not thread-safe. As you pointed out in your question, the documentation states:

A simple option is to give each thread its own consumer instance

But in your code, the same consumer instance is wrapped by two different KafkaConsumerRunner instances, so multiple threads access the same consumer. The Kafka documentation clearly states:

The Kafka consumer is NOT thread-safe. All network I/O happens in the thread of the application making the call. It is the responsibility of the user to ensure that multi-threaded access is properly synchronized. Un-synchronized access will result in ConcurrentModificationException.

That's exactly the exception you received.
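To see why sharing one instance fails, here is a minimal JDK-only sketch of the kind of ownership check the quote describes. `SingleThreadedClient` and `SharedClientDemo` are hypothetical stand-ins, not Kafka classes, and KafkaConsumer's real internal check may differ in detail (this sketch, for instance, does not allow re-entrant calls). One thread enters `poll()` and stays there while a second thread calls `poll()` on the same instance, and that second call is rejected.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a single-threaded client such as KafkaConsumer:
// it remembers which thread is inside a call and rejects any other thread
// that tries to enter at the same time (simplified: no re-entrancy support).
class SingleThreadedClient {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    private void acquire() {
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            throw new ConcurrentModificationException(
                    "client is not safe for multi-threaded access");
        }
    }

    private void release() {
        owner.set(null);
    }

    // Simulated poll(): signals 'entered' once this thread owns the client,
    // then stays inside until 'mayLeave' is released.
    void poll(CountDownLatch entered, CountDownLatch mayLeave) throws InterruptedException {
        acquire();
        try {
            entered.countDown();
            mayLeave.await();
        } finally {
            release();
        }
    }
}

class SharedClientDemo {
    // Two threads share one client; returns true if the overlapping call was rejected.
    static boolean overlappingCallIsRejected() {
        SingleThreadedClient shared = new SingleThreadedClient();
        CountDownLatch entered = new CountDownLatch(1);
        CountDownLatch mayLeave = new CountDownLatch(1);

        // Thread 1 enters poll() and stays there until mayLeave is released.
        Thread first = new Thread(() -> {
            try {
                shared.poll(entered, mayLeave);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        first.start();

        boolean rejected = false;
        try {
            entered.await(); // thread 1 now owns the client
            // The calling thread tries to use the same instance concurrently.
            shared.poll(new CountDownLatch(1), new CountDownLatch(0));
        } catch (ConcurrentModificationException e) {
            rejected = true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            mayLeave.countDown(); // let thread 1 finish
        }

        try {
            first.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // prints "second caller rejected: true"
        System.out.println("second caller rejected: " + overlappingCallIsRejected());
    }
}
```

The latches make the overlap deterministic; with two real polling threads the collision depends on timing, which is why such bugs can appear intermittently.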

Solution 2

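The exception is thrown by your call to subscribe, this.consumer.subscribe(topicName). That call runs outside the synchronized block, so it can overlap with another thread's poll() on the same consumer.

Move that call into a synchronized block as well: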

@Override
public void run() {
    try {
        synchronized (consumer) {
            this.consumer.subscribe(topicName);
        }
        ConsumerRecords<String, String> records;
        while (!closed.get()) {
            synchronized (consumer) {
                records = consumer.poll(100);
            }
            for (ConsumerRecord<String, String> tmp : records) {
                System.out.println(tmp.value());
            }
        }
    } catch (WakeupException e) {
        // Ignore the exception only if we are closing
        if (!closed.get()) throw e;
    }
}
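A JDK-only sketch of what this change buys, assuming the hypothetical `GuardedClient` below behaves like a consumer that rejects overlapping calls: when every call on the shared instance goes through `synchronized (client)`, no call is rejected. Two caveats apply. The lock serializes the threads, so they take turns polling rather than consuming in parallel. And with a plain KafkaConsumer, subscribe() is not incremental: each call replaces the previous subscription, so if CloudKafkaConsumer.subscribe forwards to it, two runners sharing one consumer would end up subscribed only to whichever topic was subscribed last.

```java
import java.util.ConcurrentModificationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical single-threaded client: an overlapping call from a second
// thread fails with ConcurrentModificationException.
class GuardedClient {
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    void poll() {
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        try {
            Thread.yield(); // widen the window in which another thread could collide
        } finally {
            owner.set(null);
        }
    }
}

class SynchronizedSharingDemo {
    static final class Result {
        final int completed;
        final int rejected;

        Result(int completed, int rejected) {
            this.completed = completed;
            this.rejected = rejected;
        }
    }

    // Runs 'threads' workers that all share ONE client and wrap every call
    // in synchronized (client), as the answer above does.
    static Result run(int threads, int pollsPerThread) {
        GuardedClient client = new GuardedClient();
        AtomicInteger completed = new AtomicInteger();
        AtomicInteger rejected = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                for (int i = 0; i < pollsPerThread; i++) {
                    try {
                        synchronized (client) { // only one thread inside at a time
                            client.poll();
                        }
                        completed.incrementAndGet();
                    } catch (ConcurrentModificationException e) {
                        rejected.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new Result(completed.get(), rejected.get());
    }

    public static void main(String[] args) {
        Result r = run(4, 1_000);
        System.out.println("completed=" + r.completed + " rejected=" + r.rejected);
    }
}
```

Removing the `synchronized (client)` wrapper makes rejections possible again, although how many occur depends on thread scheduling.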

Solution 3

This may not be your case, but if you are merging the processing of data from several topics, you can read from multiple topics with a single consumer. If not, it is preferable to create separate jobs, each consuming its own topic.
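For the second option (one job per topic), here is a JDK-only sketch of a runner that builds its own client inside `run()`, so each instance stays confined to the thread that created it and needs no locking. `TopicClient`, `PerTopicRunner`, `PerTopicDemo`, and the factory parameter are hypothetical; in the question's code the factory would roughly correspond to running the `KafkaConsumerBuilder` chain inside each runner instead of once in `main`.

```java
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Hypothetical stand-in for a single-threaded consumer bound to one topic.
class TopicClient {
    private final String topic;
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    TopicClient(String topic) {
        this.topic = topic;
    }

    String poll() {
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            throw new ConcurrentModificationException("not safe for multi-threaded access");
        }
        try {
            return "record from " + topic;
        } finally {
            owner.set(null);
        }
    }
}

// One runner per topic. The client is created inside run(), so only the
// thread executing this runner ever touches it.
class PerTopicRunner implements Runnable {
    private final String topic;
    private final Function<String, TopicClient> factory;
    private final Map<String, Integer> counts;
    private final int polls;

    PerTopicRunner(String topic, Function<String, TopicClient> factory,
                   Map<String, Integer> counts, int polls) {
        this.topic = topic;
        this.factory = factory;
        this.counts = counts;
        this.polls = polls;
    }

    @Override
    public void run() {
        TopicClient client = factory.apply(topic); // confined to this thread
        for (int i = 0; i < polls; i++) {
            client.poll();
            counts.merge(topic, 1, Integer::sum);
        }
    }
}

class PerTopicDemo {
    static Map<String, Integer> run(List<String> topics, int polls) {
        Map<String, Integer> counts = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(topics.size());
        for (String topic : topics) {
            pool.execute(new PerTopicRunner(topic, TopicClient::new, counts, polls));
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = run(Arrays.asList("log", "log.info"), 100);
        System.out.println("log=" + counts.get("log") + " log.info=" + counts.get("log.info"));
    }
}
```

For the first option with the real client, a single KafkaConsumer can be subscribed to both topics in one call, for example `consumer.subscribe(Arrays.asList("log", "log.info"))`, and polled from one thread.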

Author: Acceml

A student at Harbin Institute of Technology, majoring in computer science.

Updated on July 08, 2022

Comments

  • Acceml

    The Kafka documentation describes the following approach:

    One Consumer Per Thread: A simple option is to give each thread its own consumer instance.

    My code:

    public class KafkaConsumerRunner implements Runnable {
    
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final CloudKafkaConsumer consumer;
        private final String topicName;
    
        public KafkaConsumerRunner(CloudKafkaConsumer consumer, String topicName) {
            this.consumer = consumer;
            this.topicName = topicName;
        }
    
        @Override
        public void run() {
            try {
                this.consumer.subscribe(topicName);
                ConsumerRecords<String, String> records;
                while (!closed.get()) {
                    synchronized (consumer) {
                        records = consumer.poll(100);
                    }
                    for (ConsumerRecord<String, String> tmp : records) {
                        System.out.println(tmp.value());
                    }
                }
            } catch (WakeupException e) {
                // Ignore exception if closing
                System.out.println(e);
                //if (!closed.get()) throw e;
            }
        }
    
        // Shutdown hook which can be called from a separate thread
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    
        public static void main(String[] args) {
            CloudKafkaConsumer kafkaConsumer = KafkaConsumerBuilder.builder()
                    .withBootstrapServers("172.31.1.159:9092")
                    .withGroupId("test")
                    .build();
            ExecutorService executorService = Executors.newFixedThreadPool(5);
            executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log"));
            executorService.execute(new KafkaConsumerRunner(kafkaConsumer, "log.info"));
            executorService.shutdown();
        }
    }
    

    But it doesn't work; it throws this exception:

    java.util.ConcurrentModificationException: KafkaConsumer is not safe for multi-threaded access

    Furthermore, I read the source of Flink (an open-source platform for distributed stream and batch data processing). Flink's multi-threaded consumer code is similar to mine:

    long pollTimeout = Long.parseLong(flinkKafkaConsumer.properties.getProperty(KEY_POLL_TIMEOUT, Long.toString(DEFAULT_POLL_TIMEOUT)));
    pollLoop: while (running) {
        ConsumerRecords<byte[], byte[]> records;
        //noinspection SynchronizeOnNonFinalField
        synchronized (flinkKafkaConsumer.consumer) {
            try {
                records = flinkKafkaConsumer.consumer.poll(pollTimeout);
            } catch (WakeupException we) {
                if (running) {
                    throw we;
                }
                // leave loop
                continue;
            }
        }
        // ... (rest of the loop body not shown in this excerpt)
    }

    (Excerpt from Flink's multi-threaded consumer code.)

    What's wrong?

  • Prasath
    Working for me.