How to get an acknowledgement from the Kafka broker when a producer sends a message?


Solution 1

I'm not 100% sure which client versions work with which broker versions in Kafka. I currently use 0.8.2. I know 0.9 introduced some breaking changes, but I couldn't tell you for sure what does or doesn't work now.

One very strong recommendation: use the Kafka client version that corresponds to your broker version. If you're running broker 0.8.2, use Kafka client 0.8.2 as well.

You didn't show how you're using this, so I'm guessing somewhat in the dark. I've implemented the callback feature in Kafka 0.8.2 using the following producer method. Its signature is:

public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record, Callback callback)

When I call that method, I pass in an anonymous class that overrides onCompletion:

KafkaProducer<String, String> prod = new KafkaProducer<String, String>(props);
ProducerRecord<String, String> record = //data to send to kafka
prod.send(record, new Callback() {
  @Override
  public void onCompletion(RecordMetadata metadata, Exception e) {
    if (e != null) {
      e.printStackTrace();
    } else {
      //implement logic here, or call another method to process metadata
      System.out.println("Callback");
    }
  }
}); 

I'm assuming there's a way to also do it as you've done it. But you'd have to provide the code showing how you're actually sending records to Kafka. Other than that I'm just guessing.
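
What a successful callback actually tells you also depends on the producer's acks setting, which the configuration in the question does not set explicitly. The sketch below builds producer properties that ask for acknowledgement from all in-sync replicas. The class name AckConfig, the method name producerProps, and the retry count of 3 are made up for illustration; the property keys are the standard producer configuration names. It uses only java.util.Properties so it can be tried without a broker.

```java
import java.util.Properties;

public class AckConfig {

    /** Builds producer properties that request the strongest broker acknowledgement. */
    public static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // "0" = do not wait for the broker, "1" = leader only, "all" = all in-sync replicas
        props.put("acks", "all");
        // Illustrative value (an assumption): retry transient send failures a few times
        props.put("retries", "3");
        // Serializers given as class names so this sketch compiles without the Kafka jar
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092");
        System.out.println("acks = " + props.getProperty("acks"));
    }
}
```

The resulting Properties object would be passed to the KafkaProducer constructor in place of the props used above. With acks set to "0" the producer does not wait for any broker response, so the metadata handed to the callback carries no real offset.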

Solution 2

With Kafka 0.9 there is a simple way to get information back from the broker after publishing a message with KafkaProducer. send() returns a Future; calling get() on it blocks until the broker responds and returns a RecordMetadata object, from which you can read the offset, partition, and topic. A code snippet as an example:

RecordMetadata m = kafkaProducer.send(new ProducerRecord<byte[], byte[]>(
        topic, key.getBytes("UTF-8"), message.getBytes("UTF-8"))).get();
System.out.println("Message produced, offset: " + m.offset());
System.out.println("Message produced, partition : " + m.partition());
System.out.println("Message produced, topic: " + m.topic());
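
A bare get() waits indefinitely and throws checked exceptions, so in practice it usually needs a timeout and some error handling. Here is a minimal sketch of that, written against the plain java.util.concurrent.Future interface that send() returns. The names AckWaiter, Result, and awaitAck are hypothetical, and a CompletableFuture stands in for the producer so the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class AckWaiter {

    /** Outcome of waiting for an acknowledgement: a value on success, an error otherwise. */
    public static final class Result<T> {
        public final T value;         // what get() returned, set only on success
        public final Throwable error; // null on success, the failure cause otherwise

        Result(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        public boolean isAcked() {
            return error == null;
        }
    }

    /** Waits up to timeoutMs for the future to complete and reports what happened. */
    public static <T> Result<T> awaitAck(Future<T> future, long timeoutMs) {
        try {
            return new Result<>(future.get(timeoutMs, TimeUnit.MILLISECONDS), null);
        } catch (ExecutionException e) {
            // The asynchronous operation failed; the real reason is the cause
            return new Result<>(null, e.getCause());
        } catch (TimeoutException e) {
            // Nothing came back within the timeout
            return new Result<>(null, e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return new Result<>(null, e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for producer.send(record): a future that has already succeeded
        Result<String> ok = awaitAck(CompletableFuture.completedFuture("metadata"), 1000);
        System.out.println("acked: " + ok.isAcked());

        // Stand-in for a failed send
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("broker unavailable"));
        Result<String> bad = awaitAck(failed, 1000);
        System.out.println("acked: " + bad.isAcked() + ", error: " + bad.error);
    }
}
```

With a real producer the call would look something like awaitAck(kafkaProducer.send(record), 5000), giving a Result that wraps RecordMetadata. Note that blocking on every send gives up the batching that the asynchronous callback approach keeps.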

Solution 3

Close the producer -- once you are done publishing all your messages -- like this:

producer.close();

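send() is asynchronous and only hands the record to a background sender, so a program that exits right after sending may never see its callback fire. close() blocks until all previously sent requests complete, which ensures that the following method from the Callback is always called: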

onCompletion(RecordMetadata metadata, Exception exception)

NB: I have tested this by adding that line to this sample Producer class and it works.

Author by

usman

Associate Technical Manager @ Soliton Technologies. Always working in Java Web Technologies, hanging around with various frameworks. Currently playing with Apache Kafka, Apache Camel for our new projects.

Updated on July 09, 2022

Comments

  • usman
    usman almost 2 years

    I would like to get a response from the broker when I produce a message. I tried the Callback mechanism (implementing Callback and passing it to KafkaProducer.send), but it did not work: onCompletion is never called.

    When I shut down the Kafka server and then try to produce a message, the callback method is called.

    Is there any other way to get acknowledgment?

    @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            long elapsedTime = System.currentTimeMillis() - startTime;
            System.out.println("Called Callback method");
            if (metadata != null) {
                System.out.println("message(" + key + ", " + message
                        + ") sent to partition(" + metadata.partition() + "), "
                        + "offset(" + metadata.offset() + ") in " + elapsedTime
                        + " ms");
            } else {
                exception.printStackTrace();
            }
    
        }
    
    props.put("bootstrap.servers", "localhost:9092");
    props.put("client.id", "mytopic");
    props.put("key.serializer", org.apache.kafka.common.serialization.StringSerializer.class);
    props.put("value.serializer", org.apache.kafka.common.serialization.ByteArraySerializer.class);
    
    KafkaProducer<String, byte[]> producer = new KafkaProducer<String,byte[]>(props);
    long runtime = new Date().getTime(); 
    String ip = "192.168.2."+ rnd.nextInt(255); 
    String msg = runtime + ".www.ppop.com," + ip;
    producer.send(new ProducerRecord<String, byte[]>("mytopic", msg.getBytes()), new TransCallBack(Calendar.getInstance().getTimeInMillis(), key, msg));
    

    I am using kafka-client API 0.9.1 with broker version 0.8.2.