Test Kafka Streams topology

Solution 1

Update for Kafka 1.1.0 (released 23-Mar-2018):

KIP-247 added official test utils. Per the Upgrade Guide:

There is a new artifact kafka-streams-test-utils providing the TopologyTestDriver, ConsumerRecordFactory, and OutputVerifier classes. You can include the new artifact as a regular dependency in your unit tests and use the test driver to test the business logic of your Kafka Streams application. For more details, see KIP-247.

From the documentation:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>1.1.0</version>
        <scope>test</scope>
    </dependency>

The test driver simulates the library runtime that continuously fetches records from input topics and processes them by traversing the topology. You can use the test driver to verify that your processor topology computes the correct result for manually piped-in data records. The test driver captures the result records and allows you to query its embedded state stores:

    // Create your topology
    Topology topology = new Topology();
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    // Run it on the test driver
    TopologyTestDriver testDriver = new TopologyTestDriver(topology, config);

    // Feed input data
    ConsumerRecordFactory<String, Long> factory = new ConsumerRecordFactory<>("input-topic", new StringSerializer(), new LongSerializer());
    testDriver.pipeInput(factory.create("key", 42L));

    // Verify output
    ProducerRecord<String, Long> outputRecord = testDriver.readOutput("output-topic", new StringDeserializer(), new LongDeserializer());

See the documentation for details.
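
Beyond reading the raw output records, the same artifact ships an OutputVerifier helper, and the driver exposes the topology's state stores for direct assertions. A minimal sketch continuing the example above, using JUnit's Assert (the store name "aggStore" is an assumption and must match a store actually defined in your topology):

    // Assert key and value of the output record produced above
    OutputVerifier.compareKeyValue(outputRecord, "key", 42L);

    // Query an embedded state store by name ("aggStore" is only a placeholder)
    KeyValueStore<String, Long> store = testDriver.getKeyValueStore("aggStore");
    Assert.assertEquals(Long.valueOf(42L), store.get("key"));

    // Always close the driver to release its resources
    testDriver.close();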


For older versions, ProcessorTopologyTestDriver has been available since 0.11.0.0. It ships in the kafka-streams test artifact (specified with <classifier>test</classifier> in Maven):

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

You will also need to add the kafka-clients test artifact:

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.11.0.0</version>
        <classifier>test</classifier>
        <scope>test</scope>
    </dependency>

Then you can use the test driver. Per the Javadoc, first create a ProcessorTopologyTestDriver:

    StringSerializer strSerializer = new StringSerializer();
    StringDeserializer strDeserializer = new StringDeserializer();
    Properties props = new Properties();
    props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test");
    props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091");
    props.setProperty(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName());
    props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
    props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
    props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName());
    props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName());
    StreamsConfig config = new StreamsConfig(props);
    TopologyBuilder builder = ...
    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder);

You can feed input into the topology as though you had actually written to one of the input topics:

    driver.process("input-topic", "key1", "value1", strSerializer, strSerializer);

And read output topics:

    ProducerRecord<String, String> record1 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record2 = driver.readOutput("output-topic-1", strDeserializer, strDeserializer);
    ProducerRecord<String, String> record3 = driver.readOutput("output-topic-2", strDeserializer, strDeserializer);

Then you can assert on these results.
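
For example, with plain JUnit assertions (the expected values below assume a hypothetical topology that simply upper-cases each value and forwards it to output-topic-1; adjust them to whatever your topology actually produces):

    // Hypothetical expectations for an upper-casing topology
    Assert.assertEquals("key1", record1.key());
    Assert.assertEquals("VALUE1", record1.value());
    // readOutput returns null once no more records are available on that topic
    Assert.assertNull(driver.readOutput("output-topic-1", strDeserializer, strDeserializer));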

Solution 2

  1. As you are asking whether it is possible to test a Kafka Streams application without a real Kafka setup, you might try the Mocked Streams library in Scala. Mocked Streams 1.0 is a library for Scala >= 2.11.8 that allows you to unit-test processing topologies of Kafka Streams applications (since Apache Kafka >= 0.10.1) without ZooKeeper and Kafka brokers. Reference: https://github.com/jpzk/mockedstreams

  2. You can also use scalatest-embedded-kafka, a library that provides an in-memory Kafka broker to run your ScalaTest specs against. It uses Kafka 0.10.1.1 and ZooKeeper 3.4.8.
    Reference: https://github.com/manub/scalatest-embedded-kafka#scalatest-embedded-kafka-streams

Good luck!

Solution 3

Spring Kafka has support for unit testing with an embedded Kafka broker; see https://docs.spring.io/spring-kafka/docs/2.1.0.RELEASE/reference/html/_reference.html#__embeddedkafka_annotation.
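
As a rough sketch of what such an embedded-broker test can look like with the JUnit 4 rule from spring-kafka-test (the class name, topic names, and application id are placeholders; the @EmbeddedKafka annotation described in the linked docs achieves the same setup declaratively):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;
    import org.junit.ClassRule;
    import org.junit.Test;
    import org.springframework.kafka.test.rule.KafkaEmbedded;

    public class StreamsEmbeddedKafkaTest {

        // Starts one embedded broker (with its own ZooKeeper) and creates the topics
        @ClassRule
        public static KafkaEmbedded embeddedKafka =
                new KafkaEmbedded(1, true, "input-topic", "output-topic");

        @Test
        public void shouldProcessRecords() throws Exception {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "embedded-test");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
            // ... build your topology, start KafkaStreams with these properties,
            // produce to "input-topic" and consume from "output-topic" to assert the results
        }
    }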

Also, the Kafka team is working on releasing a test driver for Streams: https://issues.apache.org/jira/browse/KAFKA-3625.

Solution 4

You can just run a single ZooKeeper instance and a single broker locally to test a Kafka Streams application.

Just follow the quick start guides for ZooKeeper and Kafka.

Also check out the Kafka Streams examples, which come with detailed walk-through instructions in the JavaDocs.

Solution 5

You should check out the Kafka Unit library.

Your test setup should look something like this:

    // Spin up the embedded broker and create the test topic
    String testTopic = "test-topic";
    KafkaUnit kafkaUnitServer = new KafkaUnit();
    kafkaUnitServer.startup();
    kafkaUnitServer.createTopic(testTopic);

    // Send a test message
    KeyedMessage<String, String> keyedMessage = new KeyedMessage<>(testTopic, "key", "value");
    kafkaUnitServer.sendMessages(keyedMessage);

And then, to read your messages and assert that everything went OK, you do something like this:

    List<String> messages = kafkaUnitServer.readMessages(testTopic, 1);
    assertEquals("value", messages.get(0));

This actually spins up an embedded Kafka broker, so everything you need is contained in the test.

You could get a little fancier and set up your embedded Kafka in a setup() method (or setupSpec() in Spock) and shut it down in a teardown() method, as sketched below.
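
A rough JUnit 4 version of that structure might look like the following (the class name, topic name, and package names are assumptions; check them against the kafka-unit version you use):

    import java.util.List;

    import org.junit.After;
    import org.junit.Assert;
    import org.junit.Before;
    import org.junit.Test;

    import info.batey.kafka.unit.KafkaUnit;
    import kafka.producer.KeyedMessage;

    public class KafkaUnitLifecycleTest {

        private static final String TEST_TOPIC = "test-topic";
        private KafkaUnit kafkaUnitServer;

        @Before
        public void setup() throws Exception {
            // Start the embedded broker before each test and create the topic
            kafkaUnitServer = new KafkaUnit();
            kafkaUnitServer.startup();
            kafkaUnitServer.createTopic(TEST_TOPIC);
        }

        @After
        public void teardown() throws Exception {
            // Stop the embedded broker so tests do not leak ports or threads
            kafkaUnitServer.shutdown();
        }

        @Test
        public void shouldReadBackWhatWasSent() throws Exception {
            kafkaUnitServer.sendMessages(new KeyedMessage<>(TEST_TOPIC, "key", "value"));
            List<String> messages = kafkaUnitServer.readMessages(TEST_TOPIC, 1);
            Assert.assertEquals("value", messages.get(0));
        }
    }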

Comments

  • imehl:

    I'm searching for a way to test a Kafka Streams application, so that I can define the input events and the test suite shows me the output.

    Is this possible without a real Kafka setup?