Spring Kafka: multiple consumers for a single topic consuming different messages


That's not how Apache Kafka works. The idea is that records from the same partition are always processed in a single thread. The factory.setConcurrency(5); setting is therefore tied to how many partitions the topic has. If the topic has only one partition, this property brings no benefit. If the topic has 10 partitions, Spring Kafka spawns 5 threads and each of them handles 2 partitions.

I would say this is pretty clear in the Reference Manual:

If, say, 6 TopicPartitions are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
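The arithmetic in that quote can be sketched in plain Java. This is only an illustration of the numbers, not Spring Kafka's actual assignment code; the class name PartitionSpread and the method spread are made up for this example, and the round-robin loop is an assumption that happens to reproduce the counts quoted above.

```java
import java.util.Arrays;

// Hypothetical helper: illustrates the partition counts from the quote,
// it is NOT the code Spring Kafka uses internally.
final class PartitionSpread {

    // Returns how many partitions each container would own.
    // Assumes concurrency is at least 1.
    static int[] spread(int partitionCount, int concurrency) {
        // More containers than partitions is pointless, so cap the container count.
        int containers = Math.min(concurrency, partitionCount);
        int[] perContainer = new int[containers];
        // Hand out partitions one at a time, round-robin.
        for (int p = 0; p < partitionCount; p++) {
            perContainer[p % containers]++;
        }
        return perContainer;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(spread(6, 3)));  // [2, 2, 2]
        System.out.println(Arrays.toString(spread(5, 3)));  // [2, 2, 1]
        System.out.println(Arrays.toString(spread(1, 5)));  // [1]
        System.out.println(Arrays.toString(spread(10, 5))); // [2, 2, 2, 2, 2]
    }
}
```

The third call is the situation in the question: with a single partition, a concurrency of 5 still leaves only one container doing the work.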

So, if you want the kind of concurrency you describe, you do have to create 5 partitions in your topic. Only then will you be able to process records from the same topic in parallel.
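One possible way to get a topic with 5 partitions is to declare it as a NewTopic bean, sketched below. This assumes a Spring Boot application (where a KafkaAdmin is auto-configured and creates declared topics on startup), a spring-kafka version that ships TopicBuilder, and that the topic does not exist yet. The class name PostTopicConfig and the replica count of 1 are placeholders chosen for the example, not something from the question.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

// Hypothetical configuration class; the name is illustrative only.
@Configuration
public class PostTopicConfig {

    // Declares the topic from the question with 5 partitions, so that
    // factory.setConcurrency(5) has one partition per listener thread.
    @Bean
    public NewTopic postTopic(@Value("${kafka.topic.post.send}") String topicName) {
        return TopicBuilder.name(topicName)
                .partitions(5)
                .replicas(1) // assumption: single-broker setup, adjust for your cluster
                .build();
    }
}
```

With the topic partitioned this way, factory.setConcurrency(5) in postKafkaListenerContainerFactory gives each of the 5 listener threads its own partition.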

Author: alexanoid

Updated on June 04, 2022

Comments

  • alexanoid
    alexanoid almost 2 years

    In my Spring Boot Kafka application, I have the following consumer configuration:

    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }
    
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
    
        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
    
        return factory;
    }
    

    and the consumer:

    @KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
    public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {
    
        // do some logic
    
        ack.acknowledge();
    }
    

    If I understood correctly, right now I have a single instance of my consumer. I'd like to increase the number of post consumers, say to 5 consumers that consume different (not the same) messages from ${kafka.topic.post.send}, in order to speed up message consumption.

    Is it as simple as adding factory.setConcurrency(5); to my postKafkaListenerContainerFactory(), for example:

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
        kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);
    
        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
        factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
        factory.setConcurrency(5);
    
        return factory;
    }
    

    or do I need to do some extra work in order to achieve it?