
在处理大规模数据分析或导出场景时,经常需要从 opensearch 集群中检索超过默认 10,000 条限制的文档。标准的 search api 设计用于快速获取少量相关结果,而 scroll api 则提供了一种机制,允许用户获取一个查询的完整快照,并通过多次请求逐步获取所有匹配的文档。它通过维护一个服务器端的查询上下文(快照),确保在迭代过程中即使索引数据发生变化,也能获取到一致的结果集。
首先,需要正确初始化 opensearch-py 客户端,以便与 OpenSearch 集群建立连接。这包括指定主机、端口、认证信息以及其他连接参数。
from opensearchpy import OpenSearch, RequestsHttpConnection
import csv
# OpenSearch 集群连接信息
host = 'your-opensearch-host' # 替换为你的 OpenSearch 主机
port = 443
auth = ('username', 'password') # 替换为你的认证凭据
# 初始化 OpenSearch 客户端
client = OpenSearch(
hosts=[{"host": host, "port": port}],
http_auth=auth,
use_ssl=True,
timeout=300, # 请求超时时间
verify_certs=True,
connection_class=RequestsHttpConnection,
pool_maxsize=20, # 连接池大小
)
# 验证连接是否成功
try:
info = client.info()
print(f"Connected to OpenSearch: {info['version']['distribution']} {info['version']['number']}")
except Exception as e:
print(f"Error connecting to OpenSearch: {e}")
exit()接下来,定义用于检索文档的查询体。查询体应包含筛选条件、返回字段以及每次滚动请求期望获取的文档数量(size)。虽然 size 参数在 Scroll API 中仍然存在,但它现在表示每次滚动请求返回的批次大小,而不是总结果限制。
query_body = {
"size": 10000, # 每次滚动请求返回的最大文档数
"timeout": "300s", # 查询超时时间
"query": {
"bool": {
"must": [
{"match": {"type": "req"}}, # 匹配 'type' 字段为 'req' 的文档
{"range": {"@timestamp": {"gte": "now-7d/d", "lte": "now/d"}}}, # 匹配最近7天的日志
{"wildcard": {"req_h_user_agent": {"value": "*googlebot*"}}}, # 匹配用户代理包含 'googlebot' 的文档
]
}
},
# 指定需要返回的字段,而不是整个 _source
"fields": [
"@timestamp",
"resp_status",
"resp_bytes",
"req_h_referer",
"req_h_user_agent",
"req_h_host",
"req_uri",
"total_response_time",
],
"_source": False, # 禁用返回完整的 _source 字段,仅返回指定 fields
}
index_name = "fastly-*" # 要查询的索引模式使用 client.search 方法发起第一个滚动请求。关键在于设置 scroll 参数,它指定了滚动上下文的有效时间。例如,'1m' 表示滚动上下文将保持活动状态 1 分钟。此请求将返回第一批匹配的文档以及一个 _scroll_id,该 ID 用于后续的滚动请求。
# 发起初始搜索请求,并创建滚动上下文
print("Initiating scroll search...")
initial_response = client.search(
index=index_name,
body=query_body,
scroll='1m', # 滚动上下文保持活动的时间
)
# 从初始响应中获取 _scroll_id
scroll_id = initial_response.get("_scroll_id")
if not scroll_id:
print("No scroll ID returned, possibly no results or an error occurred.")
# Handle cases where no scroll ID is returned (e.g., no results)
exit()
# 获取第一批命中结果
hits = initial_response["hits"]["hits"]
total_hits = initial_response["hits"]["total"]["value"]
print(f"Found {total_hits} total hits.")
print(f"Retrieved {len(hits)} hits in the first batch.")
all_results = []
if hits:
all_results.extend(hits)在获取到 _scroll_id 后,可以通过循环调用 client.scroll 方法来持续获取剩余的文档批次。每次调用 client.scroll 时,都需要传入上一次请求返回的 _scroll_id。当 client.scroll 返回的 hits 列表为空时,表示所有匹配的文档都已检索完毕,此时可以终止循环。
立即学习“Python免费学习笔记(深入)”;
在每次迭代中,更新 scroll_id 是非常重要的,因为 OpenSearch 可能会在每次滚动请求后返回一个新的 _scroll_id。
# 循环获取所有结果
retrieved_count = len(hits)
while len(hits) > 0:
print(f"Retrieving next batch using scroll ID: {scroll_id}")
scroll_response = client.scroll(
scroll='1m', # 每次滚动请求保持滚动上下文的有效时间
scroll_id=scroll_id,
)
# 获取新的 _scroll_id 和命中结果
scroll_id = scroll_response.get("_scroll_id")
hits = scroll_response["hits"]["hits"]
if hits:
all_results.extend(hits)
retrieved_count += len(hits)
print(f"Retrieved {len(hits)} more hits. Total retrieved: {retrieved_count}")
else:
print("No more hits found.")
break
# 清理滚动上下文(可选,通常在上下文过期后自动清理)
if scroll_id:
try:
client.clear_scroll(scroll_id=scroll_id)
print(f"Scroll context {scroll_id} cleared.")
except Exception as e:
print(f"Error clearing scroll context: {e}")
print(f"\nSuccessfully retrieved all {len(all_results)} results.")
# 示例:将结果写入 CSV 文件
output_file = "opensearch_results.csv"
with open(output_file, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
# 写入 CSV 头部
writer.writerow([
"timestamp", "response_code", "bytes", "url", "response_time",
"referer", "user_agent"
])
# 遍历所有结果并写入 CSV
for hit in all_results:
fields = hit.get("fields", {}) # 使用 .get() 避免 KeyError
# 提取并格式化数据
timestamp = fields.get("@timestamp", [""])[0]
resp_status = fields.get("resp_status", [""])[0]
resp_bytes = fields.get("resp_bytes", [""])[0]
req_h_host = fields.get("req_h_host", [""])[0]
req_uri = fields.get("req_uri", [""])[0]
full_url = f"{req_h_host}{req_uri}" if req_h_host and req_uri else ""
total_response_time = fields.get("total_response_time", [""])[0]
req_h_referer = fields.get("req_h_referer", [""])[0]
req_h_user_agent = fields.get("req_h_user_agent", [""])[0]
writer.writerow([
timestamp,
resp_status,
resp_bytes,
full_url,
total_response_time,
req_h_referer,
req_h_user_agent,
])
print(f"All results saved to {output_file}")通过 opensearch-py 客户端的 Scroll API,可以有效地绕过 OpenSearch 标准搜索的 10,000 条结果限制,实现对大规模数据集的完整检索。理解其工作原理、正确配置客户端、构建查询以及迭代获取结果是成功实现这一目标的关键。在实际应用中,还需要结合错误处理和资源管理策略,以确保数据获取过程的稳定性和效率。
以上就是如何使用 OpenSearch Python 客户端高效获取全部查询结果的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号