How to reconnect kafka producer once closed?
Solution 1
Generally, calling close() on the KafkaProducer is sufficient to make sure all in-flight records have completed:
```java
/**
 * Close this producer. This method blocks until all previously sent requests complete.
 * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
 * <p>
 * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
 * will be called instead. We do this because the sender thread would otherwise try to join itself and
 * block forever.</strong>
 * <p>
 *
 * @throws InterruptException If the thread is interrupted while blocked
 */
```
If your producer is used throughout the lifetime of your application, don't close it until you receive a termination signal, and then call close(). As stated in the documentation, the producer is safe to use in a multi-threaded environment, so you should re-use the same instance.
If you're sharing your KafkaProducer across multiple threads, you have two choices:

1. Call close() from your main execution thread by registering a shutdown callback via Runtime.getRuntime().addShutdownHook.
2. Have your multi-threaded methods race to close, and allow only a single one to win.
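A minimal Java sketch of option 1 could look like this (the broker address, serializer choices, and class names here are illustrative assumptions, not from the original answer):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerHolder {
    // One shared, thread-safe producer for the whole application.
    private static final KafkaProducer<String, String> PRODUCER = createProducer();

    static {
        // Close the producer (flushing in-flight records) when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(PRODUCER::close));
    }

    private static KafkaProducer<String, String> createProducer() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumption
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    public static KafkaProducer<String, String> instance() {
        return PRODUCER;
    }
}
```

Because the hook only runs at JVM termination, no caller ever sees a closed producer during normal operation.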
A rough sketch of option 2 might look like this:

```scala
object KafkaOwner {
  // ??? stands in for your actual producer construction (properties, serializers, etc.)
  private def createProducer(): KafkaProducer[String, Array[Byte]] = ???

  private var producer: KafkaProducer[String, Array[Byte]] = createProducer()
  @volatile private var isClosed = false

  def close(): Unit = this.synchronized {
    if (!isClosed) {
      producer.close()
      isClosed = true
    }
  }

  def instance: KafkaProducer[String, Array[Byte]] = this.synchronized {
    if (isClosed) {
      // A closed KafkaProducer cannot be reopened -- create a fresh one.
      producer = createProducer()
      isClosed = false
    }
    producer
  }
}
```
Solution 2
As described in the javadoc for KafkaProducer:
`public void close()`

Close this producer. This method blocks until all previously sent requests complete. This method is equivalent to `close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)`.
Source: https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()
So you don't need to worry that your messages won't be sent, even if you call close() immediately after send().

If you plan to use a KafkaProducer more than once, then close it only after you've finished using it. If you still want the guarantee that your message is actually sent before your method completes, rather than waiting in a buffer, then use KafkaProducer#flush(), which blocks until the current buffer is sent. You can also block on Future#get() if you prefer.
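Both blocking styles can be sketched as follows (the `producer` argument and the topic name "my-topic" are assumptions for illustration):

```java
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendAndWait {
    static void sendSynchronously(KafkaProducer<String, String> producer)
            throws ExecutionException, InterruptedException {
        ProducerRecord<String, String> record =
                new ProducerRecord<>("my-topic", "key", "value");

        // Option A: block on this one record's delivery.
        RecordMetadata meta = producer.send(record).get(); // throws on delivery failure

        // Option B: send asynchronously, then drain everything buffered so far.
        producer.send(record);
        producer.flush(); // blocks until all buffered records are sent
    }
}
```

Future#get() gives you per-record metadata (partition, offset), while flush() simply drains the whole buffer without telling you about individual records.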
There is also one caveat to be aware of if you don't plan to ever close your KafkaProducer (e.g. in short-lived apps, where you just send some data and the app immediately terminates after sending). The KafkaProducer IO thread is a daemon thread, which means the JVM will not wait for this thread to finish before terminating the VM. So, to ensure that your messages are actually sent, use KafkaProducer#flush(), the no-arg KafkaProducer#close(), or block on Future#get().
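For the short-lived case, one pattern is to let try-with-resources close the producer on exit; this is a hedged sketch, and the properties setup and topic name "events" are assumptions:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ShortLivedApp {
    public static void main(String[] args) {
        Properties props = new Properties(); // assume bootstrap.servers and serializers are set here

        // try-with-resources calls close(), which blocks until all buffered
        // records are delivered; without it, the daemon IO thread could be
        // killed at JVM exit before the message ever leaves the buffer.
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", "payload"));
        }
    }
}
```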
usman
Associate Technical Manager @ Soliton Technologies. Always working in Java Web Technologies, hanging around with various frameworks. Currently playing with Apache Kafka, Apache Camel for our new projects.
Updated on June 13, 2022

Comments
-
usman almost 2 years
I have a multi-threaded app which uses a producer class to produce messages. Earlier I was using the below code to create a producer for each request, where a KafkaProducer was newly built with each request:

```java
KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);
producer.send(data, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            isValidMsg[0] = false;
            exception.printStackTrace();
            saveOrUpdateLog(msgBean, producerType, exception);
            logger.error("ERROR: Unable to produce message.", exception);
        }
    }
});
producer.close();
```
Then I read the Kafka docs on the producer and came to know that we should use a single producer instance to get good performance.
Then I created a single instance of KafkaProducer inside a singleton class.
Now, when and where should we close the producer? Obviously, if we close the producer after the first send request, it won't find the producer to resend messages, hence throwing:
java.lang.IllegalStateException: Cannot send after the producer is closed.
OR, how can we reconnect to the producer once closed? The problem is: what if the program crashes or hits exceptions?
-
usman over 7 years: it is both sync/async. Moreover, if it got closed somehow, e.g. in an exception / app crash, then how to reconnect? Note I will not reinitialize the KafkaProducer instance, as it's not null and does hold all the properties even after the close() method is called. Also I have multiple apps, i.e. 4 consumers which use this shared producer to send messages to multiple topics.
-
Yuval Itzchakov over 7 years: @usman Why do you say it is both sync and async? Where do you see a sync version?
-
usman over 7 years: kafka.apache.org/08/documentation#implementation "The Producer API that wraps the 2 low-level producers". Well, is there any method provided by Kafka yet? Your code shows we have to re-instantiate the object.
-
Yuval Itzchakov over 7 years: @usman That's the old producer. You're using the new producer API, which has no sync methods. You cannot re-use the same producer instance once you shut it down.
-
usman over 7 years: hmmm. So I have to re-instantiate the object? No Kafka methods?
-
usman over 7 years: Well, I needed Java code but I can understand this too. Update me if you find anything else.
-
febot about 5 years: Interesting, doesn't the accepted answer contradict the doc?
-
y_159 almost 3 years: When a producer writes to a topic, how can I get its status to check whether it's free or busy writing?
-
y_159 almost 3 years: Or, in another case, if the producer is down.
-
Yuval Itzchakov almost 3 years: @y_159 I don't see any methods on the KafkaProducer that allow you to do that. What you can do is forcefully flush() all records.