如何在 Python 中规避 Google Cloud Storage 1000 读取 / 400 写入限制

Google Cloud Datastore 对 get 请求有内置的 1000 个键限制,对 put 限制为每请求 400 个实体。如果你达到限制,你将看到以下错误消息之一:

output.txt
google.api_core.exceptions.InvalidArgument: 400 cannot get more than 1000 keys in a single call
google.api_core.exceptions.InvalidArgument: 400 cannot write more than 500 entities in a single call

你可以通过分块请求来修复此问题,即一次只对 get 等执行 1000 个请求。

此代码提供了一个自动化此过程的类的现成示例。作为额外好处,它使用 concurrent.futures.Executor 并行执行 1000(用于 get)或 400(用于 put)个分块的请求。由于性能预期为 IO 密集型,建议使用 concurrent.futures.ThreadPoolExecutor。 如果你在构造时不给类一个执行器,它会自己创建一个。

datastore_chunk_client.py
import itertools
from concurrent.futures import ThreadPoolExecutor

def _chunks(l, n=1000):
    """
    从 l 中生成连续的 n 大小的块。
    https://stackoverflow.com/a/312464/2597135
    """
    for i in range(0, len(l), n):
        yield l[i:i + n]

def _get_chunk(client, keys):
    """
    获取单个块
    """
    missing = []
    vals = client.get_multi(keys, missing=missing)
    return vals, missing

class DatastoreChunkClient(object):
    """
    提供 Google Cloud Datastore 客户端的轻量封装,提供读取和写入的方式
    """
    def __init__(self, client, executor=None):
        self.client = client
        if executor is None:
            executor = ThreadPoolExecutor(16)
        self.executor = executor

    def get_multi(self, keys):
        """
        client.get_multi() 的轻量封装,通过使用 self.executor 并行执行 1000 大小的分块读取来规避 1000 读取请求限制。

        返回 (values, missing)。
        """
        all_missing = []
        all_vals = []
        for vals, missing in self.executor.map(lambda chunk: _get_chunk(self.client, chunk), _chunks(keys, 1000)):
            all_vals += vals
            all_missing += missing
        return all_vals, all_missing

    def put_multi(self, entities):
        """
        client.put_multi() 的轻量封装,通过使用 self.executor 并行执行 400 大小的分块写入来规避 400 写入请求限制。

        返回 (values, missing)。
        """
        for none in self.executor.map(lambda chunk: self.client.put_multi(chunk), _chunks(entities, 400)):
            pass

用法示例:

datastore_usage_example.py
# 创建"原始"的 google datastore 客户端
client = datastore.Client(project="myproject-123456")
chunkClient = DatastoreChunkClient(client)

# 键列表的大小仅受内存限制
keys = [...]
values, missing = chunkClient.get_multi(keys)

# 实体列表的大小仅受内存限制
entities = [...]
chunkClient.put_multi(entities)

Check out similar posts by category: Cloud, Python