Schnelleres Python-Elasticsearch index() mit concurrent.futures ThreadPoolExecutor

English Deutsch

Im vorherigen Beitrag Elasticsearch Python minimales index() / insert-Beispiel wurde gezeigt, wie ein Dokument in Elasticsearch eingefügt wird.

Beim Einfügen einer großen Anzahl von Dokumenten in Elasticsearch fällt auf, dass es extrem langsam ist, auf den Abschluss des API-Aufrufs zu warten, bevor das nächste Dokument eingefügt wird.

In diesem Beitrag wird eine einfache Methode gezeigt, viele Anfragen parallel auszuführen, sodass mehrere index-Operationen gleichzeitig laufen, während der Code weitere Dokumente verarbeitet. Hierfür wird concurrent.futures.ThreadPoolExecutor verwendet und – nachdem alle Dokumente in die Warteschlange eingefügt wurden – concurrent.futures.wait verwendet, um auf den Abschluss aller Anfragen zu warten, bevor das Programm beendet wird.

es_index_threadpool_example.py
#!/usr/bin/env python3
from elasticsearch import Elasticsearch
from concurrent.futures import ThreadPoolExecutor
import concurrent.futures

index_executor = ThreadPoolExecutor(64)
futures = []

es = Elasticsearch()
for i in range(1000):
    future = index_executor.submit(es.index, index="test-index", id=i, body={"test": 123})
    futures.append(future)

print("Waiting for requests to complete...")
concurrent.futures.wait(futures)

Check out similar posts by category: Databases, ElasticSearch, Python