How to create a unit test with embedded Kafka in Spring Cloud Stream


We generally recommend using the Test Binder in tests, but if you want to use an embedded Kafka server, it can be done...

Add this to your POM...

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka-test</artifactId>
    <scope>test</scope>
</dependency>

Test app...

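import java.nio.charset.StandardCharsets;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Processor;
import org.springframework.messaging.handler.annotation.SendTo;

@SpringBootApplication
@EnableBinding(Processor.class)
public class So43330544Application {

    public static void main(String[] args) {
        SpringApplication.run(So43330544Application.class, args);
    }

    // Consumes raw bytes from the input binding and sends the
    // upper-cased payload to the output binding.
    @StreamListener(Processor.INPUT)
    @SendTo(Processor.OUTPUT)
    public byte[] handle(byte[] in) {
        // Explicit charset so the result does not depend on the platform default
        return new String(in, StandardCharsets.UTF_8).toUpperCase().getBytes(StandardCharsets.UTF_8);
    }

}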
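
Since the handler is just a bytes-in, bytes-out transformation, its logic can also be checked without any broker at all. The following is a minimal sketch of that idea, not part of the original answer: the class name UpperCaseHandlerSketch is hypothetical, and the method simply mirrors handle(byte[]) above with an explicit charset and locale.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;

// Hypothetical stand-alone copy of the handler logic, for illustration only.
public class UpperCaseHandlerSketch {

    // Decode the payload as UTF-8, upper-case it, and encode it again.
    static byte[] handle(byte[] in) {
        String text = new String(in, StandardCharsets.UTF_8);
        return text.toUpperCase(Locale.ROOT).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] out = handle("foo".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(out, StandardCharsets.UTF_8)); // prints FOO
    }
}
```

This only covers the transformation itself; the embedded Kafka test is still what exercises the bindings end to end.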

application.properties...

spring.cloud.stream.bindings.output.destination=so0544out
spring.cloud.stream.bindings.input.destination=so0544in
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.group=so0544
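
The test sends and receives byte[] through Spring Boot's auto-configured Kafka client beans. Boot's Kafka auto-configuration defaults to String (de)serializers, so depending on your versions you may need byte-array ones as well. The lines below are an assumed addition, not part of the original answer; the property names are Spring Boot's standard spring.kafka.* keys.

```properties
# Assumed additions: use byte-array (de)serializers for the test's KafkaTemplate and consumer
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
```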

Test case...

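import static org.assertj.core.api.Assertions.assertThat;

import java.util.Collections;
import java.util.Map;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class So43330544ApplicationTests {

    // Starts a single embedded broker before the Spring context is created
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

    // Auto-configured by Spring Boot. Sending byte[] assumes byte-array
    // serializers are configured (Boot defaults to StringSerializer).
    @Autowired
    private KafkaTemplate<byte[], byte[]> template;

    @Autowired
    private KafkaProperties properties;

    // Point Spring Boot's Kafka configuration at the embedded broker
    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
    }

    @Test
    public void testSendReceive() {
        // Publish to the app's input topic, then read the result from its output topic
        template.send("so0544in", "foo".getBytes());
        Map<String, Object> configs = properties.buildConsumerProperties();
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
        ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);
        Consumer<byte[], byte[]> consumer = cf.createConsumer();
        consumer.subscribe(Collections.singleton("so0544out"));
        ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
        consumer.commitSync();
        consumer.close();
        assertThat(records.count()).isEqualTo(1);
        assertThat(new String(records.iterator().next().value())).isEqualTo("FOO");
    }

}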
Author: Tiago Costa

Updated on June 15, 2022

Comments

  • Tiago Costa almost 2 years

    Sorry for the question being too generic, but does anyone have a tutorial or guide on how to test a producer and consumer with embedded Kafka? I've tried several, but there are so many versions of the dependencies and none of them actually works =/

    I'm using Spring Cloud Stream Kafka.