Checking the existence of topic in kafka before creating in Java

15,166

Solution 1

You can use AdminClient from kakfa-client version 0.11.0.0

Sample code:

    Properties config = new Properties();
    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhist:9091");

    AdminClient admin = AdminClient.create(config);
    ListTopicsResult listTopics = admin.listTopics();
    Set<String> names = listTopics.names().get();
    boolean contains = names.contains("TEST_6");
    if (!contains) {
        List<NewTopic> topicList = new ArrayList<NewTopic>();
        Map<String, String> configs = new HashMap<String, String>();
        int partitions = 5;
        Short replication = 1;
        NewTopic newTopic = new NewTopic("TEST_6", partitions, replication).configs(configs);
        topicList.add(newTopic);
        admin.createTopics(topicList);
    }

Solution 2

For this purpose, you can use the method AdminUtils.topicExists(ZkUtils zkClient, String topic), it will return true if the topic already exists, false otherwise.

Your code would then be something like this:

if (!AdminUtils.topicExists(zkClient, myTopic)){
    AdminUtils.createTopic(zkClient, myTopic, 2, 1, properties);
}

Solution 3

public static void createKafkaTopic(String sourceTopicName, String sinkTopicName, String responseTopicName, String kafkaUrl) {

    try {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUrl);
        AdminClient kafkaAdminClient = KafkaAdminClient.create(properties);
        ListTopicsResult topics = kafkaAdminClient.listTopics();
        Set <String> names = topics.names().get();

        boolean containsSourceTopic = names.contains(sourceTopicName);
        boolean containsSinkTopic = names.contains(sinkTopicName);
        boolean containsResponseTopic = names.contains(responseTopicName);

        if (!containsResponseTopic && !containsSinkTopic && !containsSourceTopic) {
            CreateTopicsResult result = kafkaAdminClient.createTopics(
                    Stream.of(sourceTopicName, sinkTopicName, responseTopicName).map(
                            name -> new NewTopic(name, 1, (short) 1)
                    ).collect(Collectors.toList())
            );
            result.all().get();
            LOG.info("new sourceTopicName: {}, sinkTopicName: {}, responseTopicName: {} are created",
                    sourceTopicName, sinkTopicName, responseTopicName);
        }
    } catch (ExecutionException | InterruptedException e) {
        LOG.info("Error message {}", e.getMessage());
    }
}
Share:
15,166

Related videos on Youtube

nishantv
Author by

nishantv

Updated on September 16, 2022

Comments

  • nishantv
    nishantv over 1 year

    I am trying to create a topic in kafka 0.8.2 by using :

    AdminUtils.createTopic(zkClient, myTopic, 2, 1, properties);
    

    If I run the code more than once locally for testing, this fails as the topic was already created. Is there a way to check if the topic exists before creating the topic? The TopicCommand api doesn't seem to return anything for listTopics or describeTopic .

  • Alessio
    Alessio over 4 years
    While this code snippet may solve the question, including an explanation really helps to improve the quality of your post. Remember that you are answering the question for readers in the future, and those people might not know the reasons for your code suggestion.