
本文深入探讨Go并发编程中常见的“all goroutines are asleep - deadlock!”错误,尤其是在构建工作者系统时因未正确关闭输出通道导致的死锁。通过分析问题根源,文章将演示如何利用控制通道或sync.WaitGroup机制,实现对工作协程的有效协调,确保所有任务完成后安全关闭通道,从而优雅地终止程序,避免死锁。
在Go语言的并发编程模型中,goroutine和channel是核心构建块。然而,不当的通道使用方式,特别是通道的关闭机制,很容易导致程序进入“死锁”状态,并抛出fatal error: all goroutines are asleep - deadlock!。这个错误表明Go运行时检测到程序中所有goroutine都处于阻塞状态,且没有可以被调度的goroutine来解除这些阻塞,因此程序无法继续执行。
在一个典型的生产者-消费者或工作者池(Worker Pool)模式中,如果一个或多个goroutine正在尝试从一个通道接收数据,而这个通道的发送方已经完成其所有工作,但却忘记关闭通道,那么这些接收goroutine将永远等待下去,从而导致整个程序死锁。
考虑一个Go语言实现的工作者系统骨架,其设计目标是创建一批工作协程处理任务,并通过通道进行协调。
原始代码结构如下:
package main
import (
"bufio"
"flag"
"fmt"
"log"
"math/rand"
"os"
"time"
)
type Work struct {
id int
ts time.Duration
}
const (
NumWorkers = 5000
NumJobs = 100000
)
func worker(in <-chan *Work, out chan<- *Work) {
for w := range in {
st := time.Now()
time.Sleep(time.Duration(rand.Int63n(int64(200 * time.Millisecond))))
w.ts = time.Since(st)
out <- w
}
}
func main() {
wait := flag.Bool("w", false, "wait for <enter> before starting")
flag.Parse()
if *wait {
fmt.Printf("I'm <%d>, press <enter> to continue", os.Getpid())
reader := bufio.NewReader(os.Stdin)
reader.ReadString('\n')
}
Run()
}
func Run() {
in, out := make(chan *Work, 100), make(chan *Work, 100)
for i := 0; i < NumWorkers; i++ {
go worker(in, out)
}
go createJobs(in)
receiveResults(out)
}
func createJobs(queue chan<- *Work) {
for i := 0; i < NumJobs; i++ {
work := &Work{i, 0}
queue <- work
}
close(queue) // 输入通道在所有任务创建后关闭
}
func receiveResults(completed <-chan *Work) {
for w := range completed { // 从完成通道接收结果
log.Printf("job %d completed in %s", w.id, w.ts)
}
}在这个示例中,createJobs协程负责向in通道发送任务,并在所有任务发送完毕后正确地关闭了in通道。worker协程从in通道接收任务,处理后将结果发送到out通道。receiveResults函数则通过for w := range completed循环从out通道(在此函数中命名为completed)接收所有完成的任务结果。
死锁的根源在于: 当createJobs协程完成并关闭in通道后,所有的worker协程会逐一处理完in通道中剩余的任务,然后它们从for w := range in循环中退出。这些worker协程退出后,out通道将不再有发送者。然而,receiveResults函数中的for w := range completed循环会持续尝试从out通道接收数据。由于out通道从未被关闭,receiveResults协程将永远阻塞等待新的数据。此时,所有worker协程已退出,createJobs协程也已完成,只剩下receiveResults协程一个活跃的goroutine在无限等待一个永远不会关闭的通道,最终导致死锁。
为了解决这个问题,我们需要在所有工作协程完成其工作后,显式地关闭out通道。一种方法是引入一个额外的“控制通道”来协调工作协程的完成状态。
实现步骤:
package main
import (
"bufio"
"flag"
"fmt"
"log"
"math/rand"
"os"
"time"
)
type Work struct {
id int
ts time.Duration
}
const (
NumWorkers = 5000
NumJobs = 100000
)
// worker 函数现在接收一个额外的控制通道参数
func worker(ctrl chan<- bool, in <-chan *Work, out chan<- *Work) {
defer func() {
ctrl <- true // worker 完成其所有工作后,向控制通道发送完成信号
}()
for w := range in {
st := time.Now()
time.Sleep(time.Duration(rand.Int63n(int64(200 * time.Millisecond))))
w.ts = time.Since(st)
out <- w
}
}
// control 协程负责等待所有worker完成,然后关闭输出通道
func control(ctrl <-chan bool, numWorkers int, out chan<- *Work) {
for i := 0; i < numWorkers; i++ {
<-ctrl // 等待每个worker的完成信号
}
close(out) // 所有worker完成后,关闭输出通道
}
func main() {
wait := flag.Bool("w", false, "wait for <enter> before starting")
flag.Parse()
if *wait {
fmt.Printf("I'm <%d>, press <enter> to continue", os.Getpid())
reader := bufio.NewReader(os.Stdin)
reader.ReadString('\n')
}
Run()
}
func Run() {
in, out := make(chan *Work, 100), make(chan *Work, 100)
ctrl := make(chan bool, NumWorkers) // 创建控制通道,缓冲大小为worker数量
// 启动工作协程
for i := 0; i < NumWorkers; i++ {
go worker(ctrl, in, out)
}
// 启动任务创建协程
go createJobs(in)
// 启动控制协程,它将等待所有worker完成并关闭 'out' 通道
go control(ctrl, NumWorkers, out)
// 接收结果
receiveResults(out)
}
func createJobs(queue chan<- *Work) {
for i := 0; i < NumJobs; i++ {
work := &Work{i, 0}
queue <- work
}
close(queue) // 创建任务完成后关闭输入通道
}
func receiveResults(completed <-chan *Work) {
for w := range completed {
log.Printf("job %d completed in %s", w.id, w.ts)
}
}sync.WaitGroup 是Go标准库提供的一种更通用的同步原语,用于等待一组goroutine完成。它通常比手动管理控制通道更简洁和惯用。
实现步骤:
package main
import (
"bufio"
"flag"
"fmt"
"log"
"math/rand"
"os"
"sync" // 引入 sync 包
"time"
)
type Work struct {
id int
ts time.Duration
}
const (
NumWorkers = 5000
NumJobs = 100000
)
// worker 函数现在接收一个 WaitGroup 指针
func worker(wg *sync.WaitGroup, in <-chan *Work, out chan<- *Work) {
defer wg.Done() // 确保worker退出时通知WaitGroup
for w := range in {
st := time.Now()
time.Sleep(time.Duration(rand.Int63n(int64(200 * time.Millisecond))))
w.ts = time.Since(st)
out <- w
}
}
func main() {
wait := flag.Bool("w", false, "wait for <enter> before starting")
flag.Parse()
if *wait {
fmt.Printf("I'm <%d>, press <enter> to continue", os.Getpid())
reader := bufio.NewReader(os.Stdin)
reader.ReadString('\n')
}
Run()
}
func Run() {
in, out := make(chan *Work, 100), make(chan *Work, 100)
var wg sync.WaitGroup // 声明 WaitGroup
// 启动工作协程
for i := 0; i < NumWorkers; i++ {
wg.Add(1) // 增加计数
go worker(&wg, in, out)
}
// 启动任务创建协程
go createJobs(in)
// 启动一个独立的协程来等待所有worker完成并关闭输出通道
go func() {
wg.Wait() // 等待所有worker完成
close(out) // 关闭输出通道
}()
// 接收结果
receiveResults(out)
}
func createJobs(queue chan<- *Work) {
for i := 0; i < NumJobs; i++ {
work := &Work{i, 0}
queue <- work
}
close(queue) // 创建任务完成后关闭输入通道
}
func receiveResults(completed <-chan *Work) {
for w := range completed {
log.Printf("job %d completed in %s", w.id, w.ts)
}
}正确管理Go通道是编写健壮并发程序的基石。以下是一些关键原则:
“all goroutines are asleep - deadlock!”错误是Go并发编程中常见的陷阱,通常源于通道的生命周期管理不当,特别是输出通道未被正确关闭。通过本文介绍的两种方法——使用控制通道或sync.WaitGroup——我们可以有效地协调goroutine的完成状态,确保在所有发送方都已完成工作后,能够及时关闭通道,从而避免死锁,并使程序优雅地退出。
在实际开发中,sync.WaitGroup因其简洁性和通用性,常被视为处理此类同步问题的首选方案。理解并遵循通道管理的
以上就是Go 并发编程:避免 Goroutine 死锁与通道的优雅关闭的详细内容,更多请关注php中文网其它相关文章!
编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号