How to do bulk indexing to Elasticsearch from Python


I prefer using the bulk method from the helpers module for bulk indexing. Try the following:

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()  # client connected to your cluster
res = helpers.bulk(es, jsonvalue, chunk_size=1000, request_timeout=200)

Your jsonvalue needs to follow a particular format: a list of your 10K JSON documents, with each document carrying the following fields:

doc = {
    '_index': 'your-index',
    '_type': 'your-type',
    '_id': 'your-id',
    'field_1': 'value_1',
    ...
}

So your final jsonvalue would look something like this:

jsonvalue = [
    {
        '_index': 'your-index',
        '_type': 'your-type',
        '_id': 'id-1',
        'field_1': 'value_1',
        ...
    },
    {
        '_index': 'your-index',
        '_type': 'your-type',
        '_id': 'id-2',
        'field_1': 'value_2',
        ...
    },
    {
        '_index': 'your-index',
        '_type': 'your-type',
        '_id': 'id-3',
        'field_1': 'value_3',
        ...
    }
]

Note that each document should get its own unique _id; reusing the same _id would make the documents overwrite one another.
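Putting the pieces together, a minimal sketch of building that action list (the index name, type, and the priority field are placeholders, and the final bulk call is left commented out since it needs a live cluster):

```python
# Placeholder documents standing in for the 10K JSON records.
docs = [{"priority": "CRITICAL"}, {"priority": "LOW"}]

# Build the action list: bulk metadata (_index, _type, _id) plus the
# document's own fields at the top level of each dict.
jsonvalue = [
    {"_index": "your-index", "_type": "your-type", "_id": str(i), **doc}
    for i, doc in enumerate(docs)
]

# Against a running cluster you would then call:
# from elasticsearch import Elasticsearch, helpers
# es = Elasticsearch()
# success, errors = helpers.bulk(es, jsonvalue, chunk_size=1000, request_timeout=200)
```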
Author: RCP
Updated on June 25, 2022

Comments

  • RCP
    RCP almost 2 years

    I have nearly 10K JSON documents and I want to push all of them to Elasticsearch using the bulk API from Python. I went through some docs but didn't find a solution.

    result=es.bulk(index="index1", doc_type="index123", body=jsonvalue)
    helpers.bulk(es,doc) 
    

    I tried both, but with no result; I am getting this error:

    elasticsearch.exceptions.RequestError: TransportError(400, u'illegal_argument_exception', u'Malformed action/metadata line [1], expected START_OBJECT or END_OBJECT but found [VALUE_STRING]')
    

    Please help me.

  • RCP
    RCP over 6 years
    Can't I write my JSON documents inside "_source": { }?
  • Jai Sharma
    Jai Sharma over 6 years
    The _source convention is how Elasticsearch stores the data internally and returns it. You should pass your JSON document at the first level of the dictionary.
  • RCP
    RCP over 6 years
    Sorry, I am not getting you. Please tell me where I should use 'doc' in helpers.bulk(), and where I should pass my JSON documents.
  • Jai Sharma
    Jai Sharma over 6 years
    Edited my answer.
  • RCP
    RCP over 6 years
    Yeah, my JSON value is in the above format. Where should I pass that value, and where should I pass 'doc' in helpers.bulk()?
  • Jai Sharma
    Jai Sharma over 6 years
    helpers.bulk(es, jsonvalue, chunk_size=1000, request_timeout=200). The 2nd argument should be the list of all the documents, as mentioned in the answer.
  • RCP
    RCP over 6 years
    So now, in helpers.bulk(es, jsonvalue, chunk_size=1000, request_timeout=200), which argument holds the index name, doc_type and so on?
  • Jai Sharma
    Jai Sharma over 6 years
    Index name and doc type are now part of each document: _index and _type hold the values for the index and doc type respectively. Please read the documentation here: elasticsearch-py.readthedocs.io/en/master/…
  • RCP
    RCP over 6 years
    Tried for sample data: jsonvalue = { '_index': 'your-index', '_type': 'your-type', '_id': 'your-id', 'field_1': {"priority": "CRITICAL"} }, then helpers.bulk(es, jsonvalue, chunk_size=1000, request_timeout=200), but getting this error: elasticsearch.exceptions.RequestError: TransportError(400, u'action_request_validation_exception', u'Validation Failed: 1: index is missing;2: type is missing;3: index is missing;4: type is missing;5: index is missing;6: type is missing;7: index is missing;8)
  • Jai Sharma
    Jai Sharma over 6 years
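A likely cause of the "index is missing; type is missing" error in the last comment (my reading, not stated in the thread): jsonvalue was passed as a single dict rather than a list of dicts. helpers.bulk iterates over its second argument, and iterating a dict yields its key strings, so each "action" becomes a bare string with no index or type attached:

```python
# This reproduces the call shape from the comment above: a single
# dict where a list of action dicts was expected.
jsonvalue = {
    "_index": "your-index",
    "_type": "your-type",
    "_id": "your-id",
    "field_1": {"priority": "CRITICAL"},
}

# Iterating a dict yields its keys, not the document itself.
iterated = list(jsonvalue)

# Wrapping the dict in a list yields one well-formed action instead.
fixed = [jsonvalue]
```

With four keys in the dict, the bulk call sees four malformed actions, which matches the repeated "index is missing; type is missing" pairs in the reported error.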