How to circumvent the Google Cloud Datastore 1000 read / 500 write limit in Python

Google Cloud Datastore has a built-in limit of 1000 keys per get request and 500 entities per put request. If you exceed either limit, you will see one of these error messages:

google.api_core.exceptions.InvalidArgument: 400 cannot get more than 1000 keys in a single call
google.api_core.exceptions.InvalidArgument: 400 cannot write more than 500 entities in a single call

You can fix this by chunking the requests, i.e. fetching at most 1000 keys per get call and writing at most 500 entities per put call.
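
For example, a minimal sequential sketch of this approach could look like this (the helper name get_multi_chunked is just for illustration; it assumes an existing google.cloud.datastore client and a list of keys):

def get_multi_chunked(client, keys, chunk_size=1000):
    """Fetch entities sequentially in chunks of at most 1000 keys."""
    values, missing = [], []
    for i in range(0, len(keys), chunk_size):
        # get_multi() appends placeholder entities for not-found keys to "missing"
        values += client.get_multi(keys[i:i + chunk_size], missing=missing)
    return values, missing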

This code provides a ready-to-use example of a class that automates this process. As an added benefit, it performs the requests in chunks of 1000 (for get) and 400 (for put, safely below the 500 limit) in parallel using a concurrent.futures.Executor. As the workload is expected to be IO-bound, a concurrent.futures.ThreadPoolExecutor is recommended. If you don't give the class an executor on construction, it will create one by itself (with 16 threads).

from concurrent.futures import ThreadPoolExecutor

def _chunks(l, n=1000):
    """
    Yield successive n-sized chunks from l.
    https://stackoverflow.com/a/312464/2597135
    """
    for i in range(0, len(l), n):
        yield l[i:i + n]

def _get_chunk(client, keys):
    """
    Get a single chunk
    """
    missing = []
    vals = client.get_multi(keys, missing=missing)
    return vals, missing

class DatastoreChunkClient(object):
    """
    Provides a thin wrapper around a Google Cloud Datastore client providing means
    of reading nd
    """
    def __init__(self, client, executor=None):
        self.client = client
        if executor is None:
            executor = ThreadPoolExecutor(16)
        self.executor = executor
    
    def get_multi(self, keys):
        """
        Thin wrapper around client.get_multi() that circumvents
        the 1000 keys per get limit by doing 1000-sized chunked reads
        in parallel using self.executor.

        Returns (values, missing).
        """
        all_missing = []
        all_vals = []
        for vals, missing in self.executor.map(lambda chunk: _get_chunk(self.client, chunk), _chunks(keys, 1000)):
            all_vals += vals
            all_missing += missing
        return all_vals, all_missing

    def put_multi(self, entities):
        """
        Thin wrapper around client.put_multi() that circumvents
        the 500 entities per put limit by doing 400-sized chunked writes
        in parallel using self.executor.

        Returns nothing.
        """
        # Consume the iterator to wait for all writes and to propagate exceptions
        for _ in self.executor.map(lambda chunk: self.client.put_multi(chunk), _chunks(entities, 400)):
            pass

Usage example:

# Create "raw" google datastore client
client = datastore.Client(project="myproject-123456")
chunkClient = DatastoreChunkClient(client)

# The size of the key list is only limited by memory
keys = [...]
values, missing = chunkClient.get_multi(keys)

# The size of the entity list is only limited by memory
entities = [...]
chunkClient.put_multi(entities)
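
If the default of 16 worker threads does not fit your workload, you can pass your own executor on construction. A minimal sketch (the choice of 32 workers is arbitrary):

from concurrent.futures import ThreadPoolExecutor

# Custom executor to control the degree of parallelism
executor = ThreadPoolExecutor(max_workers=32)
chunkClient = DatastoreChunkClient(client, executor=executor)

values, missing = chunkClient.get_multi(keys)

# Shut down the executor once you are done with it
executor.shutdown()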