Sending synchronous messages in Kafka?

Solution 1

The producer API returns a Future from send. You can call Future#get to block until the sending has completed.

See this example from the Javadocs:

If you want to simulate a simple blocking call you can call the get() method immediately:

 byte[] key = "key".getBytes();
 byte[] value = "value".getBytes();
 ProducerRecord<byte[],byte[]> record =
     new ProducerRecord<byte[],byte[]>("my-topic", key, value);
 producer.send(record).get();
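
Future#get throws checked exceptions, so a blocking send usually ends up wrapped in a small helper. The following is a minimal sketch of such a helper, not part of the Kafka API: the names SyncSend and await are hypothetical, and a CompletableFuture stands in for the Future that producer.send(record) returns so the example runs without a broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical helper: blocks on any Future and surfaces failures unchecked.
class SyncSend {
    static <T> T await(Future<T> pending) {
        try {
            // Blocks the calling thread until the future completes.
            return pending.get();
        } catch (ExecutionException e) {
            // The real failure is the cause, not the wrapper.
            throw new IllegalStateException("send failed", e.getCause());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe it.
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for send", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in for: Future<RecordMetadata> f = producer.send(record);
        Future<String> ack = CompletableFuture.completedFuture("acknowledged");
        System.out.println(await(ack));
    }
}
```

With a real producer the call would look like SyncSend.await(producer.send(record)), assuming the helper is on the classpath.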

Solution 2

As Thilo suggested, you can call Future#get to block until the send has completed. However, this can cause performance issues: when linger.ms is greater than zero, the producer holds a batch back until it contains batch.size bytes or until linger.ms milliseconds have elapsed. (max.block.ms is a different setting: it bounds how long send() itself may block, for example when the buffer of size buffer.memory is full.)

If you have a limited number of threads pushing to Kafka, each blocking call may have to wait up to linger.ms for its message to be sent. So in some cases, you will prefer using:

// send message to producer queue
Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, key, message));
// flush producer queue to spare queuing time
producer.flush();
// throw error when kafka is unreachable
future.get(10, TimeUnit.SECONDS);
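
The timed get above can end in three ways: the send is acknowledged, it fails, or the timeout expires first. As a rough sketch of how a caller might tell these apart, the hypothetical BoundedWait class below maps them to an enum. It works on any Future, and a CompletableFuture that never completes stands in for a send to an unreachable broker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper: classifies the result of a bounded wait on a Future.
class BoundedWait {
    enum Outcome { ACKED, FAILED, TIMED_OUT }

    static Outcome waitFor(Future<?> pending, long timeout, TimeUnit unit) {
        try {
            pending.get(timeout, unit);
            return Outcome.ACKED;
        } catch (TimeoutException e) {
            // No result arrived within the timeout.
            return Outcome.TIMED_OUT;
        } catch (ExecutionException e) {
            // The operation completed, but with an error.
            return Outcome.FAILED;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Outcome.FAILED;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a send that is never acknowledged.
        Future<String> stuck = new CompletableFuture<>();
        System.out.println(waitFor(stuck, 100, TimeUnit.MILLISECONDS));
    }
}
```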

Solution 3

Thilo's answer is the way to go. In general, setting max.in.flight.requests.per.connection = 1, as you suggest, is a way to keep retries enabled without losing message ordering. It is not really meant for making the producer synchronous.
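
For illustration, a producer configuration along those lines might be sketched as follows. The class name OrderedRetryConfig, the retries value of 3, and the acks setting are assumptions chosen for the example; only the property keys and the serializer class name come from Kafka itself.

```java
import java.util.Properties;

// Hypothetical config builder: retries stay enabled, ordering is preserved.
class OrderedRetryConfig {
    static Properties build(String bootstrapServers) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        // Assumed values for the example: wait for all replicas, retry a few times.
        props.setProperty("acks", "all");
        props.setProperty("retries", "3");
        // At most one unacknowledged request per connection, so a retried
        // batch cannot be overtaken by a later one.
        props.setProperty("max.in.flight.requests.per.connection", "1");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build("localhost:9092"));
    }
}
```

The resulting Properties would be passed to new KafkaProducer<>(props). Note that send still returns immediately with this configuration, so blocking still requires Future#get.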

Author by

Admin

Updated on July 09, 2022

Comments

  • Admin
    Admin almost 2 years

    How can I send synchronous messages in Kafka?
    One way of achieving this could be to set the property
    max.in.flight.requests.per.connection = 1.

    But I want to know whether there is a more direct or alternative way of sending synchronous messages in Kafka (something like producer.syncSend(...)).