Spring Kafka Producer not sending to Kafka 1.0.0 (Magic v1 does not support record headers)

Solution 1

Solved. The problem was neither the broker, a stale Docker cache, nor the Spring app.

The problem was a console consumer which I used in parallel for debugging. This was an "old" consumer started with kafka-console-consumer.sh --topic=topic --zookeeper=...

It actually prints a warning when started: Using the ConsoleConsumer with old consumer is deprecated and will be removed in a future major release. Consider using the new consumer by passing [bootstrap-server] instead of [zookeeper].

A "new" consumer with --bootstrap-server option should be used (especially when using Kafka 1.0 with JsonSerializer). Note: Using an old consumer here can indeed affect the producer.

Solution 2

I had a similar issue. Spring Kafka's JsonSerializer (and JsonSerde) adds type information headers to each record by default. To prevent this issue, we need to disable adding those type info headers.

If you are fine with the default JSON serialization, use the following (the key point here is ADD_TYPE_INFO_HEADERS):

Map<String, Object> props = new HashMap<>(defaultSettings);
// disable the spring.json type info headers that JsonSerializer adds by default
props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props);

But if you need a custom JsonSerializer with a specific ObjectMapper (for example, with PropertyNamingStrategy.SNAKE_CASE), you have to disable adding type info headers explicitly on the JsonSerializer instance, because Spring Kafka ignores DefaultKafkaProducerFactory's ADD_TYPE_INFO_HEADERS property when you pass a pre-built serializer (in my view, a bad design in Spring Kafka):

JsonSerializer<Object> valueSerializer = new JsonSerializer<>(customObjectMapper);
// must be disabled on the serializer instance itself; the factory property is ignored here
valueSerializer.setAddTypeInfo(false);
ProducerFactory<String, Object> producerFactory = new DefaultKafkaProducerFactory<>(props, Serdes.String().serializer(), valueSerializer);

Or, if we use JsonSerde, then:

Map<String, Object> jsonSerdeProperties = new HashMap<>();
jsonSerdeProperties.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
JsonSerde<T> jsonSerde = new JsonSerde<>(serdeClass);
// the second argument (isKey) is false because this serde is used for values
jsonSerde.configure(jsonSerdeProperties, false);
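
For the producer-factory variants above, the factory can then be wired into a KafkaTemplate as usual. A minimal usage sketch (the topic name "my-topic" and the MyEvent type are made up for illustration):

// minimal sketch: "my-topic" and MyEvent are illustrative names, not from this thread
KafkaTemplate<String, Object> template = new KafkaTemplate<>(producerFactory);
// with ADD_TYPE_INFO_HEADERS disabled, the record is sent without the spring.json type headers
template.send("my-topic", "some-key", new MyEvent());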

Solution 3

I just ran a test against that docker image with no problems...

$ docker ps

CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS              PORTS                                                NAMES
f093b3f2475c        kafkadocker_kafka        "start-kafka.sh"         33 minutes ago      Up 2 minutes        0.0.0.0:32768->9092/tcp                              kafkadocker_kafka_1
319365849e48        wurstmeister/zookeeper   "/bin/sh -c '/usr/sb…"   33 minutes ago      Up 2 minutes        22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   kafkadocker_zookeeper_1

.

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class So47953901Application {

    public static void main(String[] args) {
        SpringApplication.run(So47953901Application.class, args);
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<Object, Object> template) {
        return args -> template.send("foo", "bar", "baz");
    }

    @KafkaListener(id = "foo", topics = "foo")
    public void listen(String in) {
        System.out.println(in);
    }

}

.

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false

.

2017-12-23 13:27:27.990  INFO 21305 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

EDIT

Still works for me...

spring.kafka.bootstrap-servers=192.168.177.135:32768
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer
spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer

.

2017-12-23 15:27:59.997  INFO 44079 --- [           main] o.a.k.clients.producer.ProducerConfig    : ProducerConfig values: 
    acks = 1
    ...
    value.serializer = class org.springframework.kafka.support.serializer.JsonSerializer

...

2017-12-23 15:28:00.071  INFO 44079 --- [      foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : partitions assigned: [foo-0]
baz

Comments

  • DerM
    DerM almost 2 years

    I am using this docker-compose setup for setting up Kafka locally: https://github.com/wurstmeister/kafka-docker/

    docker-compose up works fine, creating topics via shell works fine.

    Now I try to connect to Kafka via spring-kafka:2.1.0.RELEASE

    When starting up the Spring application it prints the correct version of Kafka:

    o.a.kafka.common.utils.AppInfoParser     : Kafka version : 1.0.0
    o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : aaa7af6d4a11b29d
    

    I try to send a message like this

    kafkaTemplate.send("test-topic", UUID.randomUUID().toString(), "test");
    

    Sending on client side fails with

    UnknownServerException: The server experienced an unexpected error when processing the request
    

    In the server console I get the message Magic v1 does not support record headers

    Error when handling request {replica_id=-1,max_wait_time=100,min_bytes=1,max_bytes=2147483647,topics=[{topic=test-topic,partitions=[{partition=0,fetch_offset=39,max_bytes=1048576}]}]} (kafka.server.KafkaApis)
    java.lang.IllegalArgumentException: Magic v1 does not support record headers
    

    Googling suggests a version conflict, but the versions seem to match (org.apache.kafka:kafka-clients:1.0.0 is on the classpath).

    Any clues? Thanks!

    Edit: I narrowed down the source of the problem. Sending plain Strings works, but sending Json via JsonSerializer results in the given problem. Here is the content of my producer config:

    @Value("\${kafka.bootstrap-servers}")
    lateinit var bootstrapServers: String
    
    @Bean
    fun producerConfigs(): Map<String, Any> =
            HashMap<String, Any>().apply {
                // list of host:port pairs used for establishing the initial connections to the Kafka cluster
                put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
                put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer::class.java)
                put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer::class.java)
            }
    
    @Bean
    fun producerFactory(): ProducerFactory<String, MyClass> =
            DefaultKafkaProducerFactory(producerConfigs())
    
    @Bean
    fun kafkaTemplate(): KafkaTemplate<String, MyClass> =
            KafkaTemplate(producerFactory())
    
  • DerM
    DerM over 6 years
    Thanks for checking! I narrowed down the source of the problem. If I send plain Strings it works! But if I use JsonSerializer, the problem occurs. See update.
  • Gary Russell
    Gary Russell over 6 years
    Still works for me - see my edit. The JsonSerializer adds headers by default so it definitely looks like the broker in your docker image is < 0.11. You can confirm by setting producer property JsonSerializer.ADD_TYPE_INFO_HEADERS to false. That property stops it from adding the type info in headers.
  • DerM
    DerM over 6 years
    Thanks! You are right, it works with the given property. I just don't get it. When I fire up docker-compose it definitely prints kafka_1 | [2017-12-23 20:51:53,289] INFO Kafka version : 1.0.0. I also tried recreating, deleting all images, etc., like this: docker-compose down && docker-compose build --no-cache && docker-compose up
  • Gary Russell
    Gary Russell over 6 years
    Try docker logs <containerId> > server.log. In mine, I see this: [2017-12-23 18:07:52,905] INFO Kafka version : 1.0.0 (org.apache.kafka.common.utils.AppInfoParser).
  • DerM
    DerM over 6 years
    Yes.. Same here.
  • DerM
    DerM over 6 years
    If I /bin/bash into the image I also see all the kafka_2.12-1.0.0 jars.
  • Gary Russell
    Gary Russell over 6 years
    Bizarre - what's even stranger is, if I talk to an 0.10.2.0 server from S-K 2.1.0 and the JsonSerializer, I get the exception on the client as I would expect Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers. As I said in my original comment to your question, old brokers know nothing about headers so I can't see how you can get this error on a server unless the client is old. Are you running your client on AWS? I have heard before some weirdness with code on AWS running old kafka-clients even if the app is packaged with the right one.
  • Gary Russell
    Gary Russell over 6 years
    Look for 2017-12-23 16:22:16.500 INFO 55322 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version : 1.0.0 in the client log.
  • DerM
    DerM over 6 years
    So.. another update. I think the problem is neither the broker nor the Spring app. I was using a console consumer in parallel to the Spring app for debugging (based on this tutorial). I am pretty sure the problem occurs when using the "old" consumer (kafka-console-consumer.sh --topic=topic --zookeeper=$ZK) with the parameter zookeeper instead of bootstrap-server. What I find interesting is that this consumer results in an UnknownServerException in the (Spring) producer.
  • Gary Russell
    Gary Russell over 6 years
    Try using the --bootstrap-server option instead of --zookeeper; the console-consumer will use the new consumer in that case (but you can't see headers with the console consumer, regardless). But I agree it's weird that the consumer can affect the producer. Maybe there's some logic that says "we have an old consumer attached to this topic so you can't send headers".
  • DerM
    DerM over 6 years
    Yes, it works. Thank you very much for your time, happy holidays!
  • mojtab23
    mojtab23 almost 6 years
    Adding this line solved my problem: props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
  • Dikla
    Dikla over 4 years
    What if I do need the type header? How can I overcome the issue?
  • Gopal
    Gopal over 3 years
    I am not running any consumer in parallel, but this issue still occurs. I also tried adding props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); but it still gives the same "Magic v1 does not support record headers" error. Can you please help here?