How to reconnect a Kafka producer once closed?


Solution 1

Generally, calling close() on the KafkaProducer is sufficient to make sure all inflight records have completed:

/**
 * Close this producer. This method blocks until all previously sent requests complete.
 * This method is equivalent to <code>close(Long.MAX_VALUE, TimeUnit.MILLISECONDS)</code>.
 * <p>
 * <strong>If close() is called from {@link Callback}, a warning message will be logged and close(0, TimeUnit.MILLISECONDS)
 * will be called instead. We do this because the sender thread would otherwise try to join itself and
 * block forever.</strong>
 * <p>
 *
 * @throws InterruptException If the thread is interrupted while blocked
 */

If your producer is being used throughout the lifetime of your application, don't close it until you get a termination signal, and only then call close(). As stated in the documentation, the producer is safe to use in a multi-threaded environment, so you should re-use the same instance.

If you're sharing your KafkaProducer in multiple threads, you have two choices:

  1. Register a shutdown hook via Runtime.getRuntime().addShutdownHook from your main execution thread that calls close() (a minimal Java sketch follows this list)
  2. Have your multi-threaded methods race for closing and allow only a single one to win.
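
A minimal Java sketch of option 1 might look like the following; the class name, broker address and serializer choices here are placeholders, not anything prescribed by Kafka:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class SharedProducer {

    // One producer instance, shared by all threads for the lifetime of the app.
    private static final KafkaProducer<String, byte[]> PRODUCER = createProducer();

    static {
        // Option 1: close the shared producer exactly once, when the JVM shuts down.
        Runtime.getRuntime().addShutdownHook(new Thread(PRODUCER::close));
    }

    private static KafkaProducer<String, byte[]> createProducer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());
        return new KafkaProducer<>(props);
    }

    public static KafkaProducer<String, byte[]> instance() {
        return PRODUCER;
    }
}

With this, any thread can call SharedProducer.instance().send(...) without having to coordinate who closes the producer.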

A rough sketch of option 2 might look like this:

import org.apache.kafka.clients.producer.KafkaProducer

object KafkaOwner {
  private var producer: KafkaProducer[String, Array[Byte]] = createProducer()
  @volatile private var isClosed = false

  // Placeholder: build the producer from your own configuration here.
  private def createProducer(): KafkaProducer[String, Array[Byte]] = ???

  def close(): Unit = this.synchronized {
    if (!isClosed) {
      producer.close()
      isClosed = true
    }
  }

  def instance: KafkaProducer[String, Array[Byte]] = this.synchronized {
    // Re-create the producer if another thread has already closed it.
    if (isClosed) {
      producer = createProducer()
      isClosed = false
    }
    producer
  }
}

Solution 2

As described in the Javadoc for KafkaProducer:

public void close()

Close this producer. This method blocks until all previously sent requests complete.
This method is equivalent to close(Long.MAX_VALUE, TimeUnit.MILLISECONDS).

src: https://kafka.apache.org/0110/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html#close()

So you don't need to worry that your messages won't be sent, even if you call close() immediately after send().

If you plan to use the KafkaProducer more than once, then close it only after you've finished using it. If you still want the guarantee that your message is actually sent before your method completes, rather than waiting in a buffer, use KafkaProducer#flush(), which blocks until the current buffer is sent. You can also block on Future#get() if you prefer.
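
For illustration, a small Java sketch of both approaches; the topic name and the surrounding method are placeholders:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class SendAndWait {

    static void sendAndWait(KafkaProducer<String, byte[]> producer, byte[] payload) throws Exception {
        ProducerRecord<String, byte[]> record = new ProducerRecord<>("my-topic", payload);

        // Option A: block on the Future until this particular record is acknowledged.
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("Wrote to partition " + metadata.partition() + " at offset " + metadata.offset());

        // Option B: keep send() asynchronous, then force out everything buffered so far.
        producer.send(new ProducerRecord<>("my-topic", payload));
        producer.flush(); // blocks until the records currently in the buffer have been sent
    }
}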

There is also one caveat to be aware of if you don't plan to ever close your KafkaProducer (e.g. in short-lived apps where you just send some data and the app terminates immediately afterwards). The KafkaProducer IO thread is a daemon thread, which means the JVM will not wait for it to finish before terminating. So, to ensure that your messages are actually sent, use KafkaProducer#flush(), the no-arg KafkaProducer#close(), or block on Future#get().
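
A sketch of the short-lived case, assuming a local broker and the usual String/byte[] serializers; since KafkaProducer implements Closeable, try-with-resources gives you the no-arg close() and therefore a drained buffer before the JVM exits:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class OneShotSender {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", ByteArraySerializer.class.getName());

        // The no-arg close() invoked by try-with-resources blocks until all
        // previously sent records have completed, so the daemon IO thread
        // cannot be killed with data still sitting in the buffer.
        try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "hello".getBytes()));
        }
    }
}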

Updated on June 13, 2022

Comments

  • usman
    usman almost 2 years

    I have a multi-threaded app which uses a producer class to produce messages. Earlier I was creating a new producer for each request, where the KafkaProducer was newly built with each request, as below:

    KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(prop);
    ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic, objBytes);

    producer.send(data, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                isValidMsg[0] = false;
                exception.printStackTrace();
                saveOrUpdateLog(msgBean, producerType, exception);
                logger.error("ERROR: Unable to produce message.", exception);
            }
        }
    });
    producer.close();

    Then I read the Kafka docs on the producer and came to know that we should use a single producer instance for good performance.

    Then I created a single instance of KafkaProducer inside a singleton class.

    Now, when and where should we close the producer? Obviously, if we close the producer after the first send request, subsequent requests won't find the producer to send messages and will throw:

    java.lang.IllegalStateException: Cannot send after the producer is closed.
    

    Or, how can we reconnect to the producer once it is closed? The problem is: what happens if the program crashes or hits exceptions?

  • usman
    usman over 7 years
    it is both sync/async. Moreover, if it got closed somehow, e.g. in an exception or an app crash, then how do we reconnect? Note I will not reinitialize the KafkaProducer instance, as it's not null and does hold all the properties even after the close() method is called. Also, I have multiple apps, i.e. 4 consumers, which use this shared producer to send messages to multiple topics.
  • Yuval Itzchakov
    Yuval Itzchakov over 7 years
    @usman Why do you say it is both sync and async? Where do you see a sync version?
  • usman
    usman over 7 years
    kafka.apache.org/08/documentation#implementation "The Producer API that wraps the 2 low-level producers -". Well, is there any method provided by Kafka yet? Your code shows we have to re-instantiate the object.
  • Yuval Itzchakov
    Yuval Itzchakov over 7 years
    @usman That's the old producer. You're using the new producer API which has no sync methods. You cannot re-use the same producer instance once you shut it down.
  • usman
    usman over 7 years
    Hmmm, so I have to re-instantiate the object? No Kafka methods?
  • usman
    usman over 7 years
    Well, I needed Java code, but I can understand this too. Update me if you find anything else.
  • febot
    febot about 5 years
    Interesting, doesn't the accepted answer contradict the doc?
  • y_159
    y_159 almost 3 years
    When a producer writes to a topic, how can I get its status to check whether it's free or busy writing?
  • y_159
    y_159 almost 3 years
    Or, in another case, what if the producer is down?
  • Yuval Itzchakov
    Yuval Itzchakov almost 3 years
    @y_159 I don't see any methods on the KafkaProducer that allow you to do that. What you can do is forcefully flush() all records.