EmbeddedKafka how to check received messages in unit test


This works for me. Give it a try:

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaEmbeddedTest {

    private static final String SENDER_TOPIC = "testTopic";

    @ClassRule
    // KafkaEmbedded creates the topic with two partitions by default.
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

    @Test
    public void testSend() throws InterruptedException, ExecutionException {

        Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
        // If you need partitions other than 0 and 1, specify the partition
        // count in the KafkaEmbedded declaration above.

        KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
        producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();


        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", embeddedKafka);
        // Set the offset reset to earliest, because by the time the consumer
        // starts, the producer may already have sent all messages.
        consumerProps.put("auto.offset.reset", "earliest");

        final List<String> receivedMessages = Lists.newArrayList();
        final CountDownLatch latch = new CountDownLatch(3);
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        executorService.execute(() -> {
            KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
            kafkaConsumer.subscribe(Collections.singletonList(SENDER_TOPIC));
            try {
                while (true) {
                    ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
                    records.iterator().forEachRemaining(record -> {
                        receivedMessages.add(record.value());
                        latch.countDown();
                    });
                }
            } finally {
                kafkaConsumer.close();
            }
        });

        latch.await(10, TimeUnit.SECONDS);
        assertTrue(receivedMessages.containsAll(Arrays.asList("message00", "message01", "message10")));
    }
}

I am using a CountDownLatch because producer.send(..) is an asynchronous operation. The consumer thread polls Kafka every 100 milliseconds in an infinite loop; whenever a new record arrives it is added to a list for the later assertions and the latch is counted down. The test itself waits up to 10 seconds for all three messages, just to be sure.
If you don't want the CountDownLatch and ExecutorService machinery, you can instead use a simple loop on the test thread and exit after a deadline, as sketched below.
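A minimal sketch of that simpler variant, assuming the same consumerProps, SENDER_TOPIC and messages as in the test above (the 10-second deadline and polling on the test thread are illustrative choices, not part of the original answer):

KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
try {
    kafkaConsumer.subscribe(Collections.singletonList(SENDER_TOPIC));
    List<String> receivedMessages = new ArrayList<>();
    // Poll on the test thread until all three messages arrive or the deadline passes.
    long deadline = System.currentTimeMillis() + 10_000;
    while (receivedMessages.size() < 3 && System.currentTimeMillis() < deadline) {
        ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
        records.forEach(record -> receivedMessages.add(record.value()));
    }
    assertTrue(receivedMessages.containsAll(Arrays.asList("message00", "message01", "message10")));
} finally {
    kafkaConsumer.close();
}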

Comments

  • dermoritz
    dermoritz almost 2 years

    I created a Spring Boot application that sends messages to a Kafka topic. I am using spring-integration-kafka: a KafkaProducerMessageHandler<String,String> is subscribed to a channel (SubscribableChannel) and pushes every message it receives to one topic (roughly the wiring sketched at the end of this comment). The application works fine; I see the messages arriving in Kafka via the console consumer (local Kafka).

    I also created an integration test that uses KafkaEmbedded. I check the expected messages by subscribing to the channel within the test; that part works fine.

    But I also want the test to check the messages actually put into Kafka. Sadly, Kafka's JavaDoc is not the best. What I have tried so far is:

    @ClassRule
    public static KafkaEmbedded kafkaEmbedded = new KafkaEmbedded(1, true, "myTopic");
    //...
    @Before
    public void init() throws Exception {
    
        mockConsumer = new MockConsumer<>( OffsetResetStrategy.EARLIEST );
        kafkaEmbedded.consumeFromAnEmbeddedTopic( mockConsumer,"sikom" );
    
    }
    //...
    
    @Test
    public void endToEnd() throws Exception {
    //  ...
    
        ConsumerRecords<String, String> records = mockConsumer.poll( 10000 );
    
        StreamSupport.stream(records.spliterator(), false).forEach( record -> log.debug( "record: " + record.value() ) );
    
    
    }
    

    The problem is that I don't see any records. I am not sure whether my KafkaEmbedded setup is correct, but the messages are received by the channel.
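
    For reference, wiring of the kind described at the top of this comment usually looks roughly like the sketch below; the channel and bean names and the topic are illustrative, and it assumes a Spring Boot auto-configured KafkaTemplate rather than the actual project code:

    @Bean
    public MessageChannel toKafka() {
        return new DirectChannel();
    }

    @Bean
    @ServiceActivator(inputChannel = "toKafka")
    public MessageHandler kafkaOutHandler(KafkaTemplate<String, String> kafkaTemplate) {
        // KafkaProducerMessageHandler forwards every message from the channel to one topic.
        KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate);
        handler.setTopicExpression(new LiteralExpression("myTopic"));
        return handler;
    }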

  • Gary Russell
    Gary Russell about 6 years
    The key here is consumerProps.put("auto.offset.reset", "earliest"); otherwise the consumer connects after the messages have been sent and consumes from the end of the topic.
  • Usr
    Usr about 5 years
    Hi, KafkaEmbedded is deprecated now, what is the alternative?
  • billydh
    billydh over 4 years
    @Usr you can try EmbeddedKafkaRule
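
For newer spring-kafka-test versions, where KafkaEmbedded has been removed, the EmbeddedKafkaRule mentioned above can be used in much the same way. A minimal sketch of the accepted answer's test against that API (same SENDER_TOPIC field, JUnit 4, imports omitted as above; the 10-second timeout is an arbitrary choice):

@ClassRule
public static EmbeddedKafkaRule embeddedKafkaRule = new EmbeddedKafkaRule(1, true, SENDER_TOPIC);

@Test
public void testSendWithRule() throws Exception {
    EmbeddedKafkaBroker broker = embeddedKafkaRule.getEmbeddedKafka();

    Map<String, Object> senderProps = KafkaTestUtils.producerProps(broker);
    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();

    Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("sampleRawConsumer", "false", broker);
    consumerProps.put("auto.offset.reset", "earliest");
    KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
    broker.consumeFromAnEmbeddedTopic(consumer, SENDER_TOPIC);

    // Wait up to ~10 seconds for records to arrive on the embedded topic.
    ConsumerRecords<Integer, String> records = KafkaTestUtils.getRecords(consumer, 10_000);
    consumer.close();

    boolean found = false;
    for (ConsumerRecord<Integer, String> record : records) {
        if ("message00".equals(record.value())) {
            found = true;
        }
    }
    assertTrue(found);
}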