Google Cloud Datastore has a built-in limit of 1000 keys per get request and 500 entities per put request. If you exceed these limits, you will see one of these error messages:
google.api_core.exceptions.InvalidArgument: 400 cannot get more than 1000 keys in a single call
google.api_core.exceptions.InvalidArgument: 400 cannot write more than 500 entities in a single call
You can fix this by chunking the requests, i.e. fetching at most 1000 keys per get call and writing at most 500 entities per put call.
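For example, a get over an arbitrary number of keys can be chunked manually (a minimal sketch, assuming client is an existing google.cloud.datastore.Client instance and keys is a key list of any size):

values = []
missing = []
for i in range(0, len(keys), 1000):
    # Each call stays at or below the 1000-key limit;
    # get_multi() appends keys without a result to `missing`
    values += client.get_multi(keys[i:i + 1000], missing=missing)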
The code below provides a ready-to-use class that automates this process. As an added benefit, it performs the requests in chunks of 1000 (for get) or 400 (for put, staying safely below the 500-entity limit) in parallel using a concurrent.futures.Executor. As the workload is expected to be IO-bound, it is recommended to use a concurrent.futures.ThreadPoolExecutor.
If you don't pass an executor to the constructor, the class will create a ThreadPoolExecutor by itself.
from concurrent.futures import ThreadPoolExecutor

def _chunks(l, n=1000):
    """
    Yield successive n-sized chunks from l.
    https://stackoverflow.com/a/312464/2597135
    """
    for i in range(0, len(l), n):
        yield l[i:i + n]

def _get_chunk(client, keys):
    """Get a single chunk of keys, collecting missing keys"""
    missing = []
    vals = client.get_multi(keys, missing=missing)
    return vals, missing

class DatastoreChunkClient(object):
    """
    Provides a thin wrapper around a Google Cloud Datastore client
    providing means of reading and writing arbitrarily large
    numbers of entities.
    """
    def __init__(self, client, executor=None):
        self.client = client
        if executor is None:
            executor = ThreadPoolExecutor(16)
        self.executor = executor

    def get_multi(self, keys):
        """
        Thin wrapper around client.get_multi() that circumvents the
        1000 keys limit by doing 1000-sized chunked reads in parallel
        using self.executor.
        Returns (values, missing).
        """
        all_missing = []
        all_vals = []
        for vals, missing in self.executor.map(
                lambda chunk: _get_chunk(self.client, chunk),
                _chunks(keys, 1000)):
            all_vals += vals
            all_missing += missing
        return all_vals, all_missing

    def put_multi(self, entities):
        """
        Thin wrapper around client.put_multi() that circumvents the
        500 entities per write limit by doing 400-sized chunked writes
        in parallel using self.executor.
        """
        # Consume the iterator so that exceptions raised in the
        # worker threads are re-raised here
        for _ in self.executor.map(
                lambda chunk: self.client.put_multi(chunk),
                _chunks(entities, 400)):
            pass
Usage example:
# Create "raw" google datastore client client = datastore.Client(project="myproject-123456") chunkClient = DatastoreChunkClient(client) # The size of the key list is only limited by memory keys = [...] values, missing = chunkClient.get_multi(keys) # The size of the entity list is only limited by memory entities = [...] chunkClient.put_multi(entities)