
在构建基于 fastapi 等异步框架的应用程序时,我们通常会选择 elasticsearch-py 库提供的 asyncelasticsearch 客户端来与 elasticsearch 集群进行交互,以充分利用异步i/o的优势。然而,当需要执行批量数据操作(如批量索引、更新或删除)时,开发者可能会遇到一个常见的困惑:库中标准的 elasticsearch.helpers.bulk 函数并不直接支持 asyncelasticsearch 客户端。尝试将其与异步客户端一同使用会导致类型错误或无法预期的行为,因为它被设计用于同步的 elasticsearch 客户端。
为了解决这一问题,elasticsearch-py 库专门为 AsyncElasticsearch 客户端提供了一套异步辅助函数,其中就包括 elasticsearch.helpers.async_bulk。这个函数是 helpers.bulk 的异步对应版本,它能够与 AsyncElasticsearch 实例无缝协作,以非阻塞的方式执行批量操作,确保应用程序的响应性和性能。
下面是一个如何在异步环境中利用 async_bulk 进行批量索引操作的示例。我们将演示如何准备数据、调用 async_bulk 以及处理操作结果。
import asyncio
from elasticsearch import AsyncElasticsearch, helpers
# 假设您的Elasticsearch运行在本地,并使用默认端口
# 实际应用中,请替换为您的ES集群地址
ES_HOST = "http://localhost:9200"
INDEX_NAME = "my_async_index"
async def perform_async_bulk_indexing():
# 初始化 AsyncElasticsearch 客户端
# 建议使用 async with 语句管理客户端生命周期
async with AsyncElasticsearch(ES_HOST) as es:
# 1. 检查并创建索引(如果不存在)
if not await es.indices.exists(index=INDEX_NAME):
await es.indices.create(index=INDEX_NAME)
print(f"索引 '{INDEX_NAME}' 已创建。")
else:
print(f"索引 '{INDEX_NAME}' 已存在。")
# 2. 准备要批量操作的数据
# 每个字典代表一个操作,通常包含 "_index", "_id", "_source"
documents = [
{
"_index": INDEX_NAME,
"_id": "doc1",
"_source": {"title": "Async Bulk Operations", "author": "Alice", "views": 100}
},
{
"_index": INDEX_NAME,
"_id": "doc2",
"_source": {"title": "Elasticsearch in Python", "author": "Bob", "views": 150}
},
{
"_index": INDEX_NAME,
"_id": "doc3",
"_source": {"title": "FastAPI with Elasticsearch", "author": "Charlie", "views": 200}
},
{
"_index": INDEX_NAME,
"_id": "doc4",
"_source": {"title": "Optimizing Async Applications", "author": "Alice", "views": 120}
},
]
print(f"\n开始批量索引 {len(documents)} 篇文档...")
# 3. 调用 helpers.async_bulk 执行批量操作
# actions 参数可以是一个生成器或列表
# yield_ok=False 表示只返回失败的文档信息,默认是True
success_count, failed_actions = await helpers.async_bulk(
es,
documents,
index=INDEX_NAME, # 可以在这里指定默认索引,也可以在每个文档中指定
chunk_size=500, # 每次发送到ES的文档数量
max_retries=3, # 失败后重试次数
initial_backoff=2, # 初始重试等待时间(秒)
max_backoff=60, # 最大重试等待时间(秒)
raise_on_error=False, # 遇到错误时不抛出异常,而是返回失败列表
raise_on_exception=False # 遇到异常时不抛出异常,而是返回失败列表
)
print(f"\n批量操作完成。")
print(f"成功索引文档数量: {success_count}")
# 4. 处理失败的文档
if failed_actions:
print(f"以下文档未能成功索引 ({len(failed_actions)} 篇):")
for item in failed_actions:
print(f" - {item}")
else:
print("所有文档均成功索引。")
# 5. 刷新索引并查询验证
await es.indices.refresh(index=INDEX_NAME)
search_result = await es.search(index=INDEX_NAME, query={"match_all": {}})
print(f"\n索引 '{INDEX_NAME}' 中当前文档总数: {search_result['hits']['total']['value']}")
if __name__ == "__main__":
asyncio.run(perform_async_bulk_indexing())通过本文的介绍和示例,我们了解到在 AsyncElasticsearch 中执行异步批量操作的关键在于使用 elasticsearch.helpers.async_bulk 函数。它不仅解决了与异步客户端的兼容性问题,还提供了丰富的参数配置,使得开发者能够构建高效、健壮且符合异步编程范式的 Elasticsearch 数据处理逻辑。掌握 async_bulk 的使用,是提升基于 AsyncElasticsearch 应用性能和可靠性的重要一步。
以上就是AsyncElasticsearch 异步批量操作实践指南的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号