max.poll.interval.ms set to Integer.MAX_VALUE by default


Does it allow applications to become unresponsive? Or does Kafka Streams have a different way to leave the consumer group when processing is taking too long?

Kafka Streams leverages the heartbeat functionality of the Kafka consumer client here, and thus decouples heartbeats ("Is this app instance still alive?") from calls to poll(). The two main parameters are session.timeout.ms (for the heartbeat thread) and max.poll.interval.ms (for the processing thread); their difference is described in more detail at https://stackoverflow.com/a/39759329/1743580.

Heartbeating was introduced so that an application instance can spend a lot of time processing a record without being considered "not making progress" and thus "dead". For example, your app can spend a minute crunching a single record while still heartbeating to Kafka: "Hey, I'm still alive, and I am making progress. I'm simply not done with the processing yet. Stay tuned."

Of course you can change max.poll.interval.ms from its default (Integer.MAX_VALUE) to a lower setting if, for example, you actually do want your app instance to be considered "dead" if it takes longer than X seconds in-between polling records, and thus if it takes longer than X seconds to process the latest round of records. It depends on your specific use case whether or not such a configuration makes sense -- in most cases, the default setting is a safe bet.
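As a rough illustration of lowering the setting, here is a minimal sketch that only builds the configuration with plain `java.util.Properties`. The class name, the helper `withMaxPollInterval`, the application id, and the broker address are all made up for the example; only the config key names come from Kafka itself.

```java
import java.util.Properties;

class StreamsPollConfigSketch {

    // Hypothetical helper: returns properties that override the
    // max.poll.interval.ms default with a caller-chosen value.
    static Properties withMaxPollInterval(int maxPollIntervalMs) {
        if (maxPollIntervalMs <= 0) {
            throw new IllegalArgumentException("max.poll.interval.ms must be positive");
        }
        Properties props = new Properties();
        // Placeholder values, not taken from the original post.
        props.put("application.id", "my-streams-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Consider the instance "dead" if more than this many ms pass between polls.
        props.put("max.poll.interval.ms", Integer.toString(maxPollIntervalMs));
        return props;
    }

    public static void main(String[] args) {
        Properties props = withMaxPollInterval(60_000);
        System.out.println(props.getProperty("max.poll.interval.ms")); // prints 60000
    }
}
```

In a real application these properties would be handed to the Kafka Streams configuration when the app is constructed. If you want the override to apply only to the embedded consumer, the key can be given the `consumer.` prefix (`consumer.max.poll.interval.ms`).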

session.timeout.ms: The timeout used to detect consumer failures when using Kafka's group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before the expiration of this session timeout, then the broker will remove this consumer from the group and initiate a rebalance. Note that the value must be in the allowable range as configured in the broker configuration by group.min.session.timeout.ms and group.max.session.timeout.ms.

max.poll.interval.ms: The maximum delay between invocations of poll() when using consumer group management. This places an upper bound on the amount of time that the consumer can be idle before fetching more records. If poll() is not called before expiration of this timeout, then the consumer is considered failed and the group will rebalance in order to reassign the partitions to another member.
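To make the interplay of the two timeouts concrete, here is a deliberately simplified model of the two checks. It is not how the Kafka client or broker is implemented; the class `LivenessSketch`, the `Status` enum, and the `check` method are assumptions made purely for illustration.

```java
class LivenessSketch {

    enum Status { ALIVE, SESSION_TIMED_OUT, POLL_INTERVAL_EXCEEDED }

    // Simplified model: the heartbeat check and the poll check are independent.
    static Status check(long nowMs, long lastHeartbeatMs, long lastPollMs,
                        long sessionTimeoutMs, long maxPollIntervalMs) {
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return Status.SESSION_TIMED_OUT;       // no heartbeat: whole instance presumed gone
        }
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return Status.POLL_INTERVAL_EXCEEDED;  // heartbeats arrive, but poll() was not called in time
        }
        return Status.ALIVE;
    }

    public static void main(String[] args) {
        long now = 70_000L;           // 70 s after the last poll() at t = 0
        long lastHeartbeat = 68_000L; // heartbeat thread reported 2 s ago
        // With a 60 s poll limit, the slow processing counts as a failure.
        System.out.println(check(now, lastHeartbeat, 0L, 10_000L, 60_000L));
        // With Integer.MAX_VALUE, the same instance is still considered alive.
        System.out.println(check(now, lastHeartbeat, 0L, 10_000L, Integer.MAX_VALUE));
    }
}
```

In this model, an instance that keeps heartbeating but spends 70 seconds on a batch is flagged only when max.poll.interval.ms is below 70 seconds. With the Integer.MAX_VALUE default, only a missing heartbeat (session.timeout.ms) takes it out of the group.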

Author: Javier Holguera

Updated on December 21, 2020

Comments

  • Javier Holguera
    Javier Holguera over 3 years

    Apache Kafka documentation states:

    The internal Kafka Streams consumer max.poll.interval.ms default value was changed from 300000 to Integer.MAX_VALUE

    Since this value is used to detect when the processing time for a batch of records exceeds a given threshold, is there a reason for such an "unlimited" value?

    Does it allow applications to become unresponsive? Or does Kafka Streams have a different way to leave the consumer group when processing is taking too long?

  • Javier Holguera
    Javier Holguera over 6 years
    Thanks for the answer. From your link: "If the processing thread dies, it takes max.poll.interval.ms to detect this. However, if the whole consumer dies (and a dying processing thread most likely crashes the whole consumer including the heartbeat thread), it takes only session.timeout.ms to detect it.". With an Integer.MAX_VALUE value, the processing thread death would never be detected. Why would KStreams default to a setting that enables a type of failure to go undetected?
  • miguno
    miguno over 6 years
    Because more users struggled with the previous default setting, where a longer processing time would lead to your app being mistakenly detected as "dead".
  • Javier Holguera
    Javier Holguera over 6 years
    That makes sense. With a default of 500 records per batch and a previous max.poll.interval.ms of 300 sec, it was quite tight under load scenarios. However, with this new default, are we left with just the session.timeout.ms timeout, since the other is too large to detect anything anymore (by default)?
  • Javier Holguera
    Javier Holguera over 6 years
    Also, given that KIP-62 says "we give the client as much as max.poll.interval.ms to handle a batch of records, this is also the maximum time before a consumer can be expected to rejoin the group in the worst case", does it mean a broker would wait Integer.MAX_VALUE for a client to report in the event of a rebalance? That sounds improbable, so there must be another timeout in play in that scenario.