使用 asyncio.wait() 优化 WebSocket 广播性能

花韻仙語
发布: 2025-08-19 20:26:01
原创
786人浏览过

使用 asyncio.wait() 优化 websocket 广播性能

正如摘要所述,本文将深入探讨在使用 websockets 库构建 WebSocket 服务时,如何解决 websockets.broadcast() 在无限循环中可能导致的阻塞问题,并提供替代方案 asyncio.wait(),帮助开发者构建高性能的 WebSocket 应用。

问题背景与分析

在使用 websockets 库构建实时应用时,经常需要将服务器端的数据广播到所有连接的客户端。一种常见的做法是在一个无限循环中读取数据(例如视频帧),进行处理,然后使用 websockets.broadcast() 将结果广播出去。

然而,当客户端数量增多,或者数据处理过程耗时较长时,websockets.broadcast() 可能会阻塞事件循环,导致客户端无法及时接收到数据,甚至出现连接超时等问题。这是因为 websockets.broadcast() 实际上是一个同步操作,它会依次向每个客户端发送数据,直到所有客户端都发送完毕才返回。

解决方案:使用 asyncio.wait() 进行异步广播

为了解决这个问题,我们可以使用 asyncio.wait() 来实现异步广播。asyncio.wait() 允许我们并发地执行多个异步任务,而不会阻塞事件循环。

以下是使用 asyncio.wait() 替代 websockets.broadcast() 的代码示例:

Server.py:

import websockets
import cv2
import asyncio
import time

def predict(image):
    # 模拟耗时的数据处理过程
    time.sleep(0.1)
    return "test"

async def echo(websocket, path):
    global vidCap, i
    while True:
        ret, image = vidCap.read()
        if ret:
            start = time.time()
            result = predict(image)
            # 使用 asyncio.wait() 进行异步广播
            await asyncio.wait([ws.send(result) for ws in clients])
            end = time.time()
            print("exec time:%f s" % (end - start))
        else:
            # 视频读取完毕,退出循环
            break

async def handler(websocket, path):
    clients.add(websocket)
    try:
        await echo(websocket, path)
    finally:
        clients.remove(websocket)

async def serve():
    start_server = await websockets.serve(handler, "localhost", 8765)
    await start_server.wait_closed()

if __name__ == '__main__':
    vidCap = cv2.VideoCapture(0) # 使用摄像头
    clients = set()
    asyncio.run(serve())
登录后复制

Client.py:

百度虚拟主播
百度虚拟主播

百度智能云平台的一站式、灵活化的虚拟主播直播解决方案

百度虚拟主播 36
查看详情 百度虚拟主播
import websockets
import asyncio
import time

async def get_result(uri):
    async with websockets.connect(uri) as websocket:
        while True:
            try:
                start = time.time()
                recv_text = await websocket.recv()
                print(recv_text)
                end = time.time()
                print("exec:%f s" % (end - start))
            except websockets.exceptions.ConnectionClosed:
                print("Connection closed")
                break
            except Exception as e:
                print(f"Error: {e}")
                break

if __name__ == '__main__':
    asyncio.run(get_result("ws://127.0.0.1:8765/ws"))
登录后复制

代码解释:

  1. asyncio.wait([ws.send(result) for ws in clients]): 这行代码的关键在于使用列表推导式 [ws.send(result) for ws in clients] 创建了一个包含所有客户端发送任务的列表。然后,asyncio.wait() 并发地执行这些任务,这意味着服务器可以同时向所有客户端发送数据,而不会阻塞事件循环。

  2. 异常处理: 在Client.py中添加了websockets.exceptions.ConnectionClosed异常的处理,可以更优雅地处理客户端断开连接的情况。

websockets.broadcast() vs. asyncio.wait()

特性 websockets.broadcast() asyncio.wait()
执行方式 同步 异步
阻塞性 会阻塞事件循环 不会阻塞事件循环
并发性 支持并发执行多个发送任务
适用场景 客户端数量较少,数据处理速度快 客户端数量较多,数据处理速度慢,需要高并发性能的场景

总结:

  • websockets.broadcast() 简单易用,但在高并发场景下性能较差。
  • asyncio.wait() 可以实现异步广播,在高并发场景下性能更优。

注意事项

  • 错误处理: 在使用 asyncio.wait() 时,需要注意错误处理。如果某个客户端的发送任务失败,asyncio.wait() 会抛出异常。你需要根据实际情况选择合适的错误处理策略,例如忽略错误、重试或关闭连接。
  • 性能优化: 虽然 asyncio.wait() 可以提高并发性能,但在客户端数量非常庞大时,仍然可能成为性能瓶颈。可以考虑使用更高级的广播算法,例如基于发布/订阅模式的消息队列。
  • 视频流读取: 示例代码中使用 cv2.VideoCapture(0) 从摄像头读取视频流。 你可以根据实际情况修改为读取本地视频文件或 RTSP 流。请确保安装了 OpenCV 库 (pip install opencv-python).
  • 资源释放: 在程序结束时,务必释放视频捕获对象 vidCap.release(),以避免资源泄露。

通过使用 asyncio.wait() 替代 websockets.broadcast(),可以显著提高 WebSocket 服务的并发性能和响应速度,从而构建更稳定、更高效的实时应用。 记住,选择合适的广播方式取决于你的具体应用场景和性能需求。

以上就是使用 asyncio.wait() 优化 WebSocket 广播性能的详细内容,更多请关注php中文网其它相关文章!

数码产品性能查询
数码产品性能查询

该软件包括了市面上所有手机CPU,手机跑分情况,电脑CPU,电脑产品信息等等,方便需要大家查阅数码产品最新情况,了解产品特性,能够进行对比选择最具性价比的商品。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号