Go pipeline 模式是基于 channel 和 select 构建的多阶段并发数据流处理模式,包含 read→transform→write 三阶段:read 读取并关闭输入 channel,transform 转换并关闭输出 channel,write 仅消费不关闭 channel。

什么是 Go pipeline 模式
Go pipeline 是一种通过 channel 串联多个 goroutine 阶段来处理数据流的模式,每个阶段负责单一职责(如读取、转换、过滤、聚合),天然支持并发与解耦。它不是语言特性,而是基于 chan 和 select 的惯用设计模式。
如何构建一个三阶段 pipeline:read → transform → write
典型 pipeline 要求各阶段间用 channel 传递数据,且每个阶段应能独立退出(避免 goroutine 泄漏)。关键点在于:输入 channel 关闭后,下游阶段需感知并停止;中间 stage 必须主动关闭输出 channel,否则接收方会永久阻塞。
- 第一阶段(
read):从 slice / file / DB 读取数据,写入in chan int,完成后close(in) - 第二阶段(
transform):从in读,做计算,写入out chan int,读到io.EOF或in关闭后close(out) - 第三阶段(
write):只从out读,不关闭任何 channel(它是终端)
func main() {
in := make(chan int)
go func() {
defer close(in)
for i := 1; i <= 5; i++ {
in <- i * 2
}
}()
transformed := transform(in)
for res := range write(transformed) {
fmt.Println(res)
}}
func transform(in
func write(in
为什么必须显式 close 输出 channel
如果不调用 close(out),下游 for range out 将永远等待新值,即使上游已退出。这是 pipeline 最常见的死锁来源。注意:range 只在 channel 被 close 后退出,不会因发送 goroutine 结束而自动终止。
立即学习“go语言免费学习笔记(深入)”;
-
range在 channel 关闭且缓冲区为空时才退出 - 多个 goroutine 向同一
out写?需用sync.WaitGroup控制关闭时机 - 若某阶段可能 panic,要用
defer close(out)确保关闭 - 不要对同一个 channel 多次
close,会 panic
如何加 context 控制超时和取消
真实场景中 pipeline 常需响应取消信号或超时。应在每个阶段的 goroutine 中监听 ctx.Done(),并在退出前清理资源(如关闭输出 channel)。
func transformWithContext(ctx context.Context, in <-chan int) <-chan int {
out := make(chan int)
go func() {
defer close(out)
for {
select {
case v, ok := <-in:
if !ok {
return
}
select {
case out <- v + 10:
case <-ctx.Done():
return
}
case <-ctx.Done():
return
}
}
}()
return out
}使用时传入带超时的 context:ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)。注意:context 取消后,未被消费的 channel 数据会被丢弃,这是预期行为。
真正难处理的是“中间阶段阻塞在 send 上但下游已退出”的情况——此时需用带缓冲的 channel 或 select 配合 default 分支做非阻塞写,否则可能卡住整个 pipeline。










