Kafka Streams with Avro in Java: Missing required configuration "schema.registry.url" which has no default value


Solution 1

If you have keys and values in Avro format, the following lines should do the trick:

config.put("key.converter.schema.registry.url", "http://localhost:8081");  
config.put("value.converter.schema.registry.url", "http://localhost:8081");

If this doesn't seem to work, you can override the Serdes explicitly. For example, with generic Avro keys and values:

import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Collections;
import java.util.Map;

final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
                                                                 "http://localhost:8081");
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true);   // true for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // false for record values

StreamsBuilder builder = new StreamsBuilder();
KStream<GenericRecord, GenericRecord> textLines =
        builder.stream("my-avro-topic", Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde));
// Do whatever you like with the stream
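
The same pattern works with SpecificAvroSerde, which the question sets as its default serde class. A minimal sketch, assuming String keys, a Schema Registry at http://localhost:8081, and a hypothetical Avro-generated class MyRecord:

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Collections;
import java.util.Map;

// MyRecord is a placeholder for your Avro-generated SpecificRecord class.
final Map<String, String> serdeConfig =
        Collections.singletonMap("schema.registry.url", "http://localhost:8081");

final SpecificAvroSerde<MyRecord> valueSpecificAvroSerde = new SpecificAvroSerde<>();
valueSpecificAvroSerde.configure(serdeConfig, false); // false for record values

StreamsBuilder builder = new StreamsBuilder();
KStream<String, MyRecord> records =
        builder.stream("my-avro-topic", Consumed.with(Serdes.String(), valueSpecificAvroSerde));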

Solution 2

If you are using an application.yml, you can set the property like this:

spring:
  kafka:
    properties:
      schema.registry.url: your-schema-registry-url
    consumer:
      auto-offset-reset: latest
      group-id: simple-consumer

I found this in a tutorial on the Confluent blog.
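
If you then see the warning mentioned in the comments below ("The configuration 'schema.registry.url' was supplied but isn't a known config", which comes from the AdminClient), one suggestion from those comments is to scope the property to the consumer instead. A sketch, assuming Spring Boot's spring.kafka.consumer.properties passthrough:

spring:
  kafka:
    consumer:
      auto-offset-reset: latest
      group-id: simple-consumer
      properties:
        schema.registry.url: your-schema-registry-url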

Author: Benny Chan
Updated on June 28, 2022

Comments

  • Benny Chan, almost 2 years ago

    I have the following configuration for my Kafka Streams application:

        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG,this.applicaionId);
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,svrConfig.getBootstrapServers());
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    
        // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());
    
        // Exactly once processing!!
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,SpecificAvroSerde.class);
        config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
    

    And I got the following error:

    Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
    at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
    at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
    

    I have tried to replace the line

    config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
    

    with

    config.put("schema.registry.url","http://localhost:8081");
    

    but with the same error

    I have followed the instructions from this URL when preparing my Streams application.

    Any suggestions?

  • Benny Chan, almost 6 years ago
    Thanks so much, Giorgos. It turns out that after I added the line "final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", "localhost:8081");" everything works fine. I thought this line was only an optional setting, but it turns out it is a must.
  • Matthias J. Sax, almost 6 years ago
    Follow-up: configs from StreamsConfig are only passed to the Serdes that KafkaStreams creates internally. In your case, you create the Serdes in your own code, and thus it's your responsibility to provide the correct configuration. Cf. docs.confluent.io/current/streams/developer-guide/… (a sketch of the two configuration paths follows these comments).
  • Naveen, over 4 years ago
    Does anyone know how to configure the same in a Spring Boot application, maybe directly via application.yml?
  • Jonathan Morales Vélez, almost 3 years ago
    @Naveen check my answer below: stackoverflow.com/a/68574923/1128216
  • Jonathan Morales Vélez, almost 3 years ago
    It logs a warning: The configuration 'schema.registry.url' was supplied but isn't a known config. There's an answer saying that the warning goes away by changing the position of the property (placing it after the producer property). That suggestion doesn't work for me.
  • OneCricketeer, over 2 years ago
    The warning is because the AdminClient config doesn't know about that property. You can move it under consumer.properties to silence it.
  • OneCricketeer, over 2 years ago
    Why are you using Kafka Connect converter properties for Kafka Streams?
  • OneCricketeer, over 2 years ago
    This seems to duplicate the bottom half of the accepted answer. If you're saying the Consumed / Produced objects are needed, it's better to edit or comment on the post.
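
A follow-up sketch of the two configuration paths Matthias J. Sax describes above (MyRecord and the localhost URL are assumptions, not from the original post):

import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.streams.StreamsConfig;

import java.util.Collections;
import java.util.Properties;

// Path 1: default Serdes. KafkaStreams instantiates and configures these itself,
// so a "schema.registry.url" entry in the Streams config reaches them.
Properties config = new Properties();
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
config.put("schema.registry.url", "http://localhost:8081");

// Path 2: a Serde created in application code. KafkaStreams never configures it,
// so it must be given the registry URL explicitly, or it fails with the
// "Missing required configuration" ConfigException shown in the question.
SpecificAvroSerde<MyRecord> valueSerde = new SpecificAvroSerde<>(); // MyRecord: placeholder Avro class
valueSerde.configure(
        Collections.singletonMap("schema.registry.url", "http://localhost:8081"),
        false); // false = the serde is used for record values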