Receiving Kafka Key in spring boot kafka listener
Solution 1
Please read the documentation.
...
Finally, metadata about the message is available from message headers. You can use the following header names to retrieve the headers of the message:
KafkaHeaders.RECEIVED_MESSAGE_KEY
KafkaHeaders.RECEIVED_TOPIC
KafkaHeaders.RECEIVED_PARTITION_ID
KafkaHeaders.RECEIVED_TIMESTAMP
KafkaHeaders.TIMESTAMP_TYPE
The following example shows how to use the headers:
@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
) {
...
}
The offset is also available.
Solution 2
The easiest way to get the key, value, and metadata for a Message using @KafkaListener is by using a ConsumerRecord in your KafkaListener function instead receive only the payload as a value record.
@KafkaListener(topics = "any-topic")
void listener(ConsumerRecord<String, String> record) {
System.out.println(record.key());
System.out.println(record.value());
System.out.println(record.partition());
System.out.println(record.topic());
System.out.println(record.offset());
}
Does not have beautiful annotations, but it works, Also if you want to receive records from a Kafka topic, process those records, and send them again to another Kafka topic, I would recommend you take a look at Kafka Streams API.
Related videos on Youtube
user1474111
Updated on June 04, 2022Comments
-
user1474111 almost 2 years
I am new in Spring Kafka. I have an microservice which sends message with a kafka key which is an user defined object.
1) First microservice sends message to Kafka with a key which is instance of MyKey object.
2) What I need to do is, to listen that topic and get this message with the key, and create a new key by using that Key.
Lets say that the message is send by the key which is myKey. And what I want to do in the listener is to create a new extended key as:
@KafkaListener(groupId = Bindings.CONSUMER_GROUP_DATA_CLEANUP, topics = "users") public void process( @Payload MyMessage myMessage){ MyExtended myExtendedKey= new MyExtendedKey(myKey.getX(), myKey.getY()); .... .... kafkaTemplate.send(TOPIC, myExtendedKey, message); }
I do not know how can I get the key of the message which is sent in the listener.
-
user1474111 over 4 yearsThe thing is here key is instance of Integer. In my case it is user defined class.
-
Artem Bilan over 4 yearsThen you need to take a look into
key.deserializer
Kafka Consumer property. That way you may use it like this@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) MyKey key
-
Daspuru over 2 yearsFor me this is the best answer.