通过使用 concurrent.futures ThreadPoolExecutor 加速 Python Elasticsearch index()

在我们之前的文章Elasticsearch Python 最小 index() / insert 示例中,我们展示了如何将文档插入 Elasticsearch。

向 Elasticsearch 插入大量文档时,你会注意到在尝试插入文档之前等待 API 调用完成非常慢。

在这篇文章中,我们将展示一种并行执行许多请求的简单方法,以便多个 index 操作并发运行,同时你的代码正在处理更多文档。为此,我们将使用 concurrent.futures.ThreadPoolExecutor,并在将所有文档插入队列后,使用 concurrent.futures.wait 等待所有请求完成后再退出。

es_index_threadpool_example.py
#!/usr/bin/env python3
from elasticsearch import Elasticsearch
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures

index_executor = ThreadPoolExecutor(64)
futures = []

es = Elasticsearch()
for i in range(1000):
    future = index_executor.submit(es.index, index="test-index", id=i, body={"test": 123})
    futures.append(future)

print("等待请求完成...")
concurrent.futures.wait(futures)

Check out similar posts by category: Databases, ElasticSearch, Python