How to write a unit test case for adding a callback to a ListenableFuture

You can mock the ListenableFuture and use doAnswer to invoke the callback that the service passes to addCallback, like this.

    @Test
    public void can_publishDataToKafka() {
        String key = someAlphanumericString();
        String topic = someAlphaString(10);
        long offset = somePositiveLong();
        int partition = somePositiveInteger();

        SiebelRecord siebelRecord = mock(SiebelRecord.class);
        SendResult<String, Object> sendResult = mock(SendResult.class);
        ListenableFuture<SendResult<String, Object>> responseFuture = mock(ListenableFuture.class);
        RecordMetadata recordMetadata = new RecordMetadata(new TopicPartition(topic, partition), offset, 0L, 0L, 0L, 0, 0);

        given(sendResult.getRecordMetadata()).willReturn(recordMetadata);
        when(kafkaTemplate.send(topic, key, siebelRecord)).thenReturn(responseFuture);
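        doAnswer(invocationOnMock -> {
            // Grab the callback the service registered and complete it with the stubbed result.
            ListenableFutureCallback<SendResult<String, Object>> listenableFutureCallback = invocationOnMock.getArgument(0);
            listenableFutureCallback.onSuccess(sendResult);
            // JUnit's assertEquals takes the expected value first, then the actual value.
            assertEquals(offset, sendResult.getRecordMetadata().offset());
            assertEquals(partition, sendResult.getRecordMetadata().partition());
            return null;
        }).when(responseFuture).addCallback(any(ListenableFutureCallback.class));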

        service.publishDataToKafka(key, topic, siebelRecord);

        verify(kafkaTemplate, times(1)).send(topic, key, siebelRecord);
    }

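    // Passes only if the service's onFailure callback rethrows the error as a KafkaException;
    // a callback that merely logs the failure will not satisfy the expected exception.
    @Test(expected = KafkaException.class)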
    public void can_capture_failure_publishDataToKafka() {
        String key = someAlphanumericString();
        String topic = someAlphaString(10);
        String message = someString(20);

        SiebelRecord siebelRecord = mock(SiebelRecord.class);
        ListenableFuture<SendResult<String, Object>> responseFuture = mock(ListenableFuture.class);
        Throwable throwable = mock(Throwable.class);

        given(throwable.getMessage()).willReturn(message);
        when(kafkaTemplate.send(topic, key, siebelRecord)).thenReturn(responseFuture);
        doAnswer(invocationOnMock -> {
            ListenableFutureCallback listenableFutureCallback = invocationOnMock.getArgument(0);
            listenableFutureCallback.onFailure(throwable);
            return null;
        }).when(responseFuture).addCallback(any(ListenableFutureCallback.class));

        service.publishDataToKafka(key, topic, siebelRecord);
    }
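
As an alternative to mocking the future and stubbing addCallback with doAnswer, the same idea can be sketched with a small hand-written fake that fires the callback as soon as it is registered. The sketch below is only an illustration under stated assumptions: Callback, CallbackFuture, CompletedFuture and publish are hypothetical stand-ins written for this example (they are not Spring or Kafka types), and the "service" records its log lines in a list so the test has something to assert on.

```java
import java.util.ArrayList;
import java.util.List;

public class CallbackSketch {

    /** Hypothetical stand-in for a success/failure callback such as ListenableFutureCallback. */
    public interface Callback<T> {
        void onSuccess(T result);
        void onFailure(Throwable error);
    }

    /** Hypothetical stand-in for a future that accepts callbacks; only the method the test needs. */
    public interface CallbackFuture<T> {
        void addCallback(Callback<T> callback);
    }

    /** Fake future that is already completed and notifies each callback when it is added. */
    public static final class CompletedFuture<T> implements CallbackFuture<T> {
        private final T value;
        private final Throwable error;

        private CompletedFuture(T value, Throwable error) {
            this.value = value;
            this.error = error;
        }

        public static <T> CompletedFuture<T> succeeded(T value) {
            return new CompletedFuture<>(value, null);
        }

        public static <T> CompletedFuture<T> failed(Throwable error) {
            return new CompletedFuture<>(null, error);
        }

        @Override
        public void addCallback(Callback<T> callback) {
            if (error != null) {
                callback.onFailure(error);
            } else {
                callback.onSuccess(value);
            }
        }
    }

    /** Hypothetical code under test: registers a callback that only records messages. */
    public static List<String> publish(CallbackFuture<String> future, String topic) {
        List<String> log = new ArrayList<>();
        future.addCallback(new Callback<String>() {
            @Override
            public void onSuccess(String result) {
                log.add(String.format("sent %s to topic %s", result, topic));
            }

            @Override
            public void onFailure(Throwable error) {
                // Records the failure and does not rethrow, so the caller sees no exception.
                log.add(String.format("unable to send to topic %s because of error %s",
                        topic, error.getMessage()));
            }
        });
        return log;
    }

    public static void main(String[] args) {
        System.out.println(publish(CompletedFuture.succeeded("payload"), "orders"));
        System.out.println(publish(
                CompletedFuture.failed(new IllegalStateException("broker down")), "orders"));
    }
}
```

Because the failure callback here only records a message, the caller never sees an exception, which is why a test on this kind of code would assert on the observable side effect instead of expecting a throw. In a Spring project, a real SettableListenableFuture completed with set(...) or setException(...) could probably play the role of the fake, which avoids stubbing addCallback altogether.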

Author by David

Updated on September 14, 2022

Comments

  • David
    David over 1 year

    I am trying to write a unit test case for adding a callback to a ListenableFuture, but I am not sure how to do it. I didn't find anything useful on the internet.

        @Test
        public void can_publish_data_to_kafka() {
            String topic = someString(10);
            String key = someAlphanumericString(5);
            String data = someString(50);
            SendResult sendResult = mock(SendResult.class);
            ListenableFuture<SendResult<String, Object>> future = mock(ListenableFuture.class);

            given(kafkaTemplate.send(topic, key, data)).willReturn(future);

            doAnswer(new Answer() {
                @Override
                public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                    return invocationOnMock.getArguments()[1];
                }
            });

            service.method(key, topic, data);

        }

    This is the code for which I want to write the test case:

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topicName, key, data);

        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onSuccess(SendResult<String, Object> stringKafkaBeanSendResult) {
                RecordMetadata recordMetadata = stringKafkaBeanSendResult.getRecordMetadata();
                // data is passed as the first format argument so that all four %s placeholders are filled
                LOGGER.info(String.format("sent message %s to topic %s partition %s with offset %s", data.toString(), recordMetadata.topic(), recordMetadata.partition(), recordMetadata.offset()));
            }

            @Override
            public void onFailure(Throwable throwable) {
                // data is passed as the first format argument so that all three %s placeholders are filled
                LOGGER.error(String.format("unable to send message = %s to topic %s because of error %s", data.toString(), topicName, throwable.getMessage()));
            }
        });

    I am looking for direction on how to write this unit test using Mockito.

  • Aditya Sharma
    Aditya Sharma almost 4 years
    How can I test this if I use Futures.addCallback?
  • Aditya Sharma
    Aditya Sharma almost 4 years
    And I don't want to use PowerMockito.
  • Ronak Poriya
    Ronak Poriya almost 4 years
    @AdityaSharma Check whether this helps: ListeningScheduledExecutorService executor = MoreExecutors.listeningDecorator(Executors.newSingleThreadScheduledExecutor()); ListenableFuture<T> futureResponse = executor.schedule(call, n, unit);
  • al gh
    al gh about 3 years
    I am not sure why, but the doAnswer block has no effect and does not get executed.
  • Granfaloon
    Granfaloon about 3 years
    can_capture_failure doesn't throw a KafkaException for some reason. I can see it is activating the onFailure callback, but test fails with Expected exception:...