Kafka Streams with Avro in Java: "schema.registry.url" which has no default value
Solution 1
If your keys and values are in Avro format, the following lines should do the trick:
config.put("key.converter.schema.registry.url", "http://localhost:8081");
config.put("value.converter.schema.registry.url", "http://localhost:8081");
If this doesn't seem to work, you can configure the Serdes explicitly. For example, if you have Avro keys and values:
final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url",
"http://localhost:8081");
final Serde<GenericRecord> keyGenericAvroSerde = new GenericAvroSerde();
keyGenericAvroSerde.configure(serdeConfig, true); // true for record keys
final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
valueGenericAvroSerde.configure(serdeConfig, false); // false for record values
StreamsBuilder builder = new StreamsBuilder();
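// StreamsBuilder.stream takes the topic name first; the serdes are passed via Consumed.with(...)
KStream<GenericRecord, GenericRecord> textLines =
builder.stream("my-avro-topic", Consumed.with(keyGenericAvroSerde, valueGenericAvroSerde));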
// Do whatever you like
Solution 2
If you are using an application.yml, you can set the property like this:
spring:
  kafka:
    properties:
      schema.registry.url: your-schema-registry-url
    consumer:
      auto-offset-reset: latest
      group-id: simple-consumer
I found it in a tutorial on the Confluent blog.
Authored by
Benny Chan
Updated on June 28, 2022
Comments
-
Benny Chan almost 2 years
I have the following configuration for my Kafka Streams application:
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, this.applicaionId);
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, svrConfig.getBootstrapServers());
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, svrConfig.getCacheMaxBytesBufferingConfig());
// Exactly once processing!!
config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
And I got the following error:
Exception in thread "main" io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:100)
at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
I have tried to replace the line
config.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,"http://localhost:8081");
with
config.put("schema.registry.url","http://localhost:8081");
but I get the same error.
I followed the instructions from this URL when preparing my Streams application.
Any suggestions?
-
Benny Chan almost 6 years
Thanks so much, Giorgos. It turns out that once I added the line "final Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", "localhost:8081");" everything worked fine. I had thought this line was only an optional setting, but it turns out to be required.
-
Matthias J. Sax almost 6 years
Follow-up: configs from StreamsConfig are only passed to the Serdes that KafkaStreams creates internally. In your case, you create the Serdes in your code, and thus it's your responsibility to provide the correct configuration. Cf. docs.confluent.io/current/streams/developer-guide/…
-
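The point in the comment above can be illustrated with a small toy model. This is only a sketch using plain Java: ToyAvroSerde and ToyStreams are hypothetical stand-ins invented for this example, not the Confluent or Kafka classes, and the real libraries do considerably more. It assumes only what the comment states: the framework configures the serdes it creates itself, while a serde created in application code receives nothing until configure is called on it with a map that contains "schema.registry.url".

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Toy model only: these classes are hypothetical and do not come from Kafka or Confluent.
final class SerdeConfigSketch {

    /** Stand-in for an Avro serde: configure() rejects a config without a registry URL. */
    static class ToyAvroSerde {
        private String registryUrl;

        void configure(Map<String, ?> configs) {
            Object url = configs.get("schema.registry.url");
            if (url == null) {
                throw new IllegalStateException(
                    "Missing required configuration \"schema.registry.url\" which has no default value.");
            }
            registryUrl = url.toString();
        }

        boolean isConfigured() {
            return registryUrl != null;
        }
    }

    /** Stand-in for the framework: it configures only the serde it instantiates itself. */
    static class ToyStreams {
        final ToyAvroSerde defaultSerde = new ToyAvroSerde();

        ToyStreams(Map<String, ?> streamsConfig) {
            defaultSerde.configure(streamsConfig);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> streamsConfig = new HashMap<>();
        streamsConfig.put("schema.registry.url", "http://localhost:8081");

        // The framework-created serde sees the application-wide config.
        ToyStreams streams = new ToyStreams(streamsConfig);
        System.out.println("default serde configured: " + streams.defaultSerde.isConfigured());

        // A serde created in application code is not touched by the framework.
        ToyAvroSerde ownSerde = new ToyAvroSerde();
        System.out.println("own serde configured: " + ownSerde.isConfigured());

        // Configuring it with a map that lacks the URL reproduces the kind of error in the question.
        try {
            ownSerde.configure(Collections.<String, Object>emptyMap());
        } catch (IllegalStateException e) {
            System.out.println("configure failed: " + e.getMessage());
        }

        // Passing the URL explicitly is the fix described in Solution 1.
        ownSerde.configure(streamsConfig);
        System.out.println("own serde configured: " + ownSerde.isConfigured());
    }
}
```

In the real API the equivalent step is the configure(serdeConfig, isKey) call shown in Solution 1.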
Naveen over 4 years
Does anyone know how to configure the same in a Spring Boot application? Maybe directly via application.yml?
-
Jonathan Morales Vélez almost 3 years
@Naveen check my answer below: stackoverflow.com/a/68574923/1128216
-
Jonathan Morales Vélez almost 3 years
It logs a warning: The configuration 'schema.registry.url' was supplied but isn't a known config. There's an answer saying that the warning goes away if you change the position of the property (placing it after the producer property). That suggestion doesn't work for me.
-
OneCricketeer over 2 years
The warning is because the AdminClient config doesn't know about that property. You can move it under consumer.properties to silence that.
-
OneCricketeer over 2 years
Why are you using Kafka Connect converter properties for Kafka Streams?
-
OneCricketeer over 2 years
This seems to duplicate the bottom half of the accepted answer. If you're saying the Consumed / Produced objects are needed, it would be better to edit or comment on the post.