用python开发websocket服务有三种常见方案。1. 使用websockets库:轻量级适合学习,通过asyncio实现异步通信,安装简单且代码易懂,但不便集成到web框架;2. flask项目推荐flask-socketio:结合flask使用,支持rest api与websocket共存,部署需配合eventlet或gevent提升并发;3. django项目使用channels:完整支持django生态,通过asgi处理websocket请求,配置较复杂但适合大型项目。选择依据场景而定,小项目用websockets,已有flask选flask-socketio,django必用channels,并注意连接管理及性能优化。

用Python开发WebSocket服务,其实不难。如果你需要做实时通信,比如聊天应用、在线协作工具或者实时数据推送,WebSocket是个很合适的选择。相比传统的HTTP轮询,它能实现双向通信,效率更高,延迟更低。

Python生态中有一些现成的库可以帮你快速搭建WebSocket服务,下面我来分享几种常见方案和操作方法。

websockets 库:轻量级纯WebSocket服务如果你想从头开始构建一个简单的WebSocket服务,推荐使用 websockets 这个第三方库。它是基于asyncio的,适合做异步处理。
立即学习“Python免费学习笔记(深入)”;
安装方式很简单:

pip install websockets
写一个基础的服务端示例:
import asyncio
import websockets
async def echo(websocket, path):
async for message in websocket:
print(f"收到消息: {message}")
await websocket.send(f"服务器回复: {message}")
start_server = websockets.serve(echo, "localhost", 8765)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()这个例子会启动一个监听在 ws://localhost:8765 的WebSocket服务,接收客户端消息并原样返回。
客户端可以用浏览器测试,也可以用另一个Python脚本连接:
async def connect():
async with websockets.connect("ws://localhost:8765") as websocket:
await websocket.send("你好")
response = await websocket.recv()
print(response)
asyncio.get_event_loop().run_until_complete(connect())优点是简单易懂,适合学习或小型项目。缺点是如果要集成到Web框架里(比如Flask、Django),就不太方便了。
Flask-SocketIO 实现实时通信如果你已经有一个Flask项目,想加WebSocket功能,推荐使用 Flask-SocketIO。
安装依赖:
pip install flask-socketio eventlet
基本服务代码如下:
传统驾校预约方式步骤繁琐,效率低下,随着移动互联网科技和5G的革新,驾校考试领域迫切需要更加简洁、高效的预约方式,便捷人们的生活。因此设计基于微信小程序的驾校预约系统,改进传统驾校预约方式,实现高效的驾校学校预约。 采用腾讯提供的小程序云开发解决方案,无须服务器和域名。驾校预约管理:开始/截止时间/人数均可灵活设置,可以自定义客户预约填写的数据项驾校预约凭证:支持线下到场后校验签到/核销/二维码自
0
from flask import Flask, render_template
from flask_socketio import SocketIO, emit
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
@socketio.on('connect')
def handle_connect():
print('客户端已连接')
@socketio.on('message')
def handle_message(data):
print('收到消息:', data)
emit('response', f'服务器回应: {data}')
if __name__ == '__main__':
socketio.run(app, host='0.0.0.0', port=5000)前端HTML部分可以用JavaScript连接:
<script src="https://cdn.socket.io/4.3.2/socket.io.min.js"></script>
<script>
const socket = io('http://localhost:5000');
socket.on('connect', () => {
console.log('已连接到Flask WebSocket');
socket.emit('message', 'Hello from client');
});
socket.on('response', (data) => {
console.log('收到回复:', data);
});
</script>这种方式更适合已有Flask项目,或者需要结合REST API一起使用的场景。但注意,部署时最好配合 eventlet 或 gevent 才能支持并发连接。
如果你用的是Django,并且希望把WebSocket整合进现有项目,那就要用 Django Channels。它支持ASGI协议,可以同时处理HTTP和WebSocket请求。
安装:
pip install channels
配置步骤略多,主要修改点包括:
settings.py 中添加 'channels' 到 INSTALLED_APPS
ASGI_APPLICATION 指向你的路由文件consumers.py 处理WebSocket逻辑一个简单的Consumer示例如下:
from channels.generic.websocket import AsyncWebsocketConsumer
import json
class ChatConsumer(AsyncWebsocketConsumer):
async def connect(self):
await self.accept()
async def disconnect(self, close_code):
pass
async def receive(self, text_data):
data = json.loads(text_data)
message = data['message']
await self.send(text_data=json.dumps({'response': message}))然后在 routing.py 里定义路径:
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/chat/$', consumers.ChatConsumer),
]这样就可以通过 /ws/chat/ 建立WebSocket连接了。
Channels的优势在于完整支持Django生态,适合大型项目。但上手门槛比前两个高一些,初期配置也稍微复杂。
websockets 库就够了。Flask-SocketIO。Channels。另外,WebSocket连接管理很重要。比如用户断开重连、广播消息、维护连接池等,这些细节在实际开发中都要考虑到。
基本上就这些。WebSocket开发虽然不算太复杂,但容易忽略连接管理和错误处理。刚开始可以先跑通最简例子,再逐步加上业务逻辑。
以上就是怎样用Python开发WebSocket服务?实时通信方案的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号