Simple embedded Kafka test example with spring boot

105,251

Solution 1

Embedded Kafka tests work for me with below configs,

Annotation on test class

@EnableKafka
@SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config
@EmbeddedKafka(
    partitions = 1, 
    controlledShutdown = false,
    brokerProperties = {
        "listeners=PLAINTEXT://localhost:3333", 
        "port=3333"
})
public class KafkaConsumerTest {
    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

Before annotation for setup method

@Before
public void setUp() throws Exception {
  for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer, 
    kafkaEmbeded.getPartitionsPerTopic());
  }
}

Note: I am not using @ClassRule for creating embedded Kafka rather auto-wiring
@Autowired embeddedKafka

@Test
public void testReceive() throws Exception {
     kafkaTemplate.send(topic, data);
}

Hope this helps!

Edit: Test configuration class marked with @TestConfiguration

@TestConfiguration
public class TestConfig {

@Bean
public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic(topic);
    return kafkaTemplate;
}

Now @Test method will autowire KafkaTemplate and use is to send message

kafkaTemplate.send(topic, data);

Updated answer code block with above line

Solution 2

since the accepted answer doesn't compile or work for me. I find another solution based on https://blog.mimacom.com/testing-apache-kafka-with-spring-boot/ what I would like to share with you.

The dependency is 'spring-kafka-test' version: '2.2.7.RELEASE'

@RunWith(SpringRunner.class)
@EmbeddedKafka(partitions = 1, topics = { "testTopic" })
@SpringBootTest
public class SimpleKafkaTest {

    private static final String TEST_TOPIC = "testTopic";

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Test
    public void testReceivingKafkaEvents() {
        Consumer<Integer, String> consumer = configureConsumer();
        Producer<Integer, String> producer = configureProducer();

        producer.send(new ProducerRecord<>(TEST_TOPIC, 123, "my-test-value"));

        ConsumerRecord<Integer, String> singleRecord = KafkaTestUtils.getSingleRecord(consumer, TEST_TOPIC);
        assertThat(singleRecord).isNotNull();
        assertThat(singleRecord.key()).isEqualTo(123);
        assertThat(singleRecord.value()).isEqualTo("my-test-value");

        consumer.close();
        producer.close();
    }

    private Consumer<Integer, String> configureConsumer() {
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testGroup", "true", embeddedKafkaBroker);
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        Consumer<Integer, String> consumer = new DefaultKafkaConsumerFactory<Integer, String>(consumerProps)
                .createConsumer();
        consumer.subscribe(Collections.singleton(TEST_TOPIC));
        return consumer;
    }

    private Producer<Integer, String> configureProducer() {
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(embeddedKafkaBroker));
        return new DefaultKafkaProducerFactory<Integer, String>(producerProps).createProducer();
    }
}

Solution 3

I solved the issue now

@BeforeClass
public static void setUpBeforeClass() {
    System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
}

while I was debugging, I saw that the embedded kaka server is taking a random port.

I couldn't find the configuration for it, so I am setting the kafka config same as the server. Looks still a bit ugly for me.

I would love to have just the @Mayur mentioned line

@EmbeddedKafka(partitions = 1, controlledShutdown = false, brokerProperties = {"listeners=PLAINTEXT://localhost:9092", "port=9092"})

but can't find the right dependency in the internet.

Solution 4

In integration testing, having fixed ports like 9092 is not recommended because multiple tests should have the flexibility to open their own ports from embedded instances. So, following implementation is something like that,

NB: this implementation is based on junit5(Jupiter:5.7.0) and spring-boot 2.3.4.RELEASE

TestClass:

@EnableKafka
@SpringBootTest(classes = {ConsumerTest.Config.class, Consumer.class})
@EmbeddedKafka(
        partitions = 1,
        controlledShutdown = false)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ConsumerTest {

    @Autowired
    private EmbeddedKafkaBroker kafkaEmbedded;

    @Autowired
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    @BeforeAll
    public void setUp() throws Exception {
        for (final MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
            ContainerTestUtils.waitForAssignment(messageListenerContainer,
                    kafkaEmbedded.getPartitionsPerTopic());
        }
    }

    @Value("${topic.name}")
    private String topicName;

    @Autowired
    private KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate;

    @Test
    public void consume_success() {
        requestKafkaTemplate.send(topicName, load);
    }


    @Configuration
    @Import({
            KafkaListenerConfig.class,
            TopicConfig.class
    })
    public static class Config {

        @Value(value = "${spring.kafka.bootstrap-servers}")
        private String bootstrapAddress;

        @Bean
        public ProducerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestProducerFactory() {
            final Map<String, Object> configProps = new HashMap<>();
            configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
            configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
            return new DefaultKafkaProducerFactory<>(configProps);
        }

        @Bean
        public KafkaTemplate<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> requestKafkaTemplate() {
            return new KafkaTemplate<>(requestProducerFactory());
        }
    }
}

Listener Class:

@Component
public class Consumer {
    @KafkaListener(
            topics = "${topic.name}",
            containerFactory = "listenerContainerFactory"
    )
    @Override
    public void listener(
            final ConsumerRecord<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> consumerRecord,
            final @Payload Optional<Map<String, List<ImmutablePair<String, String>>>> payload
    ) {
        
    }
}

Listner Config:

@Configuration
public class KafkaListenerConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${topic.name}")
    private String resolvedTreeQueueName;

    @Bean
    public ConsumerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeConsumerFactory() {
        final Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, resolvedTreeQueueName);
        return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new CustomDeserializer());
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> resolvedTreeListenerContainerFactory() {
        final ConcurrentKafkaListenerContainerFactory<String, Optional<Map<String, List<ImmutablePair<String, String>>>>> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(resolvedTreeConsumerFactory());
        return factory;
    }

}

TopicConfig:

@Configuration
public class TopicConfig {

    @Value(value = "${spring.kafka.bootstrap-servers}")
    private String bootstrapAddress;

    @Value(value = "${topic.name}")
    private String requestQueue;

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic requestTopic() {
        return new NewTopic(requestQueue, 1, (short) 1);
    }
}

application.properties:

spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}

This assignment is the most important assignment that would bind the embedded instance port to the KafkaTemplate and, KafkaListners.

Following the above implementation, you could open dynamic ports per test class and, it would be more convenient.

Share:
105,251

Related videos on Youtube

Yuna Braska
Author by

Yuna Braska

Updated on July 09, 2022

Comments

  • Yuna Braska
    Yuna Braska almost 2 years

    Edit FYI: working gitHub example


    I was searching the internet and couldn't find a working and simple example of an embedded Kafka test.

    My setup is:

    • Spring boot
    • Multiple @KafkaListener with different topics in one class
    • Embedded Kafka for test which is starting fine
    • Test with Kafkatemplate which is sending to topic but the @KafkaListener methods are not receiving anything even after a huge sleep time
    • No warnings or errors are shown, only info spam from Kafka in logs

    Please help me. There are mostly over configured or overengineered examples. I am sure it can be done simple. Thanks, guys!

    @Controller
    public class KafkaController {
    
        private static final Logger LOG = getLogger(KafkaController.class);
    
        @KafkaListener(topics = "test.kafka.topic")
        public void receiveDunningHead(final String payload) {
            LOG.debug("Receiving event with payload [{}]", payload);
            //I will do database stuff here which i could check in db for testing
        }
    }
    

    private static String SENDER_TOPIC = "test.kafka.topic";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);
    
    @Test
        public void testSend() throws InterruptedException, ExecutionException {
    
            Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
    
            KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
            producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
            producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
            producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
            Thread.sleep(10000);
        }
    
    • pvpkiran
      pvpkiran about 6 years
      show the code. see if this helps stackoverflow.com/questions/48682745/…
    • Yuna Braska
      Yuna Braska about 6 years
      @pvpkiran this is still not working. the test does only test itself but never reaches my KafkaListener when i just take the sending part to my topic
    • Artem Bilan
      Artem Bilan about 6 years
      It’s not clear by your test code how that KafkaController is involved. How are you sure that the listener is started?
    • Yuna Braska
      Yuna Braska about 6 years
      @ArtemBilan cause there is the [@KafkaListener] annotation on the method. or do I have todo something else?
    • Artem Bilan
      Artem Bilan about 6 years
      Right, the test needs to bootstrap an application context with that component
    • Yuna Braska
      Yuna Braska about 6 years
      @ArtemBilan its spring boot, the context is starting. with the test annotation [@RunWith(SpringRunner.class) @SpringBootTest] unfortunately, even get this example fails: codenotfound.com/spring-kafka-boot-example.html
    • Artem Bilan
      Artem Bilan about 6 years
      Right, but how that @SpringBootTest know about your KafkaController component? How is it scanned or configured ?
    • Artem Bilan
      Artem Bilan about 6 years
      Pay attention how that sample has @SpringBootApplication class in the same package as @SpringBootTest. And those Receiver and Sender components are in the nested packages. So, they all are clearly scanned and configured. And that's how it works. If your @SpringBootTest is in different package, your component are not visible and you should provide some @Configuraiton class.
  • Yuna Braska
    Yuna Braska about 6 years
    Thanks! this sounds great, but where does [@EmbeddedKafka] and [kafkaListenerEndpointRegistry] come from? Can you post a full example with imports?
  • donm
    donm about 6 years
    Since we have annotated our class with @EnableKafka and @EmbeddedKafka annotations, you can autowire both in the test class. In answer first code block, @Autowired KafkaEmbedded kafkaEmbedded is already there, just like that you can autowire for kafkaListenerEndpointRegistry
  • Yuna Braska
    Yuna Braska about 6 years
    I get every time the same error while I am testing different solutions: ERROR org.springframework.kafka.support.LoggingProducerListener:76 - Exception thrown when sending a message with key='null' and payload='Hello Message!' to topic myTopic: org.apache.kafka.common.errors.TimeoutException: Failed to update metadata after 60000 ms.
  • donm
    donm about 6 years
    I have just updated @TestConfiguration code in the answer, Hope it will help you!
  • Yuna Braska
    Yuna Braska about 6 years
    I still don't know where "@EmbeddedKafka" is coming from. which dependency Is needed for that? I am using currently "spring-kafka-test"
  • donm
    donm about 6 years
    spring-kafka-test has @EmbeddedKafka, you should be able to use @Autowired KafkaEmbedded kafkaEmbeded;
  • Yuna Braska
    Yuna Braska about 6 years
    okay found it!! [@EmbeddedKafka] is not included in newer versions of "spring-kafka-test"
  • dav.garcia
    dav.garcia over 5 years
    TestConfig is best declared as an inner class within KafkaConsumerTest. In this case: a) It must be static b) KafkaEmbedded must be injected as a parameter of the method producerFactory c) Inject ProducerFactory as a parameter to the method kafkaTemplate and then use it instead of calling producerFactory().
  • Flashpix
    Flashpix over 5 years
    You can set spring.kafka.bootstrap-servers=${spring.embedded.kafka.broke‌​rs} in your application.properties for the test, that should work. This is filled from EmbeddedKafka with the random port it was assigned on startup.
  • Manish Bansal
    Manish Bansal almost 5 years
    are you testing embedded kafka itself?
  • legend
    legend over 4 years
    This embedded kafka solution not working with cucumber test.
  • Debadatta
    Debadatta about 4 years
    java.lang.NoClassDefFoundError: kafka/common/KafkaException
  • Aldo Inácio da Silva
    Aldo Inácio da Silva about 3 years
    This annotation @EmbeddedKafka in my case came from spring-kafka-test-2.6.5. I have a dependency in pom to spring-kafka-test and I'm using spring-boot 2.4.2 version.@Sylhare
  • Tom AsIdea
    Tom AsIdea over 2 years
    The setup() method with ContainerTestUtils.waitForAssignment(..) was gold for us. We encountered the consumer to hang after another test class which caused the consumer for the next test to not receive anything. We also use @DirtiesContext(AFTER_CLASS)
  • Naveen Kulkarni
    Naveen Kulkarni almost 2 years
    Was searching for easiest option for the configuration of embedded kafka and found this. Thanks for posting it out.