
在 Go 中,golang.org/x/net/websocket 包为每个传入的 WebSocket 连接创建一个独立的协程来处理。默认的“回显服务器”示例简单地将收到的数据回传给发送者。然而,要实现一个聊天服务器或任何需要消息广播功能的应用,单个连接的处理器需要能够访问并向其他所有活跃的连接发送数据。这引入了一个核心的并发挑战:如何安全、高效地共享和管理这些连接的状态?
直接在每个连接的协程中维护所有连接的列表并进行广播是不可行的,因为这将导致并发读写共享数据的问题。Go 提供了两种主要的方法来解决这个问题:利用通道进行协调,或者使用互斥锁保护共享内存。
这是 Go 语言中处理并发共享状态的惯用(idiomatic)方法,它遵循“不要通过共享内存来通信;而是通过通信来共享内存”的原则。这种模式的核心是一个独立的“枢纽”(Hub)或“管理器”协程,它负责维护所有活跃的 WebSocket 连接,并协调消息的注册、注销和广播。
package main
import (
"fmt"
"io"
"log"
"net/http"
"time" // 引入 time 包用于日志时间戳
"golang.org/x/net/websocket" // 引入 WebSocket 包
)
// Hub 结构体管理 WebSocket 连接和消息广播
type Hub struct {
// 注册的客户端连接,使用 map[连接指针]bool 来高效存储
clients map[*websocket.Conn]bool
// 从客户端接收到的入站消息
broadcast chan []byte
// 客户端注册请求通道
register chan *websocket.Conn
// 客户端注销请求通道
unregister chan *websocket.Conn
}
// NewHub 创建并返回一个新的 Hub 实例
func NewHub() *Hub {
return &Hub{
broadcast: make(chan []byte),
register: make(chan *websocket.Conn),
unregister: make(chan *websocket.Conn),
clients: make(map[*websocket.Conn]bool),
}
}
// Run 启动 Hub 协程,监听其通道上的事件
func (h *Hub) Run() {
for {
select {
case client := <-h.register:
// 注册新客户端
h.clients[client] = true
log.Printf("[%s] 客户端已注册: %s, 当前连接数: %d\n", time.Now().Format("15:04:05"), client.RemoteAddr(), len(h.clients))
case client := <-h.unregister:
// 注销客户端
if _, ok := h.clients[client]; ok {
delete(h.clients, client)
client.Close() // 关闭连接
log.Printf("[%s] 客户端已注销: %s, 当前连接数: %d\n", time.Now().Format("15:04:05"), client.RemoteAddr(), len(h.clients))
}
case message := <-h.broadcast:
// 广播消息给所有活跃客户端
for client := range h.clients {
_, err := client.Write(message)
if err != nil {
// 写入失败通常意味着客户端已断开,进行注销处理
log.Printf("[%s] 写入客户端 %s 失败: %v。正在注销。\n", time.Now().Format("15:04:05"), client.RemoteAddr(), err)
// 使用 select 防止 unregister 通道阻塞
select {
case h.unregister <- client:
default:
// 如果注销通道已满,则跳过,避免死锁
log.Printf("[%s] 注销通道阻塞,无法立即注销客户端 %s\n", time.Now().Format("15:04:05"), client.RemoteAddr())
}
}
}
}
}
}
// WebSocketHandler 处理单个 WebSocket 连接
func WebSocketHandler(hub *Hub, ws *websocket.Conn) {
// defer 确保连接关闭和注销操作在函数退出时执行
defer func() {
hub.unregister <- ws
ws.Close()
}()
// 注册当前连接到 Hub
hub.register <- ws
// 持续从 WebSocket 连接中读取消息
buff := make([]byte, 512) // 读取缓冲区
for {
n, err := ws.Read(buff)
if err != nil {
if err != io.EOF {
log.Printf("[%s] 从客户端 %s 读取错误: %v\n", time.Now().Format("15:04:05"), ws.RemoteAddr(), err)
} else {
log.Printf("[%s] 客户端 %s 已断开连接 (EOF)\n", time.Now().Format("15:04:05"), ws.RemoteAddr())
}
break // 发生错误或 EOF 时退出读取循环
}
message := buff[:n]
log.Printf("[%s] 收到来自 %s 的消息: %s\n", time.Now().Format("15:04:05"), ws.RemoteAddr(), string(message))
// 将收到的消息发送到 Hub 的广播通道
hub.broadcast <- message
}
}
func main() {
// 创建并启动 Hub 协程
hub := NewHub()
go hub.Run()
// 注册 WebSocket 处理器
http.Handle("/echo", websocket.Handler(func(ws *websocket.Conn) {
WebSocketHandler(hub, ws)
}))
port := ":12345"
log.Printf("[%s] 服务器启动中,监听端口 %s\n", time.Now().Format("15:04:05"), port)
err := http.ListenAndServe(port, nil)
if err != nil {
log.Fatalf("[%s] ListenAndServe 失败: %v\n", time.Now().Format("15:04:05"), err)
}
}另一种相对简单但需要更谨慎处理并发的方法是使用一个全局的 map 来存储所有连接,并使用 sync.RWMutex(读写互斥锁)来保护对该 map 的并发访问。
package main
import (
"fmt"
"io"
"log"
"net/http"
"sync"
"time"
"golang.org/x/net/websocket"
)
// 全局客户端连接映射和读写互斥锁
var (
globalClients = make(map[*websocket.Conn]bool) // 存储活跃连接
clientsMutex = &sync.RWMutex{} // 保护 globalClients 的读写
)
// EchoServerGlobal 处理单个 WebSocket 连接,并尝试广播
func EchoServerGlobal(ws *websocket.Conn) {
// 1. 将新连接添加到全局映射
clientsMutex.Lock()
globalClients[ws] = true
log.Printf("[%s] 客户端已注册 (全局): %s, 当前连接数: %d\n", time.Now().Format("15:04:05"), ws.RemoteAddr(), len(globalClients))
clientsMutex.Unlock()
// 2. defer 确保连接关闭和从全局映射中移除
defer func() {
clientsMutex.Lock()
delete(globalClients, ws)
log.Printf("[%s] 客户端已注销 (全局): %s, 当前连接数: %d\n", time.Now().Format("15:04:05"), ws.RemoteAddr(), len(globalClients))
clientsMutex.Unlock()
ws.Close()
}()
// 3. 持续从连接中读取消息并广播
buff := make([]byte, 512)
for {
n, err := ws.Read(buff)
if err != nil {
if err != io.EOF {
log.Printf("[%s] 从客户端 %s 读取错误: %v\n", time.Now().Format("15:04:05"), ws.RemoteAddr(), err)
} else {
log.Printf("[%s] 客户端 %s 已断开连接 (EOF)\n", time.Now().Format("15:04:05"), ws.RemoteAddr())
}
break // 退出读取循环
}
message := buff[:n]
log.Printf("[%s] 收到来自 %s 的消息: %s\n", time.Now().Format("15:04:05"), ws.RemoteAddr(), string(message))
// 广播消息给所有活跃客户端
clientsMutex.RLock() // 读锁定,允许其他协程同时读
for client := range globalClients {
if client == ws {
// 避免将消息回显给发送者自身,如果不需要的话
continue
}
_, err := client.Write(message)
if err != nil {
log.Printf("[%s] 写入客户端 %s 失败: %v。标记为移除。\n", time.Now().Format("15:04:05"), client.RemoteAddr(), err)
// 注意:在 RLock 内部删除 map 元素是不安全的,因为 RLock 是共享锁。
// 更安全的做法是:收集需要移除的客户端列表,然后在 RUnlock 之后进行写锁定并移除。
// 或者,将移除操作发送到一个单独的清理协程。
}
}
clientsMutex.RUnlock() // 释放读锁定
}
}
func main() {
http.Handle("/echo_global", websocket.Handler(EchoServerGlobal))
port := ":12346"
log.Printf("[%s] 全局模式服务器启动中,监听端口 %s\n", time.Now().Format("15:04:05"), port)
err := http.ListenAndServe(port, nil)
if err != nil {
log.Fatalf("[%s] ListenAndServe 失败: %v\n", time.Now().Format("15:04:05"), err)
}
}以上就是Go WebSocket 服务器中实现连接广播:管理客户端连接的两种模式的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号