答案是使用单向channel和goroutine实现生产者消费者模型。通过producer函数向channel发送数据,consumer函数接收并处理数据,最后用done channel通知完成,确保并发安全与正确关闭。

在Golang中,channel 是实现并发通信的核心机制,非常适合用于构建生产者消费者模型。该模型通过分离任务的生成与处理,提升程序的并发性和响应性。下面介绍几种常见的实践方式,帮助你高效使用 channel 实现生产者消费者模式。
基本结构:一个生产者,一个消费者
最简单的场景是单个生产者向 channel 发送数据,单个消费者从中读取。
示例代码:package mainimport ( "fmt" "time" )
func producer(ch chan<- int) { for i := 0; i < 5; i++ { ch <- i fmt.Printf("生产: %d\n", i) time.Sleep(time.Millisecond * 500) } close(ch) // 关闭channel,通知消费者结束 }
func consumer(ch <-chan int, done chan<- bool) { for data := range ch { // 自动检测channel关闭 fmt.Printf("消费: %d\n", data) } done <- true // 通知主协程完成 }
func main() { ch := make(chan int) done := make(chan bool)
go producer(ch) go consumer(ch, done) <-done // 等待消费者完成}
说明:使用 单向channel(`chan
多个生产者,一个消费者
当需要并发生成任务时,可启动多个生产者,共用一个缓冲 channel。
立即学习“go语言免费学习笔记(深入)”;
func multiProducer(ch chan<- int, id int) { for i := 0; i < 3; i++ { ch <- id*10 + i fmt.Printf("生产者%d 生成: %d\n", id, id*10+i) time.Sleep(time.Millisecond * 200) } }func main() { ch := make(chan int, 10) // 缓冲channel避免阻塞 done := make(chan bool)
go func() { for i := 1; i <= 3; i++ { go multiProducer(ch, i) } }() go func() { count := 0 for data := range ch { fmt.Printf("消费: %d\n", data) count++ if count == 9 { // 预期9个数据 close(ch) // 注意:仅由生产者一侧决定是否关闭 break } } done <- true }() <-done}
注意:多个生产者时,不能在每个生产者中调用 close,否则可能引发 panic。通常由独立的协程或主逻辑控制关闭,或使用 sync.WaitGroup 协调。
使用 sync.WaitGroup 管理多生产者关闭
更安全的做法是等待所有生产者完成后再关闭 channel。
func wgProducer(ch chan<- int, wg *sync.WaitGroup, id int) { defer wg.Done() for i := 0; i < 3; i++ { ch <- id*10 + i fmt.Printf("生产者%d 生成: %d\n", id, id*10+i) } }func main() { ch := make(chan int) var wg sync.WaitGroup done := make(chan bool)
// 启动3个生产者 for i := 1; i <= 3; i++ { wg.Add(1) go wgProducer(ch, &wg, i) } // 消费者:等待所有生产者完成再退出 go func() { go func() { wg.Wait() close(ch) // 所有生产者结束后关闭 }() for data := range ch { fmt.Printf("消费: %d\n", data) } done <- true }() <-done}
这种方式确保 channel 只被关闭一次,且不会在仍有生产者写入时提前关闭。
多个消费者并行处理
为提升处理能力,可以启动多个消费者从同一个 channel 读取数据。
func worker(ch <-chan int, id int, wg *sync.WaitGroup) { defer wg.Done() for data := range ch { fmt.Printf("Worker-%d 处理: %d\n", id, data) time.Sleep(time.Millisecond * 300) // 模拟处理耗时 } }func main() { ch := make(chan int, 10) var wg sync.WaitGroup
// 启动3个消费者 for i := 1; i <= 3; i++ { wg.Add(1) go worker(ch, i, &wg) } // 生产者 go func() { for i := 1; i <= 6; i++ { ch <- i } close(ch) }() wg.Wait() // 等待所有消费者完成 fmt.Println("所有任务处理完毕")}
多个消费者自动竞争消费任务,适合负载均衡场景。注意:channel 的关闭由生产者负责,消费者只需读取到 EOF(channel 关闭)即可退出。
基本上就这些常见模式。合理使用缓冲 channel、单向类型、WaitGroup 和 close 控制,能写出稳定高效的生产者消费者程序。关键是避免重复关闭 channel 和 goroutine 泄漏。不复杂但容易忽略细节。










