Testing a @KafkaListener using Spring Embedded Kafka
Solution 1
You are probably sending the message before the consumer has been assigned the topic/partition. Set property...
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
...it defaults to latest. This is like using --from-beginning with the console consumer.
EDIT
Oh; you're not using boot's properties.
Add
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
EDIT2
BTW, you should probably also do a get(10L, TimeUnit.SECONDS) on the result of the template.send() (a Future<>) to assert that the send was successful.
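A minimal sketch of that check, using only the JDK so it runs without a broker: a CompletableFuture stands in for the future returned by template.send(), and fakeSend, sendSucceeded and the class name are hypothetical. The point is only that get(timeout, unit) either returns or throws, so a failed or stalled send is caught at the send step rather than later at the latch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SendAckSketch {

    // Hypothetical stand-in for template.send(): completes with a fake
    // acknowledgement, or exceptionally when the "broker" rejects the record.
    static Future<String> fakeSend(String payload, boolean brokerUp) {
        CompletableFuture<String> result = new CompletableFuture<>();
        if (brokerUp) {
            result.complete("acked:" + payload);
        } else {
            result.completeExceptionally(new IllegalStateException("broker unavailable"));
        }
        return result;
    }

    // Waits at most 10 seconds; returns true only if the send completed normally.
    static boolean sendSucceeded(Future<String> sendResult) {
        try {
            sendResult.get(10L, TimeUnit.SECONDS);
            return true;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(sendSucceeded(fakeSend("Hello world", true)));
        System.out.println(sendSucceeded(fakeSend("Hello world", false)));
    }
}
```

In the real test the same idea would be an assertion on the future returned by producer.sendDefault(...), placed before the latch is awaited.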
EDIT3
To override the offset reset just for the test, you can do the same as what you did for the broker addresses:
@Value("${spring.kafka.consumer.auto-offset-reset:latest}")
private String reset;
...
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, this.reset);
and
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
"spring.kafka.consumer.auto-offset-reset=earliest"})
However, bear in mind that this property only applies the first time a group consumes. To always start at the end each time the app starts, you have to seek to the end during startup.
Also, I would recommend setting enable.auto.commit to false so that the container takes care of committing the offsets rather than just relying on the consumer client doing it on a time schedule.
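Put together, the consumer settings discussed in this answer might look like the sketch below. It uses plain string keys (auto.offset.reset and enable.auto.commit are the Kafka consumer property names behind the ConsumerConfig constants) so that it runs without Kafka on the classpath. The class name, method name and group id are illustrative assumptions, not part of the original code.

```java
import java.util.HashMap;
import java.util.Map;

class ConsumerPropsSketch {

    // Builds the properties that would be handed to a consumer factory.
    // In the real configuration, bootstrapServers and offsetReset would come
    // from @Value-injected fields so that a test can override them.
    static Map<String, Object> consumerProps(String bootstrapServers, String offsetReset) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", "sample-group");       // hypothetical group id
        props.put("auto.offset.reset", offsetReset);  // "earliest" in the test, "latest" by default
        props.put("enable.auto.commit", false);       // let the listener container commit offsets
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps("localhost:9092", "earliest"));
    }
}
```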
Solution 2
Maybe someone will find this useful. I had a similar problem.
Locally the tests passed (some checks were performed within Awaitility.waitAtMost), but in the Jenkins pipeline they failed.
The solution was, as already mentioned in the most-voted answer, to set auto-offset-reset=earliest.
While the tests are running, you can check whether the configuration was applied by looking at the test logs: Spring outputs the configuration for both the producer and the consumer.
Comments
-
riccardo.cardin over 3 years
I am trying to write a unit test for a Kafka listener that I am developing using Spring Boot 2.x. Being a unit test, I don't want to start up a full Kafka server or an instance of ZooKeeper. So, I decided to use Spring Embedded Kafka.
The definition of my listener is very basic.
@Component
public class Listener {

    private final CountDownLatch latch;

    @Autowired
    public Listener(CountDownLatch latch) {
        this.latch = latch;
    }

    @KafkaListener(topics = "sample-topic")
    public void listen(String message) {
        latch.countDown();
    }
}
The test, which verifies that the latch counter is equal to zero after a message is received, is also very simple.
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "sample-topic" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ListenerTest {

    @Autowired
    private KafkaEmbedded embeddedKafka;

    @Autowired
    private CountDownLatch latch;

    private KafkaTemplate<Integer, String> producer;

    @Before
    public void setUp() {
        this.producer = buildKafkaTemplate();
        this.producer.setDefaultTopic("sample-topic");
    }

    private KafkaTemplate<Integer, String> buildKafkaTemplate() {
        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<>(senderProps);
        return new KafkaTemplate<>(pf);
    }

    @Test
    public void listenerShouldConsumeMessages() throws InterruptedException {
        // Given
        producer.sendDefault(1, "Hello world");
        // Then
        assertThat(latch.await(10L, TimeUnit.SECONDS)).isTrue();
    }
}
Unfortunately, the test fails and I cannot understand why. Is it possible to use an instance of KafkaEmbedded to test a method marked with the annotation @KafkaListener?
All the code is shared in my GitHub repository kafka-listener.
Thanks to all.
-
Gary Russell about 6 years
See the edit to my answer; I didn't notice you weren't using boot's config properties for the consumer.
-
riccardo.cardin about 6 years
Thank you. Setting the auto.offset.reset property to earliest did the magic :)
-
riccardo.cardin about 6 years
However, what if I need to use latest as the auto.offset.reset value? How can I make the test work? Thanks a lot.
-
Rob almost 5 years
How is this ever testing the "Listener" class that contains the @KafkaListener method? I see the CountDownLatch assertion, but never any assertion that the method was hit...
-
Gary Russell almost 5 years
? The code in the @KafkaListener method counts down the latch, hence it was called. Typically, however, you wouldn't do it like this. Most likely the listener invokes a service and you would inject a mock or stubbed service instead.
-
rios0rios0 almost 3 years
Very useful, thanks! When I was using IntelliJ I had the same problem trying to collect coverage.
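The suggestion in the comments to have the listener invoke a service and to inject a stub could be sketched roughly as follows. This is an illustration under stated assumptions, not code from the question: MessageService and RecordingService are hypothetical names, and the Spring annotations are left out so that the sketch compiles with the JDK alone.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class ListenerDelegationSketch {

    // Hypothetical collaborator that the listener delegates to.
    interface MessageService {
        void handle(String message);
    }

    // In the real application this class would be a @Component and listen()
    // would carry @KafkaListener(topics = "sample-topic"); the annotations are
    // omitted here so the sketch does not need Spring on the classpath.
    static class Listener {
        private final MessageService service;

        Listener(MessageService service) {
            this.service = service;
        }

        void listen(String message) {
            service.handle(message);
        }
    }

    // Hand-written stub that records what it receives; a mocking library
    // would serve the same purpose.
    static class RecordingService implements MessageService {
        final List<String> received = new CopyOnWriteArrayList<>();

        @Override
        public void handle(String message) {
            received.add(message);
        }
    }

    public static void main(String[] args) {
        RecordingService stub = new RecordingService();
        new Listener(stub).listen("Hello world");
        System.out.println(stub.received); // prints [Hello world]
    }
}
```

In an embedded-Kafka test the container calls listen() on its own thread, so an assertion on the stub would still need a bounded wait, such as the latch from the question or Awaitility.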