
在使用 aiohttp 发送大量并发 http 请求,尤其是每个请求携带大尺寸负载(例如,每个请求约 5 mb)时,开发者可能会遇到显著的性能瓶颈。一个常见的问题源于 aiohttp.clientsession.post() 方法中 json 参数的便捷性。当使用此参数时,aiohttp 内部会调用 json.dumps() 方法将 python 对象序列化为 json 字符串,然后编码为字节流。
对于大尺寸数据,json.dumps() 是一个同步的、CPU 密集型操作,可能耗时数十毫秒(例如 30-40 毫秒)。在 Python 的异步事件循环中,任何同步的、长时间运行的操作都会阻塞事件循环,阻止其处理其他待办任务。这意味着,如果有大量请求(例如 50 个),每个请求的 JSON 序列化都会阻塞事件循环,导致累积的阻塞时间显著增加(例如 50 * 30ms = 1500ms)。
这种阻塞效应会造成以下问题:
此外,网络层面的性能也至关重要。例如,DNS 解析(将域名转换为 IP 地址)也是一个潜在的阻塞点,尤其是在频繁建立新连接或不当复用会话时。
为了解决 JSON 序列化阻塞事件循环的问题,核心思想是将耗时的同步操作从主事件循环中剥离出来。
问题分析:aiohttp 的 json 参数内部调用 json.dumps(),这是一个同步的 CPU 密集型操作。当处理大型 JSON 负载时,它会长时间占用事件循环,导致其他异步任务无法执行。
解决方案: 手动预先序列化 JSON 数据,并将这个阻塞操作放入一个单独的线程中执行,从而避免阻塞主事件循环。这可以通过 asyncio.to_thread 实现。
示例代码:
import asyncio
import aiohttp
import json
import time
def prepare_json_data_sync(obj: dict) -> bytes:
"""
同步地将Python字典序列化为JSON字节流。
这个函数是CPU密集型的,适合在单独线程中运行。
"""
return json.dumps(obj).encode('utf-8')
async def send_large_request(session: aiohttp.ClientSession, url: str, payload: dict, request_id: int):
"""
发送一个大型POST请求,使用预序列化的JSON数据,避免阻塞事件循环。
"""
print(f"[{time.time():.2f}] 请求 {request_id}: 开始准备数据...")
# 使用 asyncio.to_thread 将阻塞的JSON序列化操作卸载到单独的线程
data_bytes = await asyncio.to_thread(prepare_json_data_sync, payload)
print(f"[{time.time():.2f}] 请求 {request_id}: 数据准备完成。发送请求...")
headers = {"Content-Type": "application/json"}
try:
async with session.post(url, data=data_bytes, headers=headers) as response:
print(f"[{time.time():.2f}] 请求 {request_id}: 收到响应,状态码: {response.status}")
return await response.text()
except aiohttp.ClientError as e:
print(f"[{time.time():.2f}] 请求 {request_id}: 发送失败 - {e}")
return None
async def main():
# 替换为你的实际测试URL,例如一个简单的HTTP echo server
# 为了演示效果,你可以运行一个本地的aiohttp服务器来接收请求
# 例如:
# from aiohttp import web
# async def handle(request):
# body = await request.read()
# await asyncio.sleep(0.1) # 模拟服务器处理延迟
# return web.Response(text=f"Received {len(body)} bytes from {request.path}")
# app = web.Application()
# app.router.add_post('/api/endpoint/{id}', handle)
# web.run_app(app, port=8080)
base_url = "http://localhost:8080/api/endpoint"
num_requests = 10 # 增加请求数量以更明显地观察效果
# 模拟一个较大的负载,例如一个包含大量数据的字典
# 实际场景中,这可能是数MB的数据
large_payload = {"data": "a" * (1024 * 100)} # 100KB字符串,实际可更大
async with aiohttp.ClientSession() as session:
tasks = [send_large_request(session, f"{base_url}/{i}", large_payload, i) for i in range(num_requests)]
await asyncio.gather(*tasks)
if __name__ == "__main__":
asyncio.run(main())注意事项:
除了 JSON 序列化,DNS 解析也是影响请求延迟的一个因素,尤其是在频繁建立新连接时。
问题分析: DNS 解析是将域名(如 example.com)转换为 IP 地址(如 93.184.216.34)的过程。这是一个网络操作,如果处理不当,可能会阻塞事件循环或引入额外的延迟。
解决方案:
安装 aiohttp[speedups]:aiohttp 提供了一个可选的依赖包 aiohttp[speedups],它会安装 aiodns。aiodns 是一个基于 C 语言的异步 DNS 解析器,能够显著加速 DNS 查找过程,并使其非阻塞。 安装命令:
pip install aiohttp[speedups]
安装后,aiohttp 会自动使用 aiodns 进行 DNS 解析,从而提高性能。
直接使用 IP 地址: 如果你的应用程序与内部服务通信,或者目标服务器的 IP 地址是稳定且已知的,你可以直接在 URL 中使用 IP 地址而不是域名。这样做可以完全跳过 DNS 解析步骤,从而消除这部分延迟。 例如:将 http://example.com/api 改为 http://93.184.216.34/api。 注意事项: 这种方法牺牲了灵活性和可维护性。IP 地址可能发生变化,并且对于公共服务或需要负载均衡的场景,直接使用 IP 地址通常不适用。
关键最佳实践:会话复用 (aiohttp.ClientSession): 这是最重要且最常被忽视的性能优化点。每次发送请求都创建一个新的 aiohttp.ClientSession 实例是严重的性能反模式,因为它会导致:
正确做法: 在应用程序的生命周期内,或者至少对于一组相关的请求,始终复用同一个 aiohttp.ClientSession 实例。通常,一个应用程序只需要一个全局的 ClientSession 实例。
import asyncio
import aiohttp
async def fetch_data(session: aiohttp.ClientSession, url: str):
async with session.get(url) as response:
return await response.text()
async def main_with_session_reuse():
# 在应用程序启动时创建一次会话
async with aiohttp.ClientSession() as session:
urls = ["http://example.com", "http://google.com", "http://github.com"]
tasks = [fetch_data(session, url) for url in urls]
results = await asyncio.gather(*tasks)
for url, result in zip(urls, results):
print(f"Fetched {url}: {result[:50]}...") # Print first 50 chars
if __name__ == "__main__":
asyncio.run(main_with_session_reuse())为了构建高性能、低延迟的 aiohttp 异步网络应用,特别是在处理大规模并发请求时,请务必遵循以下核心策略和最佳实践:
通过综合应用这些优化策略,开发者可以有效规避 aiohttp 在处理大规模并发请求时常见的性能瓶颈,确保应用程序具备高吞吐量和卓越的响应速度。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号