直接用 go func(){} 易致内存暴涨和调度开销激增;应采用固定数量 worker + taskChan 的可控并发模式,用 for range 阻塞接收任务执行。

为什么直接用 go func() {} 容易出问题
无节制启动 goroutine 会导致内存暴涨、调度开销激增,甚至触发 runtime: out of memory。Go 的调度器不是万能的,10w+ 个 goroutine 同时活跃时,goroutine stack 占用和上下文切换成本会明显拖慢程序。真正需要的是可控并发数 + 任务排队机制,而不是“越多越好”。
用 chan *workerTask 实现最简 worker pool
核心思路:用一个无缓冲或带缓冲的 taskChan 接收任务,固定数量的 goroutine 持续从该 channel 取任务执行。不依赖第三方库,纯标准库即可落地。
-
taskChan类型建议定义为chan func()或更明确的chan Task(其中Task是含Do()方法的接口),避免类型断言错误 - 启动 worker 的循环必须是阻塞式接收:
for task := range taskChan { task.Do() },不能写成select { case task := 否则会漏任务 - 关闭 pool 时,先关闭
taskChan,再waitGroup.Wait();不能反过来,否则 worker 可能卡在range上无法退出
type WorkerPool struct {
taskChan chan func()
wg sync.WaitGroup
}
func NewWorkerPool(size int) *WorkerPool {
pool := &WorkerPool{
taskChan: make(chan func(), 1024), // 缓冲区防瞬时压测阻塞提交方
}
for i := 0; i < size; i++ {
pool.wg.Add(1)
go func() {
defer pool.wg.Done()
for task := range pool.taskChan {
task()
}
}()
}
return pool
}
func (p *WorkerPool) Submit(task func()) {
p.taskChan <- task
}
func (p *WorkerPool) Shutdown() {
close(p.taskChan)
p.wg.Wait()
}
context.Context 怎么安全注入到 worker 中
原生 channel 传函数无法携带 context.Context,硬塞进闭包容易导致 context 生命周期失控(比如传入的 ctx 已 cancel,但 worker 还在等 channel)。正确做法是让 task 自身持有 context,并在执行前检查是否超时/取消。
- 定义
type Task struct { Ctx context.Context; Fn func() },worker 中执行前加select { case - 不要在 Submit 时把
context.WithTimeout(parent, time.Second)的结果传给 worker——parent 可能比 worker 生命周期长得多,造成 context 泄漏 - 若需传播 cancel,应在 task 初始化时调用
childCtx, cancel := context.WithCancel(t.Ctx),并在 task 执行完后显式调用cancel()
实际压测时发现吞吐上不去?检查这三点
worker pool 不是银弹,瓶颈常不在 goroutine 调度本身,而在任务内部。
立即学习“go语言免费学习笔记(深入)”;
- 任务函数里有没有隐式同步操作?比如共用一个
sync.Mutex、频繁写同一片[]byte、或大量调用log.Printf(默认锁 stdout) - channel 缓冲区设太小(如
make(chan, 1)),导致 Submit 方频繁阻塞,掩盖了真实处理能力 - worker 数量 ≠ CPU 核心数。IO 密集型任务(如 HTTP 请求)可设为
runtime.NumCPU() * 4;计算密集型应 ≤runtime.NumCPU(),否则 cache miss 增多
真正难的不是建 pool,而是判断每个 task 是否真的“可并行”——如果它们共享状态、依赖顺序、或底层调用串行化(比如单连接 DB 驱动),加再多 worker 也白搭。










