
正如摘要所述,本文将深入探讨在使用 websockets 库构建 WebSocket 服务时,如何解决 websockets.broadcast() 在无限循环中可能导致的阻塞问题,并提供替代方案 asyncio.wait(),帮助开发者构建高性能的 WebSocket 应用。
在使用 websockets 库构建实时应用时,经常需要将服务器端的数据广播到所有连接的客户端。一种常见的做法是在一个无限循环中读取数据(例如视频帧),进行处理,然后使用 websockets.broadcast() 将结果广播出去。
然而,当客户端数量增多,或者数据处理过程耗时较长时,websockets.broadcast() 可能会阻塞事件循环,导致客户端无法及时接收到数据,甚至出现连接超时等问题。这是因为 websockets.broadcast() 实际上是一个同步操作,它会依次向每个客户端发送数据,直到所有客户端都发送完毕才返回。
为了解决这个问题,我们可以使用 asyncio.wait() 来实现异步广播。asyncio.wait() 允许我们并发地执行多个异步任务,而不会阻塞事件循环。
以下是使用 asyncio.wait() 替代 websockets.broadcast() 的代码示例:
Server.py:
import websockets
import cv2
import asyncio
import time
def predict(image):
# 模拟耗时的数据处理过程
time.sleep(0.1)
return "test"
async def echo(websocket, path):
global vidCap, i
while True:
ret, image = vidCap.read()
if ret:
start = time.time()
result = predict(image)
# 使用 asyncio.wait() 进行异步广播
await asyncio.wait([ws.send(result) for ws in clients])
end = time.time()
print("exec time:%f s" % (end - start))
else:
# 视频读取完毕,退出循环
break
async def handler(websocket, path):
clients.add(websocket)
try:
await echo(websocket, path)
finally:
clients.remove(websocket)
async def serve():
start_server = await websockets.serve(handler, "localhost", 8765)
await start_server.wait_closed()
if __name__ == '__main__':
vidCap = cv2.VideoCapture(0) # 使用摄像头
clients = set()
asyncio.run(serve())Client.py:
import websockets
import asyncio
import time
async def get_result(uri):
async with websockets.connect(uri) as websocket:
while True:
try:
start = time.time()
recv_text = await websocket.recv()
print(recv_text)
end = time.time()
print("exec:%f s" % (end - start))
except websockets.exceptions.ConnectionClosed:
print("Connection closed")
break
except Exception as e:
print(f"Error: {e}")
break
if __name__ == '__main__':
asyncio.run(get_result("ws://127.0.0.1:8765/ws"))代码解释:
asyncio.wait([ws.send(result) for ws in clients]): 这行代码的关键在于使用列表推导式 [ws.send(result) for ws in clients] 创建了一个包含所有客户端发送任务的列表。然后,asyncio.wait() 并发地执行这些任务,这意味着服务器可以同时向所有客户端发送数据,而不会阻塞事件循环。
异常处理: 在Client.py中添加了websockets.exceptions.ConnectionClosed异常的处理,可以更优雅地处理客户端断开连接的情况。
| 特性 | websockets.broadcast() | asyncio.wait() |
|---|---|---|
| 执行方式 | 同步 | 异步 |
| 阻塞性 | 会阻塞事件循环 | 不会阻塞事件循环 |
| 并发性 | 无 | 支持并发执行多个发送任务 |
| 适用场景 | 客户端数量较少,数据处理速度快 | 客户端数量较多,数据处理速度慢,需要高并发性能的场景 |
总结:
通过使用 asyncio.wait() 替代 websockets.broadcast(),可以显著提高 WebSocket 服务的并发性能和响应速度,从而构建更稳定、更高效的实时应用。 记住,选择合适的广播方式取决于你的具体应用场景和性能需求。
以上就是使用 asyncio.wait() 优化 WebSocket 广播性能的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号