Spring @KafkaListener and concurrency


Solution 1

Kafka doesn't work that way; you need at least as many partitions as consumers (the number of consumers is controlled by the concurrency setting of the Spring container). Consumers beyond the number of partitions sit idle.
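To make the partition/consumer rule concrete, here is a simplified model in plain Java. It is a sketch, not Kafka's real assignor: Kafka uses pluggable strategies (range, round-robin, sticky), while this hypothetical PartitionAssignmentSketch class simply deals partitions out round-robin. The property it illustrates holds for all of them: each partition goes to at most one consumer in the group, so with concurrency=10 and a one-partition topic, nine consumer threads get nothing.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of how a consumer group spreads partitions over consumers.
 * Real Kafka uses pluggable assignors (range, round-robin, sticky); this
 * sketch uses plain round-robin, but the key property is the same:
 * each partition is owned by at most one consumer in the group.
 */
final class PartitionAssignmentSketch {

    /** Returns, for each consumer index, the list of partitions it owns. */
    static List<List<Integer>> assign(int partitions, int consumers) {
        List<List<Integer>> owned = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            owned.add(new ArrayList<>());
        }
        for (int p = 0; p < partitions; p++) {
            // Each partition is handed to exactly one consumer
            owned.get(p % consumers).add(p);
        }
        return owned;
    }

    /** Counts consumers that received no partitions and will sit idle. */
    static long idleConsumers(int partitions, int consumers) {
        return assign(partitions, consumers).stream().filter(List::isEmpty).count();
    }

    public static void main(String[] args) {
        // concurrency=10 against a topic with a single partition
        System.out.println(assign(1, 10));
        System.out.println("idle: " + idleConsumers(1, 10));
    }
}
```

Running main with 1 partition and 10 consumers reports 9 idle consumers; with 10 partitions, every consumer gets exactly one.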

Also, only one consumer in a group can consume from a partition at a time, so even if you increase the number of partitions, records in the same partition behind the "stuck" consumer will not be received by other consumers.
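This second point is what the test in the question runs into. Below is a rough timing model, assuming one consumer thread handles a partition's records strictly in offset order. The hypothetical PartitionOrderingSketch class compares three records (two 20-second "hangs" followed by an instant one) in a single partition against the same records spread over separate partitions, each with its own thread.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Simplified timing model: one consumer thread handles the records of a
 * partition strictly in offset order, so each record starts only after
 * every earlier record in that partition has finished.
 */
final class PartitionOrderingSketch {

    /**
     * Given per-record processing times (seconds) for records in ONE partition,
     * returns the time at which each record finishes.
     */
    static long[] finishTimesSinglePartition(List<Long> durations) {
        long[] finish = new long[durations.size()];
        long clock = 0;
        for (int i = 0; i < durations.size(); i++) {
            // The next record cannot start until the previous one is done
            clock += durations.get(i);
            finish[i] = clock;
        }
        return finish;
    }

    /**
     * Same records, but each on its own partition with its own consumer
     * thread: nothing waits, so each finishes after its own duration.
     */
    static long[] finishTimesOnePartitionEach(List<Long> durations) {
        return durations.stream().mapToLong(Long::longValue).toArray();
    }

    public static void main(String[] args) {
        // Versions 1 and 2 "hang" for 20 s, version 3 is instant (as in the question)
        List<Long> durations = List.of(20L, 20L, 0L);
        System.out.println(Arrays.toString(finishTimesSinglePartition(durations)));
        System.out.println(Arrays.toString(finishTimesOnePartitionEach(durations)));
    }
}
```

In the single-partition case the third record finishes at 40 seconds, after both sleeps; in the one-partition-each case it finishes immediately. The second case is optimistic: the producer chooses the partition (by default from a hash of the key, when a key is set), so records can still end up in the same partition even when the topic has several.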

Solution 2

If you want failover with Kafka, you must run more instances of your application.

Example: you have a topic named "test" with 1 partition, and you create 2 instances of your app with the same Kafka consumer group. One instance processes your data; the other waits and starts processing messages if the first instance crashes. The same applies if you have N partitions and N + 1, N + 2, or N + 3 instances of your application. In this setup, each instance has only one consumer thread.
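The failover setup described above can be modelled the same way. In this sketch (the FailoverSketch class and the instance names app-1 and app-2 are hypothetical), partitions are spread only over the live instances of the group, each assumed to run a single consumer thread; when the active instance disappears, the group is rebalanced and the standby picks up the partition.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Simplified failover model: several application instances share one group
 * id, each running a single consumer thread. Partitions are spread over the
 * live instances only; extra instances stay idle as standbys until a
 * rebalance after a crash hands them a partition.
 */
final class FailoverSketch {

    /**
     * Maps each partition to the live instance that owns it (round-robin model).
     * Assumes at least one live instance.
     */
    static Map<Integer, String> assign(int partitions, List<String> liveInstances) {
        Map<Integer, String> owner = new LinkedHashMap<>();
        for (int p = 0; p < partitions; p++) {
            owner.put(p, liveInstances.get(p % liveInstances.size()));
        }
        return owner;
    }

    public static void main(String[] args) {
        List<String> live = new ArrayList<>(List.of("app-1", "app-2"));

        // Topic "test" with 1 partition: app-1 does the work, app-2 waits as a standby
        System.out.println(assign(1, live));

        // app-1 crashes; after the rebalance, app-2 takes over partition 0
        live.remove("app-1");
        System.out.println(assign(1, live));
    }
}
```

In a real cluster the rebalance is driven by the group coordinator, for example once the crashed instance's session times out, so the takeover is not instantaneous.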

For more information, look up Kafka consumer groups.

Author: Sviatlana

Java developer.

Updated on June 04, 2022

Comments

  • Sviatlana

    I am working with Spring Boot and Spring's @KafkaListener. The behavior I expect is that my Kafka listener reads messages in 10 threads, so that if one of the threads hangs, the other threads continue reading and handling messages.

    I defined the following bean:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
            ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
            ConsumerFactory<Object, Object> kafkaConsumerFactory)
    {
    
        ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
        configurer.configure(factory, kafkaConsumerFactory);
        factory.getContainerProperties().setMissingTopicsFatal(false);
        factory.getContainerProperties().setCommitLogLevel(LogIfLevelEnabled.Level.INFO);
        return factory;
    }
    

    And the Spring Boot configuration:

    spring.kafka.listener.concurrency=10
    

    All the configuration seems to take effect: I can see my 10 threads in JMX.


    But then I ran the following test:

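    @KafkaListener(topics = { "${topic.name}" },
            clientIdPrefix = "${kafka.client.id.prefix}",
            idIsGroup = false,
            id = "${kafka.listener.name}",
            containerFactory = "kafkaListenerContainerFactory")
    public void listen(ConsumerRecord<String, String> record)
    {
        // ConsumerRecord has no getVersion() method; as an assumption, the
        // version is read here from the String payload.
        int version = Integer.parseInt(record.value());
        if (version < 3) {
            try {
                // Simulate a "hung" handler for versions 1 and 2
                Thread.sleep(20000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing it
                Thread.currentThread().interrupt();
            }
        }
        else {
            System.out.println("It works!");
        }
    }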
    

    If the version is less than 3, the listener hangs; otherwise it does its work. I send 3 messages with versions 1, 2, and 3. I expect the messages with versions 1 and 2 to hang, but the message with version 3 to be processed as soon as it reaches the listener. Unfortunately, the message with version 3 waits for messages 1 and 2 before its processing starts.

    Maybe my expectations are wrong and this is the correct behavior of the Kafka listener. Please help me understand Kafka concurrency: why does it act like that?