Best practice to query large number of ndb entities from datastore

18,047

Solution 1

Large processing like this should not be done in a user request, which has a 60s time limit. Instead, it should be done in a context that supports long-running requests. The task queue supports requests up to 10 minutes, and (I believe) normal memory restraints (F1 instances, the default, have 128MB of memory). For even higher limits (no request timeout, 1GB+ of memory), use backends.

Here's something to try: set up a URL that, when accessed, fires off a task queue task. It returns a web page that polls every ~5s to another URL that responds with true/false if the task queue task has been completed yet. The task queue processes the data, which can take some 10s of seconds, and saves the result to the datastore either as the computed data or a rendered web page. Once the initial page detects that it has completed, the user is redirected to the page, which fetches the now computed results from the datastore.

Solution 2

The new experimental Data Processing feature (an AppEngine API for MapReduce) looks very suitable for solving this problem. It does automatic sharding to execute multiple parallel worker processes.

Solution 3

I have a similar problem and after working with Google support for few weeks I can confirm there is no magic solution at least as of December 2017.

tl;dr: One can expect throughput from 220 entities/second for standard SDK running on B1 instance up to 900 entities/second for a patched SDK running on a B8 instance.

The limitation is CPU related and changing the instanced type directly impacts performance. This is confirmed by similar results obtained on B4 and B4_1G instances

The best throughput I got for an Expando entity with about 30 fields is:

Standard GAE SDK

  • B1 instance: ~220 entities/second
  • B2 instance: ~250 entities/second
  • B4 instance: ~560 entities/second
  • B4_1G instance: ~560 entities/second
  • B8 instance: ~650 entities/second

Patched GAE SDK

  • B1 instance: ~420 entities/second
  • B8 instance: ~900 entities/second

For standard GAE SDK I tried various approaches including multi-threading but the best proved to be fetch_async with wait_any. Current NDB library already does a great job of using async and futures under the hood so any attempt to push that using threads only make it worse.

I found two interesting approaches to optimize this:

Matt Faus explains the problem very well:

GAE SDK provides an API for reading and writing objects derived from your classes to the datastore. This saves you the boring work of validating raw data returned from the datastore and repackaging it into an easy-to-use object. In particular, GAE uses protocol buffers to transmit raw data from the store to the frontend machine that needs it. The SDK is then responsible for decoding this format and returning a clean object to your code. This utility is great, but sometimes it does a bit more work than you would like. [...] Using our profiling tool, I discovered that fully 50% of the time spent fetching these entities was during the protobuf-to-python-object decoding phase. This means that the CPU on the frontend server was a bottleneck in these datastore reads!

GAE-data-access-web-request

Both approaches try to reduce the time spent doing protobuf to Python decoding by reducing the number of fields decoded.

I tried both approaches but I only succeed with Matt's. SDK internals changed since Evan published his solution. I had to change a bit the code published by Matt here, but is was pretty easy - if there is interest I can publish the final code.

For a regular Expando entity with about 30 fields I used Matt's solution to decode only couple fields and obtained a significant improvement.

In conclusion one need to plan accordingly and don't expect to be able to process much more than few hundreds entities in a "real-time" GAE request.

Share:
18,047
Allen
Author by

Allen

Updated on June 16, 2022

Comments

  • Allen
    Allen almost 2 years

    I have run into an interesting limit with the App Engine datastore. I am creating a handler to help us analyze some usage data on one of our production servers. To perform the analysis I need to query and summarize 10,000+ entities pulled from the datastore. The calculation isn't hard, it is just a histogram of items that pass a specific filter of the usage samples. The problem I hit is that I can't get the data back from the datastore fast enough to do any processing before hitting the query deadline.

    I have tried everything I can think of to chunk the query into parallel RPC calls to improve performance, but according to appstats I can't seem to get the queries to actually execute in parallel. No matter what method I try (see below) it always seems that the RPC's fall back to a waterfall of sequential next queries.

    Note: the query and analysis code does work, it just runs to slowly because I can't get data quickly enough from the datastore.

    Background

    I don't have a live version I can share, but here is the basic model for the part of the system I am talking about:

    class Session(ndb.Model):
       """ A tracked user session. (customer account (company), version, OS, etc) """
       data = ndb.JsonProperty(required = False, indexed = False)
    
    class Sample(ndb.Model):
       name      = ndb.StringProperty  (required = True,  indexed = True)
       session   = ndb.KeyProperty     (required = True,  kind = Session)
       timestamp = ndb.DateTimeProperty(required = True,  indexed = True)
       tags      = ndb.StringProperty  (repeated = True,  indexed = True)
    

    You can think of the samples as times when a user makes use of a capability of a given name. (ex: 'systemA.feature_x'). The tags are based upon customer details, system information, and the feature. ex: ['winxp', '2.5.1', 'systemA', 'feature_x', 'premium_account']). So the tags form a denormalized set of tokens that could be used to find samples of interest.

    The analysis I am trying to do consists of taking a date range and asking how many times was a feature of set of features (perhaps all features) used per day (or per hour) per customer account (company, not per user).

    So the input to the handler be something like:

    • Start Date
    • End Date
    • Tag(s)

    Output would be:

    [{
       'company_account': <string>,
       'counts': [
          {'timeperiod': <iso8601 date>, 'count': <int>}, ...
       ]
     }, ...
    ]
    

    Common Code for Queries

    Here is some code in common for all queries. The general structure of the handler is a simple get handler using webapp2 that sets up the query parameters, runs the query, processes the results, creates data to return.

    # -- Build Query Object --- #
    query_opts = {}
    query_opts['batch_size'] = 500   # Bring in large groups of entities
    
    q = Sample.query()
    q = q.order(Sample.timestamp)
    
    # Tags
    tag_args = [(Sample.tags == t) for t in tags]
    q = q.filter(ndb.query.AND(*tag_args))
    
    def handle_sample(sample):
       session_obj = sample.session.get()    # Usually found in local or memcache thanks to ndb
       count_key   = session_obj.data['customer']
       addCountForPeriod(count_key, sample.timestamp)
    

    Methods Tried

    I have tried a variety of methods to try to pull data from the datastore as quickly as possible and in parallel. The methods I have tried so far include:

    A. Single Iteration

    This is more of a simple base case to compare against the other methods. I just build the query and iterate over all the items letting ndb do what it does to pull them one after the other.

    q = q.filter(Sample.timestamp >= start_time)
    q = q.filter(Sample.timestamp <= end_time)
    q_iter = q.iter(**query_opts)
    
    for sample in q_iter:
       handle_sample(sample)
    

    B. Large Fetch

    The idea here was to see if I could do a single very large fetch.

    q = q.filter(Sample.timestamp >= start_time)
    q = q.filter(Sample.timestamp <= end_time)
    samples = q.fetch(20000, **query_opts)
    
    for sample in samples:
       handle_sample(sample)
    

    C. Async fetches across time range

    The idea here is to recognize that the samples are fairly well spaced across time so I can create a set of independent queries that split the overall time region into chunks and try to run each of these in parallel using async:

    # split up timestamp space into 20 equal parts and async query each of them
    ts_delta       = (end_time - start_time) / 20
    cur_start_time = start_time
    q_futures = []
    
    for x in range(ts_intervals):
       cur_end_time = (cur_start_time + ts_delta)
       if x == (ts_intervals-1):    # Last one has to cover full range
          cur_end_time = end_time
    
       f = q.filter(Sample.timestamp >= cur_start_time,
                    Sample.timestamp < cur_end_time).fetch_async(limit=None, **query_opts)
       q_futures.append(f)
       cur_start_time = cur_end_time
    
    # Now loop through and collect results
    for f in q_futures:
       samples = f.get_result()
       for sample in samples:
          handle_sample(sample)
    

    D. Async mapping

    I tried this method because the documentation made it sound like ndb may exploit some parallelism automatically when using the Query.map_async method.

    q = q.filter(Sample.timestamp >= start_time)
    q = q.filter(Sample.timestamp <= end_time)
    
    @ndb.tasklet
    def process_sample(sample):
       period_ts   = getPeriodTimestamp(sample.timestamp)
       session_obj = yield sample.session.get_async()    # Lookup the session object from cache
       count_key   = session_obj.data['customer']
       addCountForPeriod(count_key, sample.timestamp)
       raise ndb.Return(None)
    
    q_future = q.map_async(process_sample, **query_opts)
    res = q_future.get_result()
    

    Outcome

    I tested out one example query to collect overall response time and appstats traces. The results are:

    A. Single Iteration

    real: 15.645s

    This one goes sequentially through fetching batches one after the other and then retrieves every session from memcache.

    Method A appstats

    B. Large Fetch

    real: 12.12s

    Effectively the same as option A but a bit faster for some reason.

    Method B appstats

    C. Async fetches across time range

    real: 15.251s

    Appears to provide more parallelism at the start but seems to get slowed down by a sequence of calls to next during iteration of the results. Also doesn't seem to be able to overlap the session memcache lookups with the pending queries.

    Method C appstats

    D. Async mapping

    real: 13.752s

    This one is the hardest for me to understand. It looks like it has q good deal of overlapping, but everything seems to stretch out in a waterfall instead of in parallel.

    Method D appstats

    Recommendations

    Based upon all this, what am I missing? Am I just hitting a limit on App Engine or is there a better way to pull down large number of entities in parallel?

    I am at a loss as to what to try next. I thought about rewriting the client to make multiple requests to app engine in parallel but this seems pretty brute force. I would really expect that app engine should be able to handle this use case so I am guessing there is something I am missing.

    Update

    In the end I found that option C was the best for my case. I was able to optimize it to complete in 6.1 seconds. Still not perfect, but much better.

    After getting advice from several people, I found that the following items were key to understand and keep in mind:

    • Multiple queries can run in parallel
    • Only 10 RPC's can be in flight at once
    • Try to denormalize to the point that there are no secondary queries
    • This type of task is better left to map reduce and task queues, not real-time queries

    So what I did to make it faster:

    • I partitioned the query space from the beginning based upon time. (note: the more equal the partitions are in terms of entities returned, the better)
    • I denormalized the data further to remove the need for the secondary session query
    • I made use of ndb async operations and wait_any() to overlap the queries with the processing

    I am still not getting the performance I would expect or like, but it is workable for now. I just wish their was a better way to pull large numbers of sequential entities into memory quickly in handlers.

  • aschmid00
    aschmid00 almost 12 years
    could you show a simple example for how to use your approach to get entities in parallel? i thought a tasklet would take care of this but it does not seem like it.
  • Allen
    Allen almost 12 years
    I am not using cursors because none of the queries restart in the middle later. They all grab all the entities immediately with no offset. As far as map reduce, I thought about that but this isn't an offline analysis, it is meant to be a live query that internal users will dynamically change as they explore the data. My understanding of map reduce is that it doesn't fit this real-time interactive usecase.
  • dragonx
    dragonx almost 12 years
    I might have made a bad assumption, I was thinking the datastore_v3.Next calls in C was due to the use of some offset based iterator. Mapreduce isn't ideal for the interactive usecase in my experience because a) you can't predict how long the operation will take, and b) you typically have to write your results to the datastore rather than receive an easy result that you can put on a template. It gets a bit ugly on the client side, I think you need a way to poll to see if the result is ready. However, because of the parallel nature, it does tend to be faster than serializing queries.
  • Allen
    Allen almost 12 years
    Agreed that map reduce can parallelize. I was just hoping that ndb and async operations could also parallelize enough for my use case. I don't need to parallelize the computation, simply the data retrieval. I had also considered using urlfetch to write a multi-level handler that would spawn off requests to subhandlers to get the data and then collect and process it in the parent handler. It just seems like there has to be an easier way.
  • Tim Hoffman
    Tim Hoffman almost 12 years
    I don't think your going to be able to reliably do this as a live query, especially if your dataset (returned results gets a lot larger).
  • Allen
    Allen almost 12 years
    I had been thinking of using a backend as well. I am still hoping to get the query to work inside a normal deadline, but if that doesn't work I will fallback to using a backend to run it as you are describing. Since one of my bottlenecks is loading all the session objects into local cache, there may also be a way to get a performance boost using backends if I can keep all the sessions in memory at all times.
  • ZiglioUK
    ZiglioUK over 10 years
    That answers nothing. The question was specific to how the datastore is supposed to work, and it doesn't. The same problem applied to task queues and backends when one has to fetch 100,000 or 1M entities. Dog slow, expensive datastore
  • ZiglioUK
    ZiglioUK over 10 years
    Agree MapReduce is the way to go, in my limited experience I've seen it performing much better than my own queries, not sure why. Shame Google is not maintaining their own MR package. I wonder whether there's any work at all going into improving the datastore and its abysmal performances and costs, all work seems to go for GCE and the cloud store
  • Josep Valls
    Josep Valls about 9 years
    Take a look at the MapReduce andwer by Martin Berends below. Backends have been deprecated. There is a nice guide describing the migration process: cloud.google.com/appengine/docs/python/modules/converting