
本文旨在解决前端频繁轮询后端以获取实时状态更新的低效问题。我们将探讨两种高效的后端数据推送机制:server-sent events (sse) 和 websockets。重点介绍它们的工作原理、fastapi中的实现方式以及前端如何接收数据,并根据实际应用场景,提供选择这两种技术的指导,以实现从后端向前端的事件驱动型实时通信。
在现代Web应用中,实时数据更新是提升用户体验的关键。传统的做法是前端定时向后端发送请求(即轮询)以检查数据是否有更新。然而,当数据更新不频繁时,这种方式会造成大量的无效请求,浪费服务器资源和网络带宽,尤其是在硬件状态等可能长时间保持不变的场景下,其低效性尤为突出。为了解决这一问题,我们需要一种机制,让后端能够在数据发生变化时主动将信息推送给前端,而不是等待前端的请求。
实现后端向前端实时推送数据主要有两种主流技术:Server-Sent Events (SSE) 和 WebSockets。它们都允许服务器在数据可用时立即发送给客户端,从而避免了低效的轮询。
Server-Sent Events 是一种基于HTTP的单向通信技术,允许服务器向客户端推送事件流。它通过一个持久的HTTP连接工作,服务器可以随时向客户端发送文本事件。
工作原理: 当客户端发起一个SSE连接时,服务器会保持这个连接打开,并以 text/event-stream 的MIME类型发送数据。数据以特定格式(data: your_message\n\n)发送,客户端的浏览器会自动解析这些事件。
优点:
FastAPI 实现 SSE: 在FastAPI中,可以使用 StreamingResponse 结合一个异步生成器来创建SSE端点。当硬件状态发生变化时,后端可以将最新的状态作为事件发送给所有订阅的客户端。
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import json
app = FastAPI()
# 模拟硬件状态
hardware_status = {"temperature": 25.0, "humidity": 60, "power_on": True}
# 存储待发送的事件
event_queue = asyncio.Queue()
# 模拟硬件状态更新(在实际应用中,这会由硬件监控脚本触发)
async def simulate_hardware_updates():
while True:
await asyncio.sleep(5) # 每5秒模拟一次状态更新
new_temperature = hardware_status["temperature"] + 0.5
new_humidity = hardware_status["humidity"] + (1 if new_temperature > 27 else -1)
# 假设只有温度或湿度变化才推送
if new_temperature != hardware_status["temperature"] or new_humidity != hardware_status["humidity"]:
hardware_status["temperature"] = round(new_temperature, 2)
hardware_status["humidity"] = round(new_humidity, 2)
print(f"Hardware status updated: {hardware_status}")
# 将更新后的状态放入事件队列
event_data = {"status": hardware_status, "timestamp": asyncio.time()}
await event_queue.put(json.dumps(event_data))
@app.on_event("startup")
async def startup_event():
asyncio.create_task(simulate_hardware_updates())
@app.get("/events")
async def sse_endpoint(request: Request):
async def event_generator():
while True:
# 检查客户端是否断开连接
if await request.is_disconnected():
print("Client disconnected from SSE.")
break
# 从队列获取事件
event_data = await event_queue.get()
yield f"data: {event_data}\n\n"
# 确保在没有事件时不会阻塞太久,可以加入一个短时间的延迟
await asyncio.sleep(0.1)
return StreamingResponse(event_generator(), media_type="text/event-stream")
# 额外的端点,用于手动触发状态更新(可选,用于测试)
@app.post("/update_status")
async def update_status(new_temp: float = 26.0, new_hum: int = 65):
hardware_status["temperature"] = new_temp
hardware_status["humidity"] = new_hum
event_data = {"status": hardware_status, "timestamp": asyncio.time()}
await event_queue.put(json.dumps(event_data))
return {"message": "Status updated and event queued."}
前端 (React) 接收 SSE: 前端使用 EventSource API来监听来自 /events 端点的事件。
import React, { useEffect, useState } from 'react';
function HardwareStatus() {
const [status, setStatus] = useState(null);
const [error, setError] = useState(null);
useEffect(() => {
// 创建 EventSource 实例
const eventSource = new EventSource('http://localhost:8000/events'); // 替换为你的FastAPI地址
// 监听 'message' 事件,这是默认的事件类型
eventSource.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
setStatus(data.status);
setError(null); // 清除之前的错误
console.log("Received SSE event:", data);
} catch (e) {
console.error("Failed to parse SSE data:", e);
setError("Failed to parse data.");
}
};
// 监听 'open' 事件,表示连接已建立
eventSource.onopen = () => {
console.log('SSE connection opened.');
};
// 监听 'error' 事件
eventSource.onerror = (error) => {
console.error('SSE error:', error);
eventSource.close(); // 发生错误时关闭连接
setError("SSE connection error. Retrying...");
// EventSource 会自动尝试重连,但你也可以在这里自定义重连逻辑
};
// 组件卸载时关闭连接
return () => {
eventSource.close();
console.log('SSE connection closed.');
};
}, []); // 仅在组件挂载时运行一次
if (error) {
return <div>Error: {error}</div>;
}
if (!status) {
return <div>Connecting to hardware status updates...</div>;
}
return (
<div>
<h1>Hardware Status</h1>
<p>Temperature: {status.temperature}°C</p>
<p>Humidity: {status.humidity}%</p>
<p>Power On: {status.power_on ? 'Yes' : 'No'}</p>
</div>
);
}
export default HardwareStatus;WebSockets 提供了一个全双工的通信通道,允许客户端和服务器之间进行双向、低延迟的实时数据交换。与SSE不同,WebSockets在握手后建立一个持久的TCP连接,而不是基于HTTP请求-响应模型。
工作原理: 客户端通过一个特殊的HTTP握手请求升级到WebSocket协议。一旦握手成功,连接就升级为WebSocket,客户端和服务器可以独立地发送和接收数据帧。
优点:
FastAPI 实现 WebSockets: FastAPI通过 websocket 依赖提供了对WebSockets的良好支持。
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import asyncio
import json
app = FastAPI()
# 模拟硬件状态
hardware_status_ws = {"temperature": 25.0, "humidity": 60, "power_on": True}
# 存储所有活跃的WebSocket连接
active_connections: list[WebSocket] = []
# 模拟硬件状态更新(在实际应用中,这会由硬件监控脚本触发)
async def simulate_hardware_updates_ws():
while True:
await asyncio.sleep(5) # 每5秒模拟一次状态更新
new_temperature = hardware_status_ws["temperature"] + 0.5
new_humidity = hardware_status_ws["humidity"] + (1 if new_temperature > 27 else -1)
if new_temperature != hardware_status_ws["temperature"] or new_humidity != hardware_status_ws["humidity"]:
hardware_status_ws["temperature"] = round(new_temperature, 2)
hardware_status_ws["humidity"] = round(new_humidity, 2)
print(f"Hardware status updated (WS): {hardware_status_ws}")
# 向所有连接的客户端广播更新
message = json.dumps({"status": hardware_status_ws, "timestamp": asyncio.time()})
for connection in active_connections:
try:
await connection.send_text(message)
except RuntimeError as e:
print(f"Error sending to WebSocket client: {e}")
# 可以在这里处理断开的连接,例如从 active_connections 中移除
@app.on_event("startup")
async def startup_event_ws():
asyncio.create_task(simulate_hardware_updates_ws())
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
active_connections.append(websocket)
print(f"New WebSocket connection: {websocket.client}")
# 首次连接时发送当前状态
initial_status = json.dumps({"status": hardware_status_ws, "timestamp": asyncio.time()})
await websocket.send_text(initial_status)
try:
while True:
# 可以在这里接收客户端发送的消息(如果需要双向通信)
# data = await websocket.receive_text()
# print(f"Received from client: {data}")
# await websocket.send_text(f"Message text was: {data}")
await asyncio.sleep(0.1) # 保持连接活跃,避免CPU空转
except WebSocketDisconnect:
active_connections.remove(websocket)
print(f"WebSocket client disconnected: {websocket.client}")
except Exception as e:
print(f"WebSocket error: {e}")
active_connections.remove(websocket)
前端 (React) 接收 WebSockets: 前端使用 WebSocket API来建立和管理连接。
import React, { useEffect, useState, useRef } from 'react';
function HardwareStatusWS() {
const [status, setStatus] = useState(null);
const [error, setError] = useState(null);
const ws = useRef(null); // 使用ref来存储WebSocket实例
useEffect(() => {
// 建立 WebSocket 连接
ws.current = new WebSocket('ws://localhost:8000/ws'); // 替换为你的FastAPI地址
ws.current.onopen = () => {
console.log('WebSocket connection opened.');
setError(null); // 清除之前的错误
};
ws.current.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
setStatus(data.status);
console.log("Received WebSocket message:", data);
} catch (e) {
console.error("Failed to parse WebSocket data:", e);
setError("Failed to parse data.");
}
};
ws.current.onclose = (event) => {
console.log('WebSocket connection closed:', event.code, event.reason);
setError("WebSocket connection closed. Reconnecting...");
// 可以实现重连逻辑
setTimeout(() => {
// Simple reconnect logic, consider more robust solutions for production
if (ws.current && ws.current.readyState === WebSocket.CLOSED) {
console.log("Attempting to reconnect WebSocket...");
ws.current = null; // Clear old instance
// Trigger effect to re-establish connection
// This is a simple way, often a dedicated reconnect function is better
// For simplicity, we'll let the effect re-run if dependencies change, or manually call a reconnect function
// For now, simply setting ws.current to null and letting the next render potentially re-trigger setup is too indirect.
// A more direct approach:
// ws.current = new WebSocket('ws://localhost:8000/ws'); // Re-initiate connection
// And then re-attach handlers, or better, wrap this in a function.
}
}, 3000); // 3秒后尝试重连
};
ws.current.onerror = (error) => {
console.error('WebSocket error:', error);
setError("WebSocket connection error.");
};
// 组件卸载时关闭连接
return () => {
if (ws.current) {
ws.current.close();
console.log('WebSocket connection cleaned up.');
}
};
}, []); // 仅在组件挂载时运行一次
// 示例:向服务器发送消息(如果需要双向通信)
// const sendMessage = () => {
// if (ws.current && ws.current.readyState === WebSocket.OPEN) {
// ws.current.send(JSON.stringify({ message: "Hello from client!" }));
// }
// };
if (error) {
return <div>Error: {error}</div>;
}
if (!status) {
return <div>Connecting to hardware status updates via WebSocket...</div>;
}
return (
<div>
<h1>Hardware Status (WebSocket)</h1>
<p>Temperature: {status.temperature}°C</p>
<p>Humidity: {status.humidity}%</p>
<p>Power On: {status.power_on ? 'Yes' : 'No'}</p>
{/* <button onClick={sendMessage}>Send Message</button> */}
</div>
);
}
export default HardwareStatusWS;在实际应用中,选择SSE还是WebSockets取决于具体的业务需求:
SSE (Server-Sent Events):
WebSockets:
针对本案例(硬件状态更新,可能长时间无变化): 由于硬件状态更新属于服务器向客户端的单向推送,且可能长时间处于空闲状态,SSE是更推荐的选择。它的实现更简单,并且内置的自动重连机制能够很好地处理连接中断的情况,而无需客户端进行复杂的重连逻辑。WebSockets虽然也能实现,但对于这种单向推送的场景,其双向通信的能力并没有被充分利用,反而增加了实现的复杂性。
通过采用SSE或WebSockets,我们可以彻底告别低效的轮询机制,实现后端数据向前端的实时、事件驱动型推送。对于本例中硬件状态更新这种以服务器向客户端单向推送为主,且可能长时间空闲的场景,SSE因其简洁性和内置的自动重连特性而成为更优的选择。而当需要客户端与服务器进行频繁双向通信时,WebSockets则能提供更强大的支持。理解这两种技术的特点并根据实际需求做出明智的选择,是构建高效实时Web应用的关键。
以上就是FastAPI实现后端实时推送:告别轮询,拥抱SSE与WebSocket的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号