Go 语言可通过 sync.RWMutex + sync.Map + channel 实现线程安全、可取消的事件总线,核心是解耦“数据变更”与“响应逻辑”,关键在事件类型约束、结构化事件载体、监听器 goroutine 安全退出及业务语义设计。

Go 语言没有内置的观察者接口或事件总线,但用 map + chan + 接口组合就能写出轻量、线程安全、可取消的通知机制——关键不在“模式复刻”,而在解决实际场景中“谁改了数据、谁要响应”的耦合问题。
如何定义通用事件类型和订阅者接口
硬编码事件名(如 "user.created")容易拼错且难维护,推荐用自定义类型约束;订阅者必须能接收事件并返回是否继续处理(用于支持中断链式响应)。
-
EventType定义为string别名,便于集中管理常量(如type EventType string; const UserCreated EventType = "user.created") - 订阅者接口只需一个
Handle(event interface{}) bool方法:返回false表示终止后续监听器执行(类似中间件短路) - 避免用
interface{}做事件载体,应为每类事件定义结构体(如UserCreatedEvent),提升类型安全和 IDE 支持
用 sync.Map + channel 实现线程安全的事件分发器
直接用 map[EventType][]func(interface{}) 在并发注册/触发时会 panic;sync.Map 虽支持并发读写,但无法保证通知顺序;更稳妥的做法是把事件推入每个监听器专属的 chan,由监听器自己消费。
- 分发器内部用
sync.RWMutex保护map[EventType][]*eventListener,注册/注销走写锁,触发只读锁 - 每个
*eventListener持有chan interface{}和关闭信号done chan struct{},防止 goroutine 泄漏 - 触发事件时,遍历对应事件类型的监听器列表,向其
ch ;监听器 goroutine 内部用select { case e := 安全退出
如何正确注销监听器并避免 goroutine 泄漏
监听器 goroutine 不主动退出会导致内存持续增长,尤其在高频事件+短生命周期监听场景(如 HTTP 请求上下文绑定)下极易出问题。
立即学习“go语言免费学习笔记(深入)”;
- 注销操作不只是从 map 中删除,必须先 close 对应的
donechannel,再等待 goroutine 退出(可用sync.WaitGroup计数) - 不要在监听器函数内直接调用
dispatcher.Unsubscribe()—— 这会引发 map 并发写 panic;应通过 channel 发送注销请求,由 dispatcher 主 goroutine 统一处理 - 测试时可加
runtime.NumGoroutine()断言,验证注销后 goroutine 数回落
type EventDispatcher struct {
mu sync.RWMutex
listeners map[EventType][]*eventListener
}
type eventListener struct {
ch chan interface{}
done chan struct{}
handle func(interface{}) bool
}
func (ed *EventDispatcher) Subscribe(et EventType, f func(interface{}) bool) func() {
ed.mu.Lock()
defer ed.mu.Unlock()
l := &eventListener{
ch: make(chan interface{}, 16),
done: make(chan struct{}),
handle: f,
}
if _, ok := ed.listeners[et]; !ok {
ed.listeners[et] = make([]*eventListener, 0)
}
ed.listeners[et] = append(ed.listeners[et], l)
go func() {
for {
select {
case e := zuojiankuohaophpcn-l.ch:
if !l.handle(e) {
return
}
case zuojiankuohaophpcn-l.done:
return
}
}
}()
return func() {
close(l.done)
// 实际项目中应加 wg.Wait() 等待 goroutine 结束
}}
func (ed EventDispatcher) Publish(et EventType, event interface{}) {
ed.mu.RLock()
ls := make([]eventListener, len(ed.listeners[et]))
copy(ls, ed.listeners[et])
ed.mu.RUnlock()
for _, l := range ls {
select {
case l.ch zuojiankuohaophpcn- event:
default:
// 缓冲满时丢弃或打日志,避免阻塞发布者
}
}}
真正难的不是写完这个 dispatcher,而是决定哪些事件该暴露、哪些状态变更值得通知、以及监听器之间是否允许相互触发——这些业务语义层面的设计,比语法实现重要得多。











