Spring Kafka Producer not sending to Kafka 1.0.0 (Magic v1 does not support record headers)
Solution 1
Solved. The problem was neither the broker, nor a Docker cache, nor the Spring app.
The problem was a console consumer that I ran in parallel for debugging. It was an "old" consumer, started with kafka-console-consumer.sh --topic=topic --zookeeper=...
It actually prints a warning on startup: Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].
A "new" consumer, started with the --bootstrap-server option, should be used instead (especially with Kafka 1.0 and JsonSerializer).
Note: running an old consumer here can indeed affect the producer.
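One possible reading of why the old consumer matters, offered as a sketch rather than a confirmed diagnosis: the server log in the question shows the exception while the broker handles a fetch request, and a consumer speaking an older protocol needs records in an older message format (magic v0/v1), which has no place for headers. The toy model below is not Kafka code; the class MagicHeaderRule, the method convert and the header key type-info are hypothetical names used only for illustration.

```java
import java.util.Collections;
import java.util.List;

// Toy model (NOT Kafka's implementation): headers only exist from message format v2 on.
final class MagicHeaderRule {
    static final byte MAGIC_V1 = 1; // format used before 0.11, no headers
    static final byte MAGIC_V2 = 2; // format introduced with 0.11, supports headers

    static boolean supportsHeaders(byte magic) {
        return magic >= MAGIC_V2;
    }

    // Hypothetical stand-in for re-encoding a stored record in the format a client asked for.
    static String convert(byte targetMagic, String value, List<String> headerKeys) {
        if (!supportsHeaders(targetMagic) && !headerKeys.isEmpty()) {
            throw new IllegalArgumentException(
                    "Magic v" + targetMagic + " does not support record headers");
        }
        return "v" + targetMagic + ":" + value;
    }

    public static void main(String[] args) {
        // A record carrying one header, as JsonSerializer produces by default
        // (the key name here is a placeholder).
        List<String> headers = Collections.singletonList("type-info");
        System.out.println(convert(MAGIC_V2, "{\"a\":1}", headers)); // new-format reader: accepted
        try {
            convert(MAGIC_V1, "{\"a\":1}", headers); // old-format reader: rejected
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Under this reading, either removing the headers (Solution 2) or removing the old-format reader (this solution) avoids the failing conversion.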
Solution 2
I had a similar issue. Spring Kafka adds type info headers by default when JsonSerializer or JsonSerde is used for values.
To prevent this issue, we need to disable adding the type info headers.
If you are fine with the default JSON serialization, use the following (the key point here is ADD_TYPE_INFO_HEADERS):
Map<String, Object> props = new HashMap<>(defaultSettings);
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);
But if you need a custom JsonSerializer with a specific ObjectMapper (for example with PropertyNamingStrategy.SNAKE_CASE), you should disable the type info headers explicitly on the JsonSerializer instance, because Spring Kafka then ignores the ADD_TYPE_INFO_HEADERS property passed to DefaultKafkaProducerFactory (in my opinion, this is bad design in Spring Kafka):
JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);
Or, if we use JsonSerde:
Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
jsonSerde.configure(jsonSerdeProperties, false);
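When the producer settings come from a plain map or an external properties file rather than from the Spring Kafka constants, the same switch can be written with literal keys. The sketch below assumes that JsonSerializer.ADD_TYPE_INFO_HEADERS resolves to the key spring.json.add.type.headers and that the serializers are given as class names; the class JsonProducerProps and the method withoutTypeHeaders are hypothetical names, and it uses only the JDK so it can be run on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: producer settings with literal keys instead of ProducerConfig/JsonSerializer constants.
final class JsonProducerProps {
    static Map<String, Object> withoutTypeHeaders(String bootstrapServers) {
        Map<String, Object> props = new HashMap<>();
        // Literal value of ProducerConfig.BOOTSTRAP_SERVERS_CONFIG
        props.put("bootstrap.servers", bootstrapServers);
        // Literal values of KEY_SERIALIZER_CLASS_CONFIG / VALUE_SERIALIZER_CLASS_CONFIG,
        // with the serializer classes given by name
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.springframework.kafka.support.serializer.JsonSerializer");
        // Assumed literal value of JsonSerializer.ADD_TYPE_INFO_HEADERS
        props.put("spring.json.add.type.headers", false);
        return props;
    }

    public static void main(String[] args) {
        // localhost:9092 is a placeholder broker address
        withoutTypeHeaders("localhost:9092")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

The resulting map could be handed to DefaultKafkaProducerFactory in the same way as props in the first snippet of this solution. As there, this only applies when the serializer is created from the configuration, not when a preconfigured JsonSerializer instance is passed in.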
Solution 3
I just ran a test against that docker image with no problems...
$ docker ps
CONTAINER ID   IMAGE                    COMMAND                  CREATED          STATUS         PORTS                                                NAMES
f093b3f2475c   kafkadocker_kafka        "start-kafka.sh"         33 minutes ago   Up 2 minutes   0.0.0.0:32768->9092/tcp                              kafkadocker_kafka_1
319365849e48   wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   33 minutes ago   Up 2 minutes   22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1
.
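import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So47953901Application {

    public static void main(String[] args) {
        SpringApplication.run(So47953901Application.class, args);
    }

    // Sends one record (key "bar", value "baz") to topic "foo" on startup
    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> template.send("foo", "bar", "baz");
    }

    // Prints every value received from topic "foo"
    @KafkaListener(id = "foo", topics = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}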
.
spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
.
2017-12-23 13:27:27.990 INFO 21305 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [foo-0]
baz
EDIT
Still works for me...
spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer
.
2017-12-23 15:27:59.997 INFO 44079 --- [ main] o.a.k.clients.producer.ProducerConfig : ProducerConfig values:
acks = 1
...
value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer
...
2017-12-23 15:28:00.071 INFO 44079 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [foo-0]
baz
DerM
Updated on June 02, 2022
Comments
-
DerM almost 2 years
I am using this docker-compose setup for setting up Kafka locally: https://github.com/wurstmeister/kafka-docker/
docker-compose up works fine, and creating topics via the shell works fine. Now I try to connect to Kafka via spring-kafka:2.1.0.RELEASE.
When starting up, the Spring application prints the correct version of Kafka:
o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0
o.a.kafka.common.utils.AppInfoParser : Kafka commitId : aaa7af6d4a11b29d
I try to send a message like this:
kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
Sending on the client side fails with
UnknownServerException: The server experienced an unexpected error when processing the request
In the server console I get the message Magic v1 does not support record headers:
Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
java.lang.IllegalArgumentException: Magic v1 does not support record headers
Googling suggests a version conflict, but the versions seem to fit (org.apache.kafka:kafka-clients:1.0.0 is on the classpath). Any clues? Thanks!
Edit: I narrowed down the source of the problem. Sending plain Strings works, but sending Json via JsonSerializer results in the given problem. Here is the content of my producer config:
@Value("\${kafka.bootstrap-servers}")
lateinit var bootstrapServers: String

@Bean
fun producerConfigs(): Map<String, Any> =
    HashMap<String, Any>().apply {
        // list of host:port pairs used for establishing the initial connections to the Kafka cluster
        put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
        put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
        put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
    }

@Bean
fun producerFactory(): ProducerFactory<String, MyClass> =
    DefaultKafkaProducerFactory(producerConfigs())

@Bean
fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
    KafkaTemplate(producerFactory())
-
DerM over 6 years
Thanks for checking! I narrowed down the source of the problem. If I send plain Strings, it works! But if I use JsonSerializer, the problem occurs. See update.
-
Gary Russell over 6 years
Still works for me - see my edit. The JsonSerializer adds headers by default, so it definitely looks like the broker in your docker image is < 0.11. You can confirm by setting the producer property JsonSerializer.ADD_TYPE_INFO_HEADERS to false. That property stops it from adding the type info in headers.
-
DerM over 6 years
Thanks! You are right, it works with the given property. I just don't get it. When I fire up docker-compose, it definitely prints kafka_1 | [2017-12-23 20:51:53,289] INFO Kafka version : 1.0.0. I also tried recreating, deleting all images, etc., like this: docker-compose down && docker-compose build --no-cache && docker-compose up
-
Gary Russell over 6 years
Try docker logs <containerId> > server.log; in mine, I see this: [2017-12-23 18:07:52,905] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser).
-
DerM over 6 years
Yes.. Same here.
-
DerM over 6 years
If I /bin/bash into the image, I also see all the kafka_2.12-1.0.0 jars.
-
Gary Russell over 6 years
Bizarre - what's even stranger is, if I talk to a 0.10.2.0 server from S-K 2.1.0 with the JsonSerializer, I get the exception on the client, as I would expect: Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers. As I said in my original comment to your question, old brokers know nothing about headers, so I can't see how you can get this error on a server unless the client is old. Are you running your client on AWS? I have heard before of some weirdness with code on AWS running an old kafka-clients even if the app is packaged with the right one.
-
Gary Russell over 6 years
Look for 2017-12-23 16:22:16.500 INFO 55322 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0 in the client log.
-
DerM over 6 years
So.. another update. I think the problem is neither the broker nor the Spring app. I was using a console consumer in parallel to the Spring app for debugging (based on this tutorial). I am pretty sure the problem occurs when using the "old" consumer (kafka-console-consumer.sh --topic=topic --zookeeper=$ZK), with the zookeeper parameter instead of bootstrap-server. What I find interesting is that this consumer results in an UnknownServerException in the (Spring) producer.
-
Gary Russell over 6 years
Try using the --bootstrap-server option instead of --zookeeper; the console-consumer will use the new consumer in that case (but you can't see headers with the console consumer, regardless). But I agree it's weird that the consumer can affect the producer. Maybe there's some logic that says "we have an old consumer attached to this topic so you can't send headers".
-
DerM over 6 years
Yes, it works. Thank you very much for your time, happy holidays!
-
mojtab23 almost 6 years
Adding this line solved my problem: props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
-
Dikla over 4 years
What if I do need the type header? How can I overcome the issue?
-
Gopal over 3 years
I am not running any consumer in parallel, but this issue still occurs. I also tried adding props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); but it still gives the same error, that Magic v1 does not support record headers. Can you please help here?