Simple embedded Kafka test example with spring boot
Solution 1
Embedded Kafka tests work for me with the configuration below.
Annotations on the test class:
@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify the @KafkaListener class if it is not the test class itself or is not loaded by the test config
@EmbeddedKafka(
partitions = 1,
controlledShutdown = false,
brokerProperties = {
"listeners=PLAINTEXT://localhost:3333",
"port=3333"
})
public class KafkaConsumerTest {
@Autowired
KafkaEmbedded kafkaEmbeded;
@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
The setup method, annotated with @Before:
@Before
public void setUp() throws Exception {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
kafkaEmbeded.getPartitionsPerTopic());
}
}
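The setUp method above blocks until every listener container has been assigned its partitions, so that the test does not send messages before the consumers are ready. As a rough illustration of that wait-with-timeout pattern only, here is a plain-Java sketch. It is not spring-kafka's implementation; the class name, the IntSupplier standing in for a container's assigned-partition count, and the 20 ms poll interval are all assumptions made for the example.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.IntSupplier;

// Hypothetical sketch: polls a counter until it matches the expected partition count.
final class WaitForAssignmentSketch {

    /** Returns true if assignedPartitions reports the expected count before the timeout elapses. */
    static boolean waitUntilAssigned(IntSupplier assignedPartitions, int expected, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (System.nanoTime() - deadline < 0) {
            if (assignedPartitions.getAsInt() == expected) {
                return true;
            }
            try {
                Thread.sleep(20); // back off briefly before polling again (assumed interval)
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        // One last check after the deadline has passed.
        return assignedPartitions.getAsInt() == expected;
    }

    public static void main(String[] args) {
        AtomicInteger assigned = new AtomicInteger(0);
        // Simulate a consumer that receives its single partition after about 100 ms.
        Thread rebalance = new Thread(() -> {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            assigned.set(1);
        });
        rebalance.start();
        System.out.println("assigned in time: " + waitUntilAssigned(assigned::get, 1, 2_000));
    }
}
```

In the real test, ContainerTestUtils.waitForAssignment plays this role for each MessageListenerContainer.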
Note: I am not using @ClassRule to create the embedded Kafka; instead I autowire it (@Autowired KafkaEmbedded).
@Test
public void testReceive() throws Exception {
kafkaTemplate.send(topic, data);
}
Hope this helps!
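The testReceive method above only sends a message and does not check that a listener received it. One common way to verify delivery without a fixed Thread.sleep is to let the listener count down a CountDownLatch that the test awaits. The following is a minimal plain-Java sketch of that idea, not part of the original answer: RecordingListener is a hypothetical stand-in for a @KafkaListener bean, and a single-thread executor stands in for the listener container.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: no Kafka involved, only the latch-based waiting pattern.
final class LatchListenerSketch {

    /** Stand-in for a listener bean: records the payload and releases the latch. */
    static final class RecordingListener {
        private final CountDownLatch latch = new CountDownLatch(1);
        private volatile String lastPayload;

        void onMessage(String payload) {
            lastPayload = payload;
            latch.countDown();
        }

        boolean awaitMessage(long timeout, TimeUnit unit) throws InterruptedException {
            return latch.await(timeout, unit);
        }

        String lastPayload() {
            return lastPayload;
        }
    }

    /** Delivers the payload on another thread and waits until the listener has seen it. */
    static String sendAndAwait(String payload, long timeoutMillis) {
        RecordingListener listener = new RecordingListener();
        ExecutorService container = Executors.newSingleThreadExecutor();
        try {
            container.submit(() -> listener.onMessage(payload));
            if (!listener.awaitMessage(timeoutMillis, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("listener did not receive a message in time");
            }
            return listener.lastPayload();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            container.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + sendAndAwait("Hello Message!", 2_000));
    }
}
```

In a Spring test the latch would live in the listener bean (or a test double of it), and the test would await it after kafkaTemplate.send(topic, data).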
Edit: Test configuration class marked with @TestConfiguration
@TestConfiguration
public class TestConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
kafkaTemplate.setDefaultTopic(topic);
return kafkaTemplate;
}
}
Now the @Test method can autowire KafkaTemplate and use it to send a message:
kafkaTemplate.send(topic, data);
I have updated the answer's code block with the line above.
Solution 2
Since the accepted answer doesn't compile or work for me, I found another solution based on https://blog.mimacom.com/testing-apache-kafka-with-spring-boot/ that I would like to share with you.
The dependency is 'spring-kafka-test', version '2.2.7.RELEASE'.
@RunWith(SpringRunner.class)
@EmbeddedKafka(partitions = 1, topics = { "testTopic" })
@SpringBootTest
public class SimpleKafkaTest {
private static final String TEST_TOPIC = "testTopic";
@Autowired
EmbeddedKafkaBroker embeddedKafkaBroker;
@Test
public void testReceivingKafkaEvents() {
Consumer<Integer, String> consumer = configureConsumer();
Producer<Integer, String> producer = configureProducer();
producer.send(new ProducerRecord<>(TEST_TOPIC, 123, "my-test-value"));
ConsumerRecord<Integer, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
assertThat(singleRecord).isNotNull();
assertThat(singleRecord.key()).isEqualTo(123);
assertThat(singleRecord.value()).isEqualTo("my-test-value");
consumer.close();
producer.close();
}
private Consumer<Integer, String> configureConsumer() {
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<Integer, String> consumer = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps)
.createConsumer();
consumer.subscribe(Collections.singleton(TEST_TOPIC));
return consumer;
}
private Producer<Integer, String> configureProducer() {
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
}
}
Solution 3
I solved the issue now
@BeforeClass
public static void setUpBeforeClass() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}
While debugging, I saw that the embedded Kafka server picks a random port.
I couldn't find a configuration option for it, so I point the Kafka client config at the server's address instead. It still looks a bit ugly to me.
I would love to have just the line that @Mayur mentioned:
@EmbeddedKafka(partitions = 1, controlledShutdown = false, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})
but I can't find the right dependency on the internet.
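The workaround above boils down to two steps: something binds to a random free port, and the client configuration is then pointed at whatever address was chosen. As a minimal plain-Java sketch of just that mechanism (no Kafka involved), the example below asks the operating system for a free port by binding a ServerSocket to port 0 and publishes the address under a system property. The class and method names are hypothetical, and a real embedded broker keeps its port open, whereas this sketch releases it immediately.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.ServerSocket;

// Hypothetical sketch: a ServerSocket stands in for the embedded broker.
final class RandomPortSketch {

    // Property name taken from the answer above; nothing here talks to Kafka.
    static final String BOOTSTRAP_PROPERTY = "spring.kafka.bootstrap-servers";

    /** Asks the OS for a free ephemeral port by binding to port 0, then releases it. */
    static int findFreePort() {
        try (ServerSocket socket = new ServerSocket(0)) {
            return socket.getLocalPort();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Publishes "localhost:<port>" under the given system property and returns the address. */
    static String publishAddress(String property, int port) {
        String address = "localhost:" + port;
        System.setProperty(property, address);
        return address;
    }

    public static void main(String[] args) {
        int port = findFreePort();
        String address = publishAddress(BOOTSTRAP_PROPERTY, port);
        System.out.println("clients would connect to " + address);
    }
}
```

Because the port is chosen at runtime, two test classes running side by side do not compete for a fixed port such as 9092.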
Solution 4
In integration testing, fixed ports such as 9092 are not recommended, because multiple tests should be free to open their own ports from embedded instances. The following implementation works that way.
NB: this implementation is based on JUnit 5 (Jupiter 5.7.0) and Spring Boot 2.3.4.RELEASE.
TestClass:
@EnableKafka
@SpringBootTest(classes = {ConsumerTest.Config.class, Consumer.class})
@EmbeddedKafka(
partitions = 1,
controlledShutdown = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ConsumerTest {
@Autowired
private EmbeddedKafkaBroker kafkaEmbedded;
@Autowired
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
@BeforeAll
public void setUp() throws Exception {
for (final MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
kafkaEmbedded.getPartitionsPerTopic());
}
}
@Value("${topic.name}")
private String topicName;
@Autowired
private KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate;
@Test
public void consume_success() {
requestKafkaTemplate.send(topicName, load);
}
@Configuration
@Import({
KafkaListenerConfig.class,
TopicConfig.class
})
public static class Config {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Bean
public ProducerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestProducerFactory() {
final Map<String, Object> configProps = new HashMap<>();
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
return new DefaultKafkaProducerFactory<>(configProps);
}
@Bean
public KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate() {
return new KafkaTemplate<>(requestProducerFactory());
}
}
}
Listener Class:
@Component
public class Consumer {
@KafkaListener(
topics = "${topic.name}",
containerFactory = "listenerContainerFactory"
)
@Override
public void listener(
final ConsumerRecord<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> consumerRecord,
final @Payload Optional<Map<String, List<ImmutablePair<String, String>>>> payload
) {
}
}
Listener Config:
@Configuration
public class KafkaListenerConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${topic.name}")
private String resolvedTreeQueueName;
@Bean
public ConsumerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeConsumerFactory() {
final Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.GROUP_ID_CONFIG, resolvedTreeQueueName);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new CustomDeserializer());
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(resolvedTreeConsumerFactory());
return factory;
}
}
TopicConfig:
@Configuration
public class TopicConfig {
@Value(value = "${spring.kafka.bootstrap-servers}")
private String bootstrapAddress;
@Value(value = "${topic.name}")
private String requestQueue;
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
return new KafkaAdmin(configs);
}
@Bean
public NewTopic requestTopic() {
return new NewTopic(requestQueue, 1, (short) 1);
}
}
application.properties:
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
This assignment is the most important one: it binds the embedded instance's address to the KafkaTemplate and the KafkaListeners.
With the above implementation, each test class can open its own dynamic ports, which is more convenient.
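To make the effect of that application.properties line concrete: the embedded broker publishes its address under spring.embedded.kafka.brokers, and the ${...} placeholder is replaced with that value when the property is read. The sketch below is a deliberately simplified, plain-Java stand-in for that substitution. It is not Spring's placeholder resolver, and the class name, the regex, and the port 54321 are made up for illustration.

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical sketch of ${name} substitution; Spring's real resolver is more capable.
final class PlaceholderSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    /** Replaces each ${name} with the matching entry from source; unknown names are left untouched. */
    static String resolve(String value, Properties source) {
        Matcher matcher = PLACEHOLDER.matcher(value);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String replacement = source.getProperty(matcher.group(1), matcher.group());
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }

    public static void main(String[] args) {
        Properties environment = new Properties();
        // Made-up address standing in for what the embedded broker would publish.
        environment.setProperty("spring.embedded.kafka.brokers", "127.0.0.1:54321");
        String bootstrapServers = resolve("${spring.embedded.kafka.brokers}", environment);
        System.out.println("spring.kafka.bootstrap-servers=" + bootstrapServers);
    }
}
```

Every bean that reads ${spring.kafka.bootstrap-servers} (the producer factory, the consumer factory, and KafkaAdmin in the classes above) therefore ends up with the embedded broker's runtime address.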
Yuna Braska
Updated on July 09, 2022
Comments
-
Yuna Braska almost 2 years
Edit FYI: working GitHub example
I was searching the internet and couldn't find a working and simple example of an embedded Kafka test.
My setup is:
- Spring Boot
- Multiple @KafkaListener methods with different topics in one class
- Embedded Kafka for the test, which starts fine
- A test with KafkaTemplate that sends to the topic, but the @KafkaListener methods receive nothing, even after a huge sleep time
- No warnings or errors are shown, only info spam from Kafka in the logs
Please help me. Most examples are overconfigured or overengineered. I am sure it can be done simply. Thanks, guys!
@Controller
public class KafkaController {
private static final Logger LOG = getLogger(KafkaController.class);
@KafkaListener(topics = "test.kafka.topic")
public void receiveDunningHead(final String payload) {
LOG.debug("Receiving event with payload [{}]", payload);
// I will do database stuff here which I could check in db for testing
}
}
private static String SENDER_TOPIC = "test.kafka.topic";
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
@Test
public void testSend() throws InterruptedException, ExecutionException {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
Thread.sleep(10000);
}
-
pvpkiran about 6 years
Show the code. See if this helps: stackoverflow.com/questions/48682745/…
-
Yuna Braska about 6 years
@pvpkiran this is still not working. The test only tests itself and never reaches my KafkaListener when I take just the part that sends to my topic.
-
Artem Bilan about 6 years
It's not clear from your test code how that KafkaController is involved. How are you sure that the listener is started?
-
Yuna Braska about 6 years
@ArtemBilan because there is the [@KafkaListener] annotation on the method. Or do I have to do something else?
-
Artem Bilan about 6 years
Right, the test needs to bootstrap an application context with that component.
-
Yuna Braska about 6 years
@ArtemBilan it's Spring Boot, and the context is starting with the test annotations [@RunWith(SpringRunner.class) @SpringBootTest]. Unfortunately, even this example fails: codenotfound.com/spring-kafka-boot-example.html
-
Artem Bilan about 6 years
Right, but how does that @SpringBootTest know about your KafkaController component? How is it scanned or configured?
-
Artem Bilan about 6 years
Pay attention to how that sample has its @SpringBootApplication class in the same package as the @SpringBootTest, while the Receiver and Sender components are in nested packages. So they are all clearly scanned and configured, and that's how it works. If your @SpringBootTest is in a different package, your components are not visible and you should provide some @Configuration class.
-
Yuna Braska about 6 years
Thanks! This sounds great, but where do [@EmbeddedKafka] and [kafkaListenerEndpointRegistry] come from? Can you post a full example with imports?
-
donm about 6 years
Since we have annotated our class with @EnableKafka and @EmbeddedKafka, you can autowire both in the test class. In the answer's first code block, @Autowired KafkaEmbedded kafkaEmbedded is already there; you can autowire kafkaListenerEndpointRegistry in the same way.
-
Yuna Braska about 6 years
I get the same error every time while I am testing different solutions: ERROR org.springframework.kafka.support.LoggingProducerListener:76 - Exception thrown when sending a message with key='null' and payload='Hello Message!' to topic myTopic: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
-
donm about 6 years
I have just updated the @TestConfiguration code in the answer. Hope it will help you!
-
Yuna Braska about 6 years
I still don't know where "@EmbeddedKafka" is coming from. Which dependency is needed for that? I am currently using "spring-kafka-test".
-
donm about 6 years
spring-kafka-test has @EmbeddedKafka, you should be able to use @Autowired KafkaEmbedded kafkaEmbeded;
-
Yuna Braska about 6 years
okay found it!! [@EmbeddedKafka] is not included in newer versions of "spring-kafka-test"
-
dav.garcia over 5 years
TestConfig is best declared as an inner class within KafkaConsumerTest. In this case: a) it must be static; b) KafkaEmbedded must be injected as a parameter of the method producerFactory; c) inject ProducerFactory as a parameter to the method kafkaTemplate and then use it instead of calling producerFactory().
-
Flashpix over 5 years
You can set spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers} in your application.properties for the test, that should work. This is filled from EmbeddedKafka with the random port it was assigned on startup.
-
Manish Bansal almost 5 years
Are you testing embedded Kafka itself?
-
legend over 4 years
This embedded Kafka solution does not work with Cucumber tests.
-
Debadatta about 4 years
java.lang.NoClassDefFoundError: kafka/common/KafkaException
-
Aldo Inácio da Silva about 3 years
In my case, the @EmbeddedKafka annotation came from spring-kafka-test-2.6.5. I have a dependency on spring-kafka-test in my pom and I'm using Spring Boot version 2.4.2. @Sylhare
-
Tom AsIdea over 2 years
The setup() method with ContainerTestUtils.waitForAssignment(..) was gold for us. We encountered the consumer hanging after another test class, which caused the consumer for the next test to not receive anything. We also use @DirtiesContext(AFTER_CLASS).
-
Naveen Kulkarni almost 2 years
I was searching for the easiest option for configuring embedded Kafka and found this. Thanks for posting it.