How to programmatically create a topic in Apache Kafka using Python
Solution 1
You can programmatically create topics using either the kafka-python client or the confluent_kafka client, which is a lightweight wrapper around librdkafka.
Using kafka-python
from kafka.admin import KafkaAdminClient, NewTopic
admin_client = KafkaAdminClient(
bootstrap_servers="localhost:9092",
client_id='test'
)
topic_list = []
topic_list.append(NewTopic(name="example_topic", num_partitions=1, replication_factor=1))
admin_client.create_topics(new_topics=topic_list, validate_only=False)
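With kafka-python, create_topics() raises kafka.errors.TopicAlreadyExistsError if a topic already exists, so a script that runs more than once usually checks first. Below is a minimal sketch, assuming kafka-python 2.x, where KafkaAdminClient.list_topics() returns a list of topic names. The helper name create_missing_topics and the specs mapping are hypothetical. The NewTopic class is passed in (defaulting to kafka.admin.NewTopic) so the logic can be tried without a broker.

```python
from typing import Callable, Dict, List, Optional, Tuple


def create_missing_topics(
    admin,
    specs: Dict[str, Tuple[int, int]],
    new_topic: Optional[Callable] = None,
) -> List[str]:
    """Create only the topics in `specs` that the cluster does not list yet.

    admin: a kafka.admin.KafkaAdminClient (or any object with the same
        list_topics() / create_topics() methods).
    specs: hypothetical mapping of topic name -> (num_partitions, replication_factor).
    Returns the names of the topics this call asked the broker to create.
    """
    if new_topic is None:
        from kafka.admin import NewTopic as new_topic  # the real kafka-python class

    existing = set(admin.list_topics())  # topic names currently known to the cluster
    missing = [name for name in specs if name not in existing]
    to_create = [
        new_topic(name=name, num_partitions=specs[name][0], replication_factor=specs[name][1])
        for name in missing
    ]
    if to_create:
        # Another client could create the same topic between the check and this call;
        # kafka-python would then raise kafka.errors.TopicAlreadyExistsError.
        admin.create_topics(new_topics=to_create, validate_only=False)
    return missing
```

For example, create_missing_topics(admin_client, {"example_topic": (1, 1)}) only sends a CreateTopics request the first time it runs. The check-then-create approach is not atomic, so catching TopicAlreadyExistsError is still worthwhile if several processes may start at once.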
Using confluent_kafka
from confluent_kafka.admin import AdminClient, NewTopic
admin_client = AdminClient({
"bootstrap.servers": "localhost:9092"
})
topic_list = []
topic_list.append(NewTopic("example_topic", 1, 1))
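# create_topics() is asynchronous: it returns {topic_name: future} immediately
futures = admin_client.create_topics(topic_list)
for topic, future in futures.items():
    future.result()  # wait for the broker; raises KafkaException if creation failed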
Solution 2
If you can run confluent_kafka (Python) v0.11.6 or above, then the following is how to create, list, and delete Kafka topics:
>>> import confluent_kafka.admin, pprint
>>> conf = {'bootstrap.servers': 'broker01:9092'}
>>> kafka_admin = confluent_kafka.admin.AdminClient(conf)
>>> new_topic = confluent_kafka.admin.NewTopic('topic100', 1, 1)  # 1 partition, replication factor 1
>>> kafka_admin.create_topics([new_topic,])  # CREATE (takes a list, so you can create several topics at once)
{'topic100': <Future at 0x7f524b0f1240 state=running>}  # Return value: one future per topic
>>> pprint.pprint(kafka_admin.list_topics().topics) # LIST
{'topic100' : TopicMetadata(topic100, 1 partitions),
'topic99' : TopicMetadata(topic99, 1 partitions),
'topic98' : TopicMetadata(topic98, 1 partitions)}
And to delete Kafka topics using that same kafka_admin object:
kafka_admin.delete_topics(['topic99', 'topic100',]) # DELETE
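The {'topic100': <Future ... state=running>} output above shows that create_topics() returns immediately with one future per topic, and delete_topics() returns the same kind of dict. In an interactive shell you naturally wait long enough, but a short script can exit before the request has finished. That would fit the "works in the shell but not in a .py file" reports in the comments. Here is a minimal sketch that waits on those futures. The helper name and the 30-second default timeout are my own choices, not part of confluent_kafka.

```python
import concurrent.futures
from typing import Dict, Mapping, Optional


def wait_for_admin_futures(
    futures: Mapping[str, concurrent.futures.Future],
    timeout: float = 30.0,
) -> Dict[str, Optional[BaseException]]:
    """Block until every per-topic future from create_topics()/delete_topics() settles.

    Returns {topic: None} for topics that succeeded and {topic: exception}
    for topics that failed or did not finish within `timeout` seconds.
    """
    outcome: Dict[str, Optional[BaseException]] = {}
    for topic, future in futures.items():
        try:
            future.result(timeout=timeout)  # confluent_kafka resolves these to None on success
            outcome[topic] = None
        except Exception as exc:  # e.g. KafkaException from the broker, or a timeout
            outcome[topic] = exc
    return outcome
```

For example, wait_for_admin_futures(kafka_admin.create_topics([new_topic])) should give {'topic100': None} once the broker has created the topic, and the same call works around kafka_admin.delete_topics(['topic99', 'topic100']). Collecting exceptions per topic, rather than stopping at the first failure, lets you report every topic that failed.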
I hope this helps. \(◠﹏◠)/
Solution 3
It looks like you can use the following to ensure that your topic exists (I assume you are using the kafka-python implementation):
client = KafkaClient(...)
producer = KafkaProducer(...)
client.ensure_topic_exists('my_new_topic')
producer.send_messages('my_new_topic', ...)
Solution 4
It seems that there is no Kafka server API to create a topic, so you have to use either automatic topic creation or the command-line tool:
bin/kafka-create-topic.sh --zookeeper localhost:2181 --replica 1 --partition 1 --topic test
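The kafka-create-topic.sh script above comes from an early Kafka release. Newer releases ship kafka-topics.sh, which since Kafka 2.2 can talk to the brokers directly with --bootstrap-server instead of going through ZooKeeper. If you still want to drive the CLI from Python, here is a sketch. The script path, broker address, and helper names are assumptions.

```python
import subprocess
from typing import List


def kafka_topics_create_cmd(
    topic: str,
    partitions: int = 1,
    replication_factor: int = 1,
    bootstrap_server: str = "localhost:9092",
    script: str = "bin/kafka-topics.sh",  # assumed path inside a Kafka distribution
) -> List[str]:
    """Build the argv for creating a topic with kafka-topics.sh."""
    return [
        script,
        "--create",
        "--topic", topic,
        "--partitions", str(partitions),
        "--replication-factor", str(replication_factor),
        "--bootstrap-server", bootstrap_server,
    ]


def create_topic_via_cli(topic: str, **kwargs) -> None:
    # check=True raises subprocess.CalledProcessError if the tool exits non-zero.
    subprocess.run(kafka_topics_create_cmd(topic, **kwargs), check=True)
```

Building the argument list separately from running it makes the command easy to log or inspect before anything touches the cluster.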
jpgerek
Updated on July 09, 2022
Comments
jpgerek almost 2 years: So far I haven't seen a Python client that explicitly implements topic creation without relying on the configuration option that creates topics automatically.
zackdever almost 9 years: That won't work. ensure_topic_exists only works with automatic topic creation enabled. github.com/mumrah/kafka-python/blob/…
Hans Jespersen almost 7 years: Actually, the broker created the topic, and only because auto.create.topics.enable was set to "true". All topics created this way get the default configuration, which may or may not be good for your use case.
Boris Tsema over 5 years: What is final_list?
Mohsin over 5 years: @BorisTsema Please refer to github.com/dpkp/kafka-python/blob/master/kafka/producer/… It's the topic value, if I'm not wrong: the data that you want to send to the consumer.
NYCeyes over 5 years: P.S. Where possible, I prefer the confluent_kafka library to the kafka-python library because the former is a "thin wrapper" (quoting the Confluent literature) over the librdkafka C/C++ library, and is therefore performant. In fairness, though, kafka-python is more Pythonic, and both libraries work well.
Rubber Duck over 4 years: Could you add an example of topic configuration for confluent_kafka, e.g. max.message.bytes=1000000?
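confluent_kafka's NewTopic accepts a config argument holding topic-level settings such as max.message.bytes. Here is a minimal sketch, assuming a confluent_kafka version whose NewTopic supports config. The helper names are hypothetical. Values are converted to strings because config is documented as a str-to-str dict.

```python
from typing import Any, Dict, Mapping, Optional


def topic_config(settings: Mapping[str, Any]) -> Dict[str, str]:
    """Normalize topic-level settings to the str -> str dict NewTopic expects."""
    return {str(key): str(value) for key, value in settings.items()}


def make_topic(
    name: str,
    num_partitions: int = 1,
    replication_factor: int = 1,
    settings: Optional[Mapping[str, Any]] = None,
):
    # Imported here so topic_config() can be used without confluent_kafka installed.
    from confluent_kafka.admin import NewTopic

    return NewTopic(
        name,
        num_partitions=num_partitions,
        replication_factor=replication_factor,
        config=topic_config(settings or {}),
    )
```

Usage would look like admin_client.create_topics([make_topic("example_topic", settings={"max.message.bytes": 1000000})]), followed by waiting on the returned futures as in the other examples.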
Alfa Bravo over 4 years: When I open a Python shell and type your code line by line, it works 100%: the topic is created, and when I list all topics it is there. But when I put the code in a .py file and run the file, for some reason it does not create the topic, even though it does not give an error. If I copy every line in the file and paste it into a Python shell, the topic is created again. The exact same code works in the shell but not in a file...
NYCeyes over 4 years: @AlfaBravo Hmmm... Are you sure the program is running at all? Try inserting print('hello') statements around the code to check.
HenryM about 3 years: No. You use the AdminClient, per NYCeyes's comment.
JamesD almost 3 years: Using the above technique I'm able to create topics. I get the response {'jjd_topic1': <Future at 0x7f7914970940 state=running>}, but when I then list the topics, the new topic is not in the list... it's as if it isn't persisted. What could be happening here?
JamesD almost 3 years: Yes, I have experienced that first hand: the program runs, and the client returns information about the topics that got created ({'jjd_topic1': <Future at 0x7f7914970940 state=running>}), but when I run the list, they are not there, as if they never got committed. Why would that be?
Nicole White almost 3 years: Super bizarre. @AlfaBravo I am experiencing the same issue. If I run it as a .py file it doesn't create the topic, but if I go into a Python shell and do it, the topic gets created...
Mayak over 2 years: I also see no new topic in Kafka after using the confluent_kafka answer.
user3900576 about 2 years: I also saw a similar issue, but once you produce a message on that topic, it starts showing up:
from confluent_kafka import Producer
producer = Producer({"bootstrap.servers": "localhost:9092"})
producer.produce("example_topic", key='key1', value='value1')
producer.flush()
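Several comments above report that a freshly created topic does not show up in list_topics() right away. Besides a script exiting before the creation request completes, brokers can take a moment to learn about a new topic. Here is a minimal sketch that polls until the topic is listed. It assumes confluent_kafka's AdminClient.list_topics(timeout=...), which returns metadata with a .topics dict. The helper name, attempt count, and delay are arbitrary choices.

```python
import time
from typing import Callable


def wait_until_listed(
    admin,
    topic: str,
    attempts: int = 10,
    delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> bool:
    """Poll list_topics() until `topic` appears; return True if it did.

    admin: a confluent_kafka.admin.AdminClient, or anything whose
    list_topics(timeout=...) returns an object with a `.topics` dict.
    """
    for attempt in range(attempts):
        metadata = admin.list_topics(timeout=10)
        if topic in metadata.topics:
            return True
        if attempt < attempts - 1:
            sleep(delay)  # give the brokers time to learn about the new topic
    return False
```

Polling the metadata avoids producing a throwaway message just to make the topic visible. Producing to a topic that does not exist yet may also auto-create it with default settings if the broker allows that.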