Testing a @KafkaListener using Spring Embedded Kafka

22,039

Solution 1

You are probably sending the message before the consumer has been assigned the topic/partition. Set property...

spring:
  kafka:
    consumer:
      auto-offset-reset: earliest

...it defaults to latest.

This is like using --from-beginning with the console consumer.

EDIT

Oh; you're not using boot's properties.

Add

props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

EDIT2

BTW, you should probably also do a get(10L, TimeUnit.SECONDS) on the result of the template.send() (a Future<>) to assert that the send was successful.

EDIT3

To override the offset reset just for the test, you can do the same as what you did for the broker addresses:

@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;

...

    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);

and

@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.consumer.auto-offset-reset=earliest"})

However, bear in mind that this property only applies the first time a group consumes. To always start at the end each time the app starts, you have to seek to the end during startup.

Also, I would recommend setting enable.auto.commit to false so that the container takes care of committing the offsets rather than just relying on the consumer client doing it on a time schedule.

Solution 2

Maybe someone will find this useful. I had a similar problem. Locally tests were running (some checks were performed within Awaitility.waitAtMost) but in the Jenkins pipeline, tests were failing.

The solution was, like already mentioned in the most voted answer, setting auto-offset-reset=earliest. When tests are running, you can check if you set the configuration properly by looking into test logs. Spring outputs configuration for both producer and consumer

Share:
22,039
riccardo.cardin
Author by

riccardo.cardin

Computer science addicted.

Updated on January 02, 2021

Comments

  • riccardo.cardin
    riccardo.cardin over 3 years

    I am trying to write a unit test for a Kafka listener that I am developing using Spring Boot 2.x. Being a unit test, I don't want to start up a full Kafka server an instance of Zookeeper. So, I decided to use Spring Embedded Kafka.

    The definition of my listener is very basic.

    @Component
    public class Listener {
        private final CountDownLatch latch;
    
        @Autowired
        public Listener(CountDownLatch latch) {
            this.latch = latch;
        }
    
        @KafkaListener(topics = "sample-topic")
        public void listen(String message) {
            latch.countDown();
        }
    }
    

    Also the test, that verifies the latch counter to be equal to zero after receiving a message, is very easy.

    @RunWith(SpringRunner.class)
    @SpringBootTest
    @DirtiesContext
    @EmbeddedKafka(topics = { "sample-topic" })
    @TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
    public class ListenerTest {
    
        @Autowired
        private KafkaEmbedded embeddedKafka;
    
        @Autowired
        private CountDownLatch latch;
    
        private KafkaTemplate<Integer, String> producer;
    
        @Before
        public void setUp() {
            this.producer = buildKafkaTemplate();
            this.producer.setDefaultTopic("sample-topic");
        }
    
        private KafkaTemplate<Integer, String> buildKafkaTemplate() {
            Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
            ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
            return new KafkaTemplate<>(pf);
        }
    
        @Test
        public void listenerShouldConsumeMessages() throws InterruptedException {
            // Given
            producer.sendDefault(1, "Hello world");
            // Then
            assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
        }
    }
    

    Unfortunately, the test fails and I cannot understand why. Is it possible to use an instance of KafkaEmbedded to test a method marked with the annotation @KafkaListener?

    All the code is shared in my GitHub repository kafka-listener.

    Thanks to all.

    • Gary Russell
      Gary Russell about 6 years
      See the edit to my answer; I didn't notice you weren't using boot's config properties for the consumer.
  • riccardo.cardin
    riccardo.cardin about 6 years
    Thank you. Set the auto.offset.reset property to earliest made the magic :)
  • riccardo.cardin
    riccardo.cardin about 6 years
    However, if I need to use latest as auto.offset.reset value? How can I make the test works? Thanks a lot.
  • Rob
    Rob almost 5 years
    How is this ever testing the "Listener" class that contains the @KafkaListener method? I see the CountDownLatch assertion, but never any assertion that the method was hit...
  • Gary Russell
    Gary Russell almost 5 years
    ? the code in the @KafkaListener method counts down the latch, hence it was called.Typically, however, you wouldn't do it like this. Most likely the listener invokes a service and you would inject a mock or stubbed service instead.
  • rios0rios0
    rios0rios0 almost 3 years
    Very useful, thanks! When I was using Intellij I had the same problem trying to collect Coverage.