kafka-python: How to consume JSON messages


With auto_offset_reset='earliest' you have configured your consumer to read all messages in the topic. A JSON decoding error suggests that some message that was previously produced to the topic is not actually in JSON format.

Some solutions:

(1) consume from the tail of the topic instead: auto_offset_reset='latest'

(2) start a new topic: consumer.subscribe(['offering_new_too'])

(3) use a more comprehensive deserializer:

import json
import logging

log = logging.getLogger(__name__)

def forgiving_json_deserializer(v):
    if v is None:
        return None
    try:
        return json.loads(v.decode('utf-8'))
    except json.decoder.JSONDecodeError:
        log.exception('Unable to decode: %s', v)
        return None

KafkaConsumer(value_deserializer=forgiving_json_deserializer, ...)
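For reference, a minimal sketch of how this could be wired together (the broker address and topic name are just the ones from the question; adjust to your setup):

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    auto_offset_reset='latest',                        # option (1): only read new messages
    value_deserializer=forgiving_json_deserializer,    # option (3): defined above
)
consumer.subscribe(['offering_new'])

for message in consumer:
    if message.value is not None:   # None means the value was not valid JSON
        print(message.value)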

Hope this helps!

Author: Paras

Updated on June 14, 2022

Comments

  • Paras, almost 2 years ago

    I am fairly new to Python and just starting with Kafka. I have a requirement to send and consume JSON messages. For this I am using kafka-python to communicate with Kafka.

    #Producer.py
    from kafka import KafkaProducer
    import json
    producer = KafkaProducer(bootstrap_servers='localhost:9092',value_serializer=lambda v: json.dumps(v).encode('utf-8'))
    producer.send('offering_new', {"dataObjectID": "test1"})
    
    #Consumer.py
    import json
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(bootstrap_servers='localhost:9092',auto_offset_reset='earliest', value_deserializer=lambda m: json.loads(m.decode('utf-8')))
    consumer.subscribe(['offering_new'])
    for message in consumer :
        print(message)
    

    However, I'm getting the following exception on the consumer:

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1111, in __next__
        return next(self._iterator)
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/group.py", line 1082, in _message_generator
        for msg in self._fetcher:
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 482, in __next__
        return next(self._iterator)
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 388, in _message_generator
        self._next_partition_records = self._parse_fetched_data(completion)
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 799, in _parse_fetched_data
        unpacked = list(self._unpack_message_set(tp, records))
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 458, in _unpack_message_set
        tp.topic, record.value)
      File "/home/paras/.local/lib/python3.6/site-packages/kafka/consumer/fetcher.py", line 492, in _deserialize
        return f(bytes_)
      File "<stdin>", line 1, in <lambda>
      File "/usr/lib/python3.6/json/__init__.py", line 354, in loads
        return _default_decoder.decode(s)
      File "/usr/lib/python3.6/json/decoder.py", line 339, in decode
        obj, end = self.raw_decode(s, idx=_w(s, 0).end())
      File "/usr/lib/python3.6/json/decoder.py", line 357, in raw_decode
        raise JSONDecodeError("Expecting value", s, err.value) from None
    json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
    

    I am running the above code in the Python shell. Can someone tell me where I am going wrong?

  • Alfa Bravo, over 4 years ago
    To continually consume, would you run this in a while True loop? And how would you set it up so that it only carries on from the last message that was read, instead of going through all of them again? (See the sketch after these comments.)
  • Conquistador, over 2 years ago
    I was getting the same error; changing to auto_offset_reset='latest' resolved my issue. Thanks a lot!
  • y_159, about 2 years ago
    You are not handling the case where the message is not JSON-decodable.
  • y_159, about 2 years ago
    Will the consumer continue to process other messages if forgiving_json_deserializer returns None? Any reference for this?
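
Regarding the comments above about continual consumption and resuming from the last read message: below is a rough sketch, assuming kafka-python's group_id and enable_auto_commit options (the group name is made up), of a consumer that runs continually and resumes from its last committed offset after a restart. Because forgiving_json_deserializer returns None instead of raising, iteration should simply continue with the next record.

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    group_id='offering-consumers',     # hypothetical group name; committed offsets are tracked per group
    enable_auto_commit=True,           # periodically commit the last processed offset
    auto_offset_reset='latest',        # only applies when the group has no committed offset yet
    value_deserializer=forgiving_json_deserializer,
)
consumer.subscribe(['offering_new'])

# The iterator blocks waiting for new records, so this loop keeps consuming
# without an explicit `while True`.
for message in consumer:
    if message.value is None:
        continue                       # value was not valid JSON; skip it and keep going
    print(message.value)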