
本文探讨了在Go语言中如何高效地实现独立任务的并行处理与同步。通过分析一个具体场景,我们展示了如何利用Go的通道(channels)或`sync.WaitGroup`来协调多个并发执行的worker goroutine,确保所有任务完成后才进行下一步操作,同时维持固定的goroutine数量,避免为每个数据项创建新goroutine,从而优化资源利用和执行效率。
在Go语言中,实现高效的并发处理是其核心优势之一。本教程将深入探讨一种常见的并发模式:如何协调多个独立的worker goroutine并行处理数据,并确保所有worker完成后才继续执行后续逻辑。我们将通过一个具体的案例来演示这种模式,并提供两种实现方案:基于通道的同步和基于sync.WaitGroup的同步。
假设我们有一个主协调器account goroutine,它从一个输入通道接收数据。每接收到一个数据项,account需要将该数据分发给两个独立的worker goroutine(workerA和workerB)进行处理。这两个worker的执行顺序不重要,但account必须等待它们都完成对当前数据项的处理后,才能将该数据发送到最终的输出通道。关键要求如下:
考虑以下初始的account函数实现:
立即学习“go语言免费学习笔记(深入)”;
func account(account_chan <-chan int, final_chan chan<- int) {
wa_in := make(chan int)
wa_out := make(chan int)
wb_in := make(chan int)
wb_out := make(chan int)
go workerA(wa_in, wa_out)
go workerB(wb_in, wb_out)
for d := range account_chan {
// 顺序执行:发送给A,等待A完成;发送给B,等待B完成
wa_in <- d
<-wa_out
wb_in <- d
<-wb_out
final_chan <- d
}
}其中workerA和workerB的定义如下:
func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
for d := range work_in_chan {
fmt.Println("A ", d)
work_out_chan <- d // 假设这里是处理逻辑,然后发送完成信号
}
}
func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
for d := range work_in_chan {
fmt.Println("B ", d)
work_out_chan <- d // 假设这里是处理逻辑,然后发送完成信号
}
}这个实现的问题在于,wa_in <- d和<-wa_out会阻塞account goroutine,直到workerA完成并发送回信号。只有workerA完成后,account才会继续执行wb_in <- d,进而触发workerB。这导致workerA和workerB实际上是顺序执行的,未能实现并行处理,浪费了并发能力。
要实现workerA和workerB的并行执行,关键在于调整通道操作的顺序:先将数据同时发送给所有worker,然后等待所有worker的完成信号。
package main
import "fmt"
import "time" // 引入time包用于模拟工作耗时
func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
for d := range work_in_chan {
fmt.Printf("WorkerA 正在处理数据: %d\n", d)
time.Sleep(time.Millisecond * 100) // 模拟工作耗时
work_out_chan <- d // 发送完成信号
}
}
func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
for d := range work_in_chan {
fmt.Printf("WorkerB 正在处理数据: %d\n", d)
time.Sleep(time.Millisecond * 150) // 模拟工作耗时,可能与A不同
work_out_chan <- d // 发送完成信号
}
}
func account(account_chan <-chan int, final_chan chan<- int) {
wa_in := make(chan int)
wa_out := make(chan int)
wb_in := make(chan int)
wb_out := make(chan int)
go workerA(wa_in, wa_out)
go workerB(wb_in, wb_out)
for d := range account_chan {
// 先同时发送数据给两个worker
wa_in <- d
wb_in <- d
// 然后等待两个worker的完成信号
// 接收顺序不重要,因为两者都需要被接收
<-wa_out
<-wb_out
final_chan <- d // 两个worker都完成后,将数据发送到最终通道
}
// 注意:在实际应用中,当account_chan关闭时,需要考虑如何优雅地关闭wa_in和wb_in,
// 以便worker goroutine能够退出循环。此处为简化示例。
close(wa_in)
close(wb_in)
}
func main() {
account_chan := make(chan int, 100)
final_chan := make(chan int, 100)
go account(account_chan, final_chan)
// 发送一些数据
account_chan <- 1
account_chan <- 2
account_chan <- 3
// 关闭输入通道,通知account goroutine没有更多数据
close(account_chan)
// 等待并打印最终结果
// 由于account goroutine在处理完所有数据并关闭其输入通道后,
// 可能会立即关闭其输出通道,这里需要确保在接收前account goroutine有足够时间处理。
// 更健壮的方式是使用sync.WaitGroup等待account goroutine完成。
time.Sleep(time.Second) // 简单等待,确保account goroutine处理完成
fmt.Println("最终结果:", <-final_chan)
fmt.Println("最终结果:", <-final_chan)
fmt.Println("最终结果:", <-final_chan)
}原理分析:
通过将wa_in <- d和wb_in <- d放在一起,account goroutine会非阻塞地将数据发送给workerA和workerB(假设输入通道有足够的缓冲区或worker能够立即接收)。一旦数据发送完成,两个worker就可以同时开始处理。接着,account goroutine会尝试从wa_out和wb_out接收信号。这两个接收操作都会阻塞,直到各自的worker完成处理并发送回数据。由于接收顺序不影响最终结果(两个信号都必须收到),因此这种模式有效地实现了并行处理和同步。
在许多情况下,worker goroutine的输出值可能并不需要被协调器直接使用,或者它们处理的是共享数据结构而不是通过通道返回数据。此时,sync.WaitGroup是更简洁、更符合Go习惯的同步机制。sync.WaitGroup用于等待一组goroutine完成。
package main
import (
"fmt"
"sync"
"time"
)
func workerA_wg(work_in_chan <-chan int, wg *sync.WaitGroup) {
defer wg.Done() // 确保在goroutine退出时调用Done()
for d := range work_in_chan {
fmt.Printf("WorkerA_wg 正在处理数据: %d\n", d)
time.Sleep(time.Millisecond * 100) // 模拟工作耗时
// 不需要通过通道返回数据,直接标记完成
}
}
func workerB_wg(work_in_chan <-chan int, wg *sync.WaitGroup) {
defer wg.Done() // 确保在goroutine退出时调用Done()
for d := range work_in_chan {
fmt.Printf("WorkerB_wg 正在处理数据: %d\n", d)
time.Sleep(time.Millisecond * 150) // 模拟工作耗时
// 不需要通过通道返回数据,直接标记完成
}
}
func account_wg(account_chan <-chan int, final_chan chan<- int) {
wa_in := make(chan int)
wb_in := make(chan int)
// 创建一个WaitGroup,用于同步两个worker
var wg sync.WaitGroup
// workerA_wg和workerB_wg各自需要调用wg.Done()一次
wg.Add(2)
go workerA_wg(wa_in, &wg)
go workerB_wg(wb_in, &wg)
for d := range account_chan {
// 先同时发送数据给两个worker
wa_in <- d
wb_in <- d
// 等待所有worker完成对当前数据项的处理
// 注意:这里需要为每个数据项重置WaitGroup,或者采用不同的WaitGroup管理方式。
// 考虑到worker是持续运行的,这种模式下WaitGroup不适合按数据项同步。
// 正确的做法是,如果worker是短生命周期的,为每个数据项创建新的WaitGroup;
// 如果worker是长生命周期的,如本例,则需要更精细的同步。
// 对于本例(长生命周期worker),wg.Wait()应在worker goroutine完全退出后调用,
// 而不是在每个数据项处理后。
// 如果要实现每个数据项的同步,需要将wg.Add(2)和wg.Wait()放在循环内部,
// 并且worker的Done()逻辑需要调整。
// 重新思考:如果worker是长生命周期且需要对每个数据项同步,那么通道仍是更直接的方式。
// WaitGroup更适合等待一组goroutine在生命周期结束时完成。
// 但我们可以模拟一种每个数据项同步的WaitGroup用法,但这会增加复杂性。
// 考虑到原问题中worker是单例goroutine,且对每个数据项都需同步,
// 通道方案更自然。如果坚持使用WaitGroup,则每个数据项的同步需要更复杂的逻辑,
// 例如,worker在处理完一个数据项后,通过另一个通道通知account,
// account收到两个通知后,再调用WaitGroup的Add/Done。
// 这种情况下,WaitGroup的优势就不明显了,甚至可能比通道更复杂。
// 为了符合WaitGroup的典型用法,我们假设workerA_wg和workerB_wg是处理所有数据后才退出的。
// 此时,wg.Wait()应该在for循环结束后调用,等待所有worker完全退出。
// 但这与“每个数据项处理完后才发送到final_chan”的需求不符。
// 因此,对于“每个数据项完成同步”的需求,通道方案更直接。
// 如果确实要用WaitGroup,且worker是长生命周期,通常会结合其他机制。
// 例如,worker在处理完一个数据项后,会发送一个信号到account,
// account接收到所有worker的信号后,再继续。这本质上又回到了通道方案。
// **结论:对于本问题场景,通道是更自然的同步机制。**
// 如果worker的“输出”只是一个完成信号,且不需要传递任何值,
// 那么可以将`work_out_chan <- d`替换为`work_out_chan <- struct{}{} `,
// 接收方也相应调整。这与WaitGroup的语义更接近,但仍是通道。
final_chan <- d // 假设这里是处理逻辑,然后发送完成信号
}
// 关闭输入通道,通知worker goroutine没有更多数据
close(wa_in)
close(wb_in)
wg.Wait() // 等待所有worker goroutine完全退出
close(final_chan) // 所有工作完成后关闭最终通道
}
func main_wg() {
account_chan := make(chan int, 100)
final_chan := make(chan int, 100)
go account_wg(account_chan, final_chan)
account_chan <- 1
account_chan <- 2
account_chan <- 3
close(account_chan)
for res := range final_chan {
fmt.Println("最终结果 (WG):", res)
}
}
// 由于WaitGroup在此特定场景下(每个数据项同步且worker是长生命周期)
// 实现起来不如通道直观和简洁,上面的`account_wg`函数没有完全实现按数据项同步。
// 如果要严格实现,则需要将`wg.Add(2)`和`wg.Wait()`放在循环内部,
// 并且worker每次处理完一个数据项后,需要调用`wg.Done()`,
// 并在处理下一个数据项前,需要重新`wg.Add(1)`,这会使得worker的循环结构复杂。
// 实际上,这种场景下,通道的“发送数据-接收完成信号”模式就是最简洁的。对sync.WaitGroup的补充说明:
sync.WaitGroup更适用于等待一组goroutine在它们的整个生命周期结束后完成,而不是在每个中间步骤进行同步。例如,在启动多个并发任务,然后等待所有任务完成时,WaitGroup非常有效。
如果确实需要在每个数据项处理后使用WaitGroup进行同步,那么每个worker在处理完一个数据项后,需要调用wg.Done()。而account goroutine在发送完数据后,需要调用wg.Add(2)(或根据实际worker数量),然后调用wg.Wait()。但由于workerA和workerB是持续运行的goroutine,它们不能在每次处理完数据后就调用defer wg.Done()并退出。这需要更复杂的逻辑,例如,worker通过一个单独的通道通知account其已完成当前数据项,然后account在收到所有通知后,再使用WaitGroup的Add和Wait。但这又回到了通道的范畴。
因此,对于本教程中的“每个数据项处理完后才发送到final_chan”且worker是长生命周期的场景,基于通道的同步(解决方案一)是更自然、更简洁且更符合Go语言哲学的方式。
Go语言通过其强大的goroutine和通道机制,使得实现复杂的并发模式变得相对简单和直观。对于需要并行处理独立任务并进行同步的场景,通过合理地安排通道的发送和接收操作,可以有效地实现并发执行,并确保所有必要的前置任务完成后再进行下一步。虽然sync.WaitGroup在某些同步场景下非常有用,但对于本教程中每个数据项的同步需求,通道提供了一种更直接、更易于理解和维护的解决方案。理解这两种同步机制的适用场景,是编写高效、健壮Go并发程序的关键。
以上就是Go语言并发模式:独立任务的并行处理与同步的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号