Spring Kafka: multiple consumers for a single topic, each consuming different messages
That's not how Apache Kafka works. The idea is that records in the same partition are always processed in a single thread. The factory.setConcurrency(5); setting is tied to how many partitions the topic has. If the topic has only one partition, this property brings no benefit. If the topic has 10 partitions, Spring Kafka spawns 5 threads and each of them handles 2 partitions.
I would say this is pretty clear in the Reference Manual:
If, say, 6 TopicPartitions are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
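The arithmetic in that passage can be checked with a small standalone sketch. It only reproduces the numbers from the quote and does not call Spring Kafka or reflect how a broker actually assigns partitions. The class name PartitionSpread and the method spread are made up for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSpread {

    /** Returns how many partitions each container would handle (hypothetical helper). */
    static List<Integer> spread(int partitions, int concurrency) {
        // Concurrency above the partition count is adjusted down.
        int containers = Math.min(concurrency, partitions);
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < containers; i++) {
            // The first (partitions % containers) containers take one extra partition.
            int extra = i < partitions % containers ? 1 : 0;
            counts.add(partitions / containers + extra);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(spread(6, 3));   // 6 partitions, concurrency 3
        System.out.println(spread(5, 3));   // 5 partitions, concurrency 3
        System.out.println(spread(10, 5));  // 10 partitions, concurrency 5
        System.out.println(spread(1, 5));   // 1 partition, concurrency 5
    }
}
```

For a topic with a single partition, the last call shows that only one container ends up with work regardless of the requested concurrency.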
So, if you want the kind of concurrency you describe, you do have to create 5 partitions in your topic. Only then will you be able to process records from the same topic in parallel.
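One possible way to declare such a topic from the application itself is a NewTopic bean. This is a minimal sketch, not part of the original answer. It assumes a Spring Kafka version that provides TopicBuilder, a KafkaAdmin bean in the context (Spring Boot normally registers one), and that the topic does not exist yet. The topic name "post.send", the class name PostTopicConfig, and the replica count are placeholders.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class PostTopicConfig {

    // "post.send" is a placeholder; the question resolves the real name
    // from the ${kafka.topic.post.send} property.
    @Bean
    public NewTopic postSendTopic() {
        return TopicBuilder.name("post.send")
                .partitions(5)  // one partition per listener thread when concurrency is 5
                .replicas(1)    // assumption: single-broker development setup
                .build();
    }
}
```

If the topic already exists with fewer partitions, the partition count has to be raised on the broker side; whether this bean does that depends on the Spring Kafka version in use.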
alexanoid
Updated on June 04, 2022
alexanoid almost 2 years
In my Spring Boot Kafka application, I have the following consumer configuration:
@Bean
public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
    return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(),
            new StringDeserializer(), new JsonDeserializer<>(Post.class));
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
    return factory;
}
and the consumer:
@KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
public void sendPost(ConsumerRecord<String, Post> consumerRecord, Acknowledgment ack) {
    // do some logic
    ack.acknowledge();
}
If I understand correctly, right now I have a single instance of my consumer. I'd like to increase the number of post consumers, say to 5 consumers that consume different (not the same) messages from ${kafka.topic.post.send}, in order to speed up message consumption. Is it as simple as adding factory.setConcurrency(5); to my postKafkaListenerContainerFactory(), for example:
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaConsumerMaxPollIntervalMs);
    kafkaProperties.getProperties().put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaConsumerMaxPollRecords);

    ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
    factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
    factory.setConcurrency(5);
    return factory;
}
or do I need to do some extra work in order to achieve it?