Receiving the Kafka key in a Spring Boot Kafka listener

Solution 1

Please read the documentation.

...

Finally, metadata about the message is available from message headers. You can use the following header names to retrieve the headers of the message:

KafkaHeaders.RECEIVED_MESSAGE_KEY

KafkaHeaders.RECEIVED_TOPIC

KafkaHeaders.RECEIVED_PARTITION_ID

KafkaHeaders.RECEIVED_TIMESTAMP

KafkaHeaders.TIMESTAMP_TYPE

The following example shows how to use the headers:

@KafkaListener(id = "qux", topicPattern = "myTopic1")
public void listen(@Payload String foo,
        @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) Integer key,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
        @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long ts
        ) {
    ...
}

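The offset is also available, through KafkaHeaders.OFFSET. Note that in Spring for Apache Kafka 3.0 and later, RECEIVED_MESSAGE_KEY and RECEIVED_PARTITION_ID are named RECEIVED_KEY and RECEIVED_PARTITION.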

Solution 2

The easiest way to get the key, value, and metadata of a message with @KafkaListener is to accept a ConsumerRecord in the listener method instead of receiving only the payload.

@KafkaListener(topics = "any-topic")
void listener(ConsumerRecord<String, String> record) {
    System.out.println(record.key());
    System.out.println(record.value());
    System.out.println(record.partition());
    System.out.println(record.topic());
    System.out.println(record.offset());
}

It does not have pretty annotations, but it works. Also, if you want to receive records from a Kafka topic, process them, and send them on to another Kafka topic, I would recommend taking a look at the Kafka Streams API.
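
For the use case in the question (building a new key from the received one), the key logic can be kept in a plain function and fed with record.key(). The following is only a sketch: MyKey and MyExtendedKey reuse the names from the question, but their int fields x and y and the extend helper are assumptions made for illustration.

```java
// Sketch only: MyKey and MyExtendedKey mirror the names used in the question.
// The int fields x and y and the extend(...) helper are hypothetical.
final class ExtendedKeySketch {

    /** Stand-in for the user-defined key sent by the first microservice. */
    static final class MyKey {
        private final int x;
        private final int y;

        MyKey(int x, int y) {
            this.x = x;
            this.y = y;
        }

        int getX() { return x; }
        int getY() { return y; }
    }

    /** Stand-in for the new key that is built from the received key. */
    static final class MyExtendedKey {
        private final int x;
        private final int y;

        MyExtendedKey(int x, int y) {
            this.x = x;
            this.y = y;
        }

        int getX() { return x; }
        int getY() { return y; }
    }

    /** Builds the outgoing key from the key of the received record. */
    static MyExtendedKey extend(MyKey received) {
        return new MyExtendedKey(received.getX(), received.getY());
    }

    private ExtendedKeySketch() {
        // Not meant to be instantiated; it only groups the sketch types.
    }
}
```

In a listener declared as void listener(ConsumerRecord<MyKey, MyMessage> record), the call would be extend(record.key()), assuming the consumer is configured with a key deserializer that produces MyKey.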

Author: user1474111

Updated on June 04, 2022

Comments

  • user1474111
    user1474111 almost 2 years

    I am new to Spring Kafka. I have a microservice which sends messages with a Kafka key that is a user-defined object.

    1) The first microservice sends a message to Kafka with a key which is an instance of MyKey.

    2) What I need to do is listen to that topic, get the message together with its key, and create a new key from that key.

    Let's say the message is sent with the key myKey. What I want to do in the listener is create a new extended key like this:

         @KafkaListener(groupId = Bindings.CONSUMER_GROUP_DATA_CLEANUP, topics = "users")
         public void process(@Payload MyMessage myMessage) {

            // myKey is not available here: it is the record key this listener needs to receive
            MyExtendedKey myExtendedKey = new MyExtendedKey(myKey.getX(), myKey.getY());
            ....
            ....
            kafkaTemplate.send(TOPIC, myExtendedKey, message);
         }
    

    I do not know how to get the key of the received message in the listener.

  • user1474111
    user1474111 over 4 years
    The thing is, here the key is an instance of Integer. In my case it is a user-defined class.
  • Artem Bilan
    Artem Bilan over 4 years
    Then you need to take a look at the key.deserializer Kafka consumer property. That way you can use it like this: @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) MyKey key
  • Daspuru
    Daspuru over 2 years
    For me this is the best answer.
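
To illustrate the key.deserializer comment above: the consumer's key deserializer is what turns the raw key bytes into a MyKey before the listener sees it. The sketch below is plain Java so that it runs without Kafka on the classpath. In a real project the class would implement org.apache.kafka.common.serialization.Deserializer<MyKey>, whose deserialize(String topic, byte[] data) method has the same shape, and it would be registered through the key.deserializer consumer property. The "x:y" UTF-8 wire format and the int fields are assumptions; the real format has to match whatever serializer the producing service uses.

```java
import java.nio.charset.StandardCharsets;

// Sketch only: the "x:y" text format and the int fields are hypothetical.
// A real implementation would implement Kafka's Deserializer<MyKey> interface.
final class MyKeyDeserializerSketch {

    /** Stand-in for the user-defined key class from the question. */
    static final class MyKey {
        private final int x;
        private final int y;

        MyKey(int x, int y) {
            this.x = x;
            this.y = y;
        }

        int getX() { return x; }
        int getY() { return y; }
    }

    /** Same parameter list as Deserializer#deserialize(String, byte[]). */
    MyKey deserialize(String topic, byte[] data) {
        if (data == null) {
            // Records without a key arrive as null; pass that through.
            return null;
        }
        String text = new String(data, StandardCharsets.UTF_8);
        String[] parts = text.split(":", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected \"x:y\" but got: " + text);
        }
        return new MyKey(Integer.parseInt(parts[0].trim()),
                         Integer.parseInt(parts[1].trim()));
    }
}
```

With such a deserializer configured, the listener parameter can be typed as MyKey, either through the @Header annotation or as the key type of a ConsumerRecord.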