kafka-python: How to consume JSON messages
With auto_offset_reset='earliest' you have configured your consumer to read all messages in the topic. A JSON decoding error suggests that some message previously produced to the topic is not actually valid JSON.
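The failure is easy to reproduce without a broker: passing json.loads a payload that was produced as a plain string rather than JSON raises exactly the error shown in the question's traceback. (The b'test1' value below is just an illustrative stand-in for such a message.)

```python
import json

# a payload that was produced as a raw string, not serialized JSON
payload = b'test1'

try:
    json.loads(payload.decode('utf-8'))
except json.decoder.JSONDecodeError as exc:
    print(exc)  # Expecting value: line 1 column 1 (char 0)
```

Any such message anywhere in the topic will crash a consumer whose value_deserializer assumes every record is JSON.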
Some solutions:
(1) consume from the tail of the topic instead: auto_offset_reset='latest'
(2) start a new topic: consumer.subscribe(['offering_new_too'])
(3) use a more forgiving deserializer that handles non-JSON payloads:
    import json
    import logging

    log = logging.getLogger(__name__)

    def forgiving_json_deserializer(v):
        if v is None:
            return None
        try:
            return json.loads(v.decode('utf-8'))
        except json.decoder.JSONDecodeError:
            log.exception('Unable to decode: %s', v)
            return None

    KafkaConsumer(value_deserializer=forgiving_json_deserializer, ...)
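You can sanity-check the deserializer's behavior directly on raw bytes, with no broker involved (the function is redefined here so the snippet is self-contained):

```python
import json
import logging

log = logging.getLogger(__name__)

def forgiving_json_deserializer(v):
    if v is None:              # e.g. a tombstone record with a null value
        return None
    try:
        return json.loads(v.decode('utf-8'))
    except json.decoder.JSONDecodeError:
        log.exception('Unable to decode: %s', v)
        return None

print(forgiving_json_deserializer(b'{"dataObjectID": "test1"}'))  # {'dataObjectID': 'test1'}
print(forgiving_json_deserializer(b'not-json'))                   # None (error is logged)
print(forgiving_json_deserializer(None))                          # None
```

Because the deserializer returns None instead of raising, the consumer loop keeps iterating past bad records; your own loop body just needs to skip messages whose value is None.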
Hope this helps!
Comments
-
Paras almost 2 years
I am fairly new to Python and starting with Kafka. I have a requirement where I need to send and consume JSON messages. For this I am using kafka-python to communicate with Kafka.
    # Producer.py
    from kafka import KafkaProducer
    import json

    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    producer.send('offering_new', {"dataObjectID": "test1"})

    # Consumer.py
    import json
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',
                             auto_offset_reset='earliest',
                             value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    consumer.subscribe(['offering_new'])
    for message in consumer:
        print(message)
However I'm getting the following exception on consumer :
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1111, in __next__
        return next(self._iterator)
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in _message_generator
        for msg in self._fetcher:
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 482, in __next__
        return next(self._iterator)
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 388, in _message_generator
        self._next_partition_records = self._parse_fetched_data(completion)
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 799, in _parse_fetched_data
        unpacked = list(self._unpack_message_set(tp, records))
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 458, in _unpack_message_set
        tp.topic, record.value)
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 492, in _deserialize
        return f(bytes_)
      File "<stdin>", line 1, in <lambda>
      File "/usr/lib/python3.6/json/__init__.py", line 354, in loads
        return _default_decoder.decode(s)
      File "/usr/lib/python3.6/json/decoder.py", line 339, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "/usr/lib/python3.6/json/decoder.py", line 357, in raw_decode
        raise JSONDecodeError("Expecting value", s, err.value) from None
    json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
I am running the above code in Python Shell. Can someone tell me where I am going wrong?
-
Alfa Bravo over 4 years: To continually consume, would you run this in a while True loop? And also, how would you set it so that you only carry on from the last message that you read, instead of going through all of them again?
-
Conquistador over 2 years: I was getting the same error; changing auto_offset_reset='latest' resolved my issue. Thanks a lot!
-
y_159 about 2 years: You are not handling the case where the message is not JSON-decodable.
-
y_159 about 2 years: Will the consumer continue to process other messages in case the forgiving_json_deserializer returns None? Any reference for this?