how to send JSON object to kafka from python client

24,392

Solution 1

The below is my code for producer to kafka. The only thing i did differently was to use yaml.safe_load to load the json content. It returns the contents as strings instead of unicode. The following is the snippet

with open('smaller_test_prod.txt') as f:
    for line in f:
        d = yaml.safe_load(line)
        jd = json.dumps(d)
        producer.send_messages(b'zeus_metrics',jd)

In here every line is a json data stored in a file.

Solution 2

Kafka expects the values in bytes

b`some json message`

Here is my simple Kafka producer which sends the message to Kafka server.

import json
from bson import json_util

from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers='localhost:9092')

for i in range(10):
    data = { 'tag ': 'blah',
        'name' : 'sam',
        'index' : i,
        'score': 
            {'row1': 100,
             'row2': 200
        }
    }   
    producer.send('orders', json.dumps(data, default=json_util.default).encode('utf-8'))

Here json.dumps() convert json into string and encode('utf-8') converts string into byte array.

Share:
24,392
Rahul
Author by

Rahul

Updated on March 24, 2020

Comments

  • Rahul
    Rahul about 4 years

    I have a simple JSON object like the following

    d = { 'tag ': 'blah',
      'name' : 'sam',
      'score': 
        {'row1': 100,
          'row2': 200
         }
    }
    

    The following is my python code which is sending messages to Kafka

    from kafka import SimpleProducer, KafkaClient
    import json 
    
    # To send messages synchronously
    kafka = KafkaClient('10.20.30.12:9092')
    producer = SimpleProducer(kafka)
    jd = json.dumps(d)
    producer.send_messages(b'message1',jd)
    

    I see in the storm logs that the message is being received but its throwing Transformation null for tuple { json structure in here } not sure what needs to be done in order to fix this ?..