How to programmatically create a topic in Apache Kafka using Python


Solution 1

You can programmatically create topics using either the kafka-python client or the confluent_kafka client, which is a lightweight wrapper around librdkafka.


Using kafka-python

from kafka.admin import KafkaAdminClient, NewTopic


admin_client = KafkaAdminClient(
    bootstrap_servers="localhost:9092",
    client_id="test"
)

# create_topics() takes a list, so several topics can be created in one call.
topic_list = [NewTopic(name="example_topic", num_partitions=1, replication_factor=1)]
admin_client.create_topics(new_topics=topic_list, validate_only=False)
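If the script may run more than once, one option is to check which topics already exist and create only the missing ones. The following is a minimal sketch, assuming a kafka-python version whose KafkaAdminClient provides list_topics() and a broker reachable at the address you pass in. The helper names missing_topics and create_missing_topics are made up for illustration and are not part of kafka-python.

```python
def missing_topics(wanted, existing):
    """Return the wanted topic names that are not in `existing`, keeping order."""
    existing = set(existing)
    return [name for name in wanted if name not in existing]


def create_missing_topics(bootstrap_servers, wanted,
                          num_partitions=1, replication_factor=1):
    """Create only the topics in `wanted` that the cluster does not have yet.

    Hypothetical helper; returns the list of topic names it tried to create.
    """
    # Imported lazily so missing_topics() can be used without kafka-python.
    from kafka.admin import KafkaAdminClient, NewTopic

    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers,
        client_id="test",
    )
    try:
        to_create = missing_topics(wanted, admin_client.list_topics())
        if to_create:
            admin_client.create_topics(
                new_topics=[
                    NewTopic(name=name,
                             num_partitions=num_partitions,
                             replication_factor=replication_factor)
                    for name in to_create
                ],
                validate_only=False,
            )
        return to_create
    finally:
        admin_client.close()


# Example call (needs a running broker):
# create_missing_topics("localhost:9092", ["example_topic"])
```

Note that check-then-create is not atomic: another client could create the same topic between the two calls, so the create step can still fail in that case.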

Using confluent_kafka

from confluent_kafka.admin import AdminClient, NewTopic


admin_client = AdminClient({
    "bootstrap.servers": "localhost:9092"
})

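# Positional arguments: topic name, number of partitions, replication factor.
topic_list = [NewTopic("example_topic", 1, 1)]
# create_topics() is asynchronous: it returns a dict of {topic name: future} immediately.
admin_client.create_topics(topic_list)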
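With confluent_kafka, create_topics() does not block: it hands back one future per topic, and the request completes in the background. A short script can therefore exit before the topic is actually created, which would be consistent with the reports in the comments of the code working in an interactive shell but not from a .py file. Below is a minimal sketch that waits on the futures. The helper names wait_for_admin_futures and create_topic_and_wait and the 30-second timeout are assumptions for illustration, not part of the library.

```python
def wait_for_admin_futures(futures, timeout=30):
    """Block until every admin future has finished.

    `futures` is the {name: future} dict returned by create_topics()
    (or delete_topics()). Returns {name: None} for successes and
    {name: exception} for failures, including timeouts.
    """
    results = {}
    for name, future in futures.items():
        try:
            future.result(timeout=timeout)  # raises if the operation failed
            results[name] = None
        except Exception as exc:
            results[name] = exc
    return results


def create_topic_and_wait(bootstrap_servers, name,
                          num_partitions=1, replication_factor=1):
    """Hypothetical wrapper: create one topic and wait for the outcome."""
    # Imported lazily so the helper above works without confluent_kafka.
    from confluent_kafka.admin import AdminClient, NewTopic

    admin_client = AdminClient({"bootstrap.servers": bootstrap_servers})
    futures = admin_client.create_topics(
        [NewTopic(name, num_partitions, replication_factor)]
    )
    return wait_for_admin_futures(futures)


# Example call (needs a running broker):
# for topic, error in create_topic_and_wait("localhost:9092", "example_topic").items():
#     print(topic, "created" if error is None else "failed: {}".format(error))
```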

Solution 2

If you can run confluent_kafka (Python) v0.11.6 or above, the following shows how to create, list, and delete Kafka topics:

>>> import confluent_kafka.admin, pprint

>>> conf        = {'bootstrap.servers': 'broker01:9092'}
>>> kafka_admin = confluent_kafka.admin.AdminClient(conf)

>>> new_topic   = confluent_kafka.admin.NewTopic('topic100', 1, 1)
                  # Number-of-partitions  = 1
                  # Number-of-replicas    = 1

>>> kafka_admin.create_topics([new_topic,]) # CREATE (a list(), so you can create multiple).
    {'topic100': <Future at 0x7f524b0f1240 state=running>} # Stdout from above command.

>>> pprint.pprint(kafka_admin.list_topics().topics) # LIST
    {'topic100' : TopicMetadata(topic100, 1 partitions),
     'topic99'  : TopicMetadata(topic99,  1 partitions),
     'topic98'  : TopicMetadata(topic98,  1 partitions)}

To delete Kafka topics using the same kafka_admin object:

kafka_admin.delete_topics(['topic99', 'topic100',]) # DELETE
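The NewTopic constructor used above also accepts a config argument for topic-level settings such as max.message.bytes, which one of the comments asks about. The following is a minimal sketch, assuming the config values should be passed as strings. The helper names to_kafka_config and create_topic_with_config are made up for illustration.

```python
def to_kafka_config(settings):
    """Convert Python values to the string form used for topic configs."""
    converted = {}
    for key, value in settings.items():
        if isinstance(value, bool):
            # Kafka writes booleans in lowercase.
            converted[key] = "true" if value else "false"
        else:
            converted[key] = str(value)
    return converted


def create_topic_with_config(bootstrap_servers, name, settings,
                             num_partitions=1, replication_factor=1):
    """Hypothetical wrapper: create a topic with topic-level configuration."""
    # Imported lazily so to_kafka_config() works without confluent_kafka.
    from confluent_kafka.admin import AdminClient, NewTopic

    admin_client = AdminClient({"bootstrap.servers": bootstrap_servers})
    new_topic = NewTopic(
        name,
        num_partitions=num_partitions,
        replication_factor=replication_factor,
        config=to_kafka_config(settings),
    )
    futures = admin_client.create_topics([new_topic])
    futures[name].result()  # wait for completion; raises if creation failed


# Example call (needs a running broker):
# create_topic_with_config("broker01:9092", "topic101",
#                          {"max.message.bytes": 1000000})
```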

I hope this helps. \(◠﹏◠)/

Solution 3

It looks like you can use the following to ensure that your topic already exists (I assume you are using the following kafka-python implementation):

client = KafkaClient(...)
producer = KafkaProducer(...)
client.ensure_topic_exists('my_new_topic')
producer.send_messages('my_new_topic', ...)

Solution 4

It seems that there is no Kafka server API to create a topic, so you have to rely on automatic topic creation or use the command-line tool:

bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
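
If the command-line route is taken, the tool can still be driven from Python with subprocess. The sketch below assumes a newer Kafka distribution, where the script is kafka-topics.sh and accepts --bootstrap-server; on older releases, use the script and the --zookeeper flag shown above instead. The function names, the script path, and the default broker address are assumptions for illustration.

```python
import subprocess


def build_create_topic_command(topic, partitions=1, replication_factor=1,
                               bootstrap_server="localhost:9092",
                               script="bin/kafka-topics.sh"):
    """Build the argument list for creating a topic with the Kafka CLI."""
    return [
        script, "--create",
        "--bootstrap-server", bootstrap_server,
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
    ]


def create_topic_via_cli(topic, **kwargs):
    """Run the CLI; raises CalledProcessError if it exits with a non-zero status."""
    subprocess.run(build_create_topic_command(topic, **kwargs), check=True)


# Example call (run from the Kafka installation directory):
# create_topic_via_cli("test", partitions=1, replication_factor=1)
```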
Author by

jpgerek

Updated on July 09, 2022

Comments

  • jpgerek
    jpgerek almost 2 years

    So far I haven't seen a Python client that implements topic creation explicitly, without relying on the configuration option that creates topics automatically.

  • zackdever
    zackdever almost 9 years
    That won't work. ensure_topic_exists only works with the auto topic creation enabled. github.com/mumrah/kafka-python/blob/…
  • Hans Jespersen
    Hans Jespersen almost 7 years
    Actually, the broker created the topic, and only because auto.create.topics.enable was set to "true". All topics created this way will have the default configuration, which may or may not be good for your use case.
  • Boris Tsema
    Boris Tsema over 5 years
    What is final_list?
  • Mohsin
    Mohsin over 5 years
    @BorisTsema Please refer: github.com/dpkp/kafka-python/blob/master/kafka/producer/… It's a topic value, if I'm not wrong. The data that you want to send to consumer.
  • NYCeyes
    NYCeyes over 5 years
    P.S. Where possible, I prefer the confluent_kafka library to the kafka-python library because the former is a "thin wrapper" (quoting the Confluent literature) over the librdkafka C/C++ library; and therefore performant. Though, in fairness, kafka-python is more Pythonic and both libraries work well.
  • Rubber Duck
    Rubber Duck over 4 years
    Could you add a topic configuration example (e.g. max.message.bytes=1000000) for confluent_kafka?
  • Alfa Bravo
    Alfa Bravo over 4 years
    When I open a Python shell and type your code in line by line, it works 100%: I create the topic, and when I list all topics it is there. But when I put it in a .py file and run the file, for some reason the topic is not created, even though no error is raised. If I copy every line in the file and paste it into a Python shell, it is created again. The exact same code works in the shell but not in a file...
  • NYCeyes
    NYCeyes over 4 years
    @AlfaBravo Hmmm... Are you sure the program is running at all? Try inserting print('hello') statements around the code to check.
  • HenryM
    HenryM about 3 years
    No. You use the AdminClient per NYCeyes's comment.
  • JamesD
    JamesD almost 3 years
    Using the above technique I'm able to create Topic(s), I get the response {'jjd_topic1': <Future at 0x7f7914970940 state=running>} but when I then List the topics the new topic is not in the list... it's like it doesn't persist it. What could be happening here?
  • JamesD
    JamesD almost 3 years
    Yes, I have experienced that first hand - the program runs, and the client returns information about the topics that got created {'jjd_topic1': <Future at 0x7f7914970940 state=running>}, but when I run the list, they are not there. Like they never got committed. Why would that be?
  • Nicole White
    Nicole White almost 3 years
    Super bizarre -- @AlfaBravo I am experiencing the same issue. If I run it as a .py file it doesn't create the topic, but if I go into a Python shell and do it, the topic gets created...
  • Mayak
    Mayak over 2 years
    I also see no new topic in kafka here as a result by using the confluent_kafka answer.
  • user3900576
    user3900576 about 2 years
    I also saw a similar issue, but once you produce a message on that topic, it starts showing up.

    from confluent_kafka import Producer

    producer = Producer({"bootstrap.servers": "localhost:9092"})
    producer.produce("example_topic", key='key1', value='value1')
    producer.flush()