Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1001: org.apache.kafka.common.errors.DisconnectException
As @Ahmad Abdelghany comment says, settings org.apache.kafka
logs to DEBUG shows:
DEBUG org.apache.kafka.clients.NetworkClient - Disconnecting from node 1 due to request timeout.
DEBUG org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient - Cancelled request with header RequestHeader(apiKey=FETCH, apiVersion=11, clientId=consumer-1, correlationId=183) due to node 1 being disconnected
DEBUG org.apache.kafka.clients.NetworkClient - Give up sending metadata request since no node is available
INFO org.apache.kafka.clients.FetchSessionHandler - Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1: {}.
org.apache.kafka.common.errors.DisconnectException: null
DEBUG org.apache.kafka.clients.NetworkClient - Give up sending metadata request since no node is available
The important bit being:
due to request timeout
You can increase request.timeout.ms
(default 30000) with
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
Note: your case should be different since your timeout is set to 10 minutes. Displaying debug logs should help.
Edayan
Develops web applications using Node js, React Js etc. Loves linux, open source.
Updated on July 25, 2022Comments
-
Edayan almost 2 years
I am using spring kafka and facing some errors
Error sending fetch request (sessionId=INVALID, epoch=INITIAL) to node 1001: org.apache.kafka.common.errors.DisconnectException.
my consumer producer code is given
` @EnableKafka @Configuration public class KafkaConfig {
@Value(value = "${spring.kafka.consumer.bootstrap-servers}") private String bootstrapAddress; @Value(value = "${spring.kafka.consumer.registry-server}") private String registryAddress; @Value(value = "${spring.kafka.consumer.group-id}") private String groupId; @Bean public ConsumerFactory<String, GenericRecord> consumerFactory() { final Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, registryAddress); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 600000); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 600000); final AvroSerde avroSerde = new AvroSerde(); avroSerde.configure(props, false); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, avroSerde.deserializer().getClass()); return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), avroSerde.deserializer()); } @Bean @ConditionalOnMissingBean(name = "kafkaListenerContainerFactory") public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactory() { final ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setConcurrency(1); return factory; } @Bean KafkaTemplate<String, AlertEvent> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } @Bean public ProducerFactory<String, AlertEvent> producerFactory() { Map<String, Object> config = new HashMap<>(); config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, getAlertGsonEncoder().getClass()); return new DefaultKafkaProducerFactory(config); } private static GsonEncoder<AlertEvent> getAlertGsonEncoder() { final GsonEncoder<AlertEvent> valueSerializer = new GsonEncoder<>(); valueSerializer.configure(Collections.singletonMap(GsonEncoder.INSERT_SCHEMA_CONFIG, false), false); return valueSerializer; } }
`
I am getting the error and not able to understand why, Please help. The configuration is given below.
`
auto.commit.interval.ms = 5000, auto.offset.reset = latest, check.crcs = true, bootstrap.servers = [kafka:9092], client.dns.lookup = default, default.api.timeout.ms = 60000, connections.max.idle.ms = 540000, client.id = , exclude.internal.topics = true, enable.auto.commit = true, fetch.max.bytes = 52428800, fetch.max.wait.ms = 600000, fetch.min.bytes = 1, group.id = iot-brazilian-alert-hb03, interceptor.classes = [], heartbeat.interval.ms = 3000, internal.leave.group.on.close = true, isolation.level = read_uncommitted, key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer, max.partition.fetch.bytes = 1048576, max.poll.interval.ms = 300000, metadata.max.age.ms = 300000, max.poll.records = 500, metric.reporters = [], metrics.num.samples = 2, metrics.recording.level = INFO, metrics.sample.window.ms = 30000, partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor], reconnect.backoff.max.ms = 1000, receive.buffer.bytes = 65536, reconnect.backoff.ms = 50, request.timeout.ms = 600000, retry.backoff.ms = 100, sasl.client.callback.handler.class = null, sasl.jaas.config = null, sasl.kerberos.kinit.cmd = /usr/bin/kinit, sasl.kerberos.min.time.before.relogin = 60000, sasl.kerberos.service.name = null, sasl.kerberos.ticket.renew.jitter = 0.05, sasl.kerberos.ticket.renew.window.factor = 0.8, sasl.login.callback.handler.class = null, sasl.login.class = null, sasl.login.refresh.buffer.seconds = 300, sasl.login.refresh.min.period.seconds = 60, sasl.login.refresh.window.factor = 0.8, sasl.login.refresh.window.jitter = 0.05, sasl.mechanism = GSSAPI, security.protocol = PLAINTEXT, send.buffer.bytes = 131072, ssl.cipher.suites = null, session.timeout.ms = 10000, ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1], ssl.endpoint.identification.algorithm = https, ssl.key.password = null, ssl.keymanager.algorithm = SunX509, ssl.keystore.location = null, ssl.keystore.password = null, ssl.keystore.type = JKS, ssl.protocol = TLS, ssl.provider = null, ssl.secure.random.implementation = null, ssl.trustmanager.algorithm = PKIX, ssl.truststore.location = null, ssl.truststore.password = null, ssl.truststore.type = JKS, value.deserializer = class com.blupa.iot.kafka.encoder.AvroWrapper
`
not sure what I am doing wrongly. I am using org.apache.avro:1:8:2 and io.confluent.kafka-avro-serializer:3.2.1