
本教程探讨go语言中如何优化独立工作协程的并行执行。针对传统顺序执行导致并发效率低下的问题,文章提出了一种通过巧妙重排通道操作的解决方案。该模式允许多个独立工作协程同时启动并并行处理数据,并通过通道接收操作实现同步,确保所有工作完成后再进行下一步处理,从而在保持固定协程数量的同时,显著提升系统吞吐量。
在Go语言中,利用协程(goroutine)和通道(channel)实现并发是其核心优势之一。然而,不恰当的通道操作顺序可能导致即使是独立的任务也无法真正并行执行,从而限制了程序的并发能力。本教程将深入探讨如何通过优化通道操作顺序,使得多个独立的工作协程能够高效并行处理数据,同时满足保持固定协程数量的约束。
考虑一个常见的场景:一个主协调协程(例如account)需要将接收到的数据分发给多个独立的子工作协程(例如workerA和workerB)进行处理。要求是:
初始的实现可能如下所示,其中主协调协程account在处理每个数据项时,会先将数据发送给workerA并等待其完成,然后再发送给workerB并等待其完成。这种串行等待的方式,即使workerA和workerB是独立的,也无法实现真正的并行。
package main
import "fmt"
func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
for d := range work_in_chan {
fmt.Println("A ", d)
// 模拟工作
work_out_chan <- d
}
}
func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
for d := range work_in_chan {
fmt.Println("B ", d)
// 模拟工作
work_out_chan <- d
}
}
func account(account_chan <-chan int, final_chan chan<- int) {
wa_in := make(chan int)
wa_out := make(chan int)
wb_in := make(chan int)
wb_out := make(chan int)
go workerA(wa_in, wa_out)
go workerB(wb_in, wb_out)
for d := range account_chan {
// 初始实现:串行处理,无法并行
wa_in <- d
<-wa_out // 阻塞,等待workerA完成
wb_in <- d
<-wb_out // 阻塞,等待workerB完成
final_chan <- d
}
}
func main() {
account_chan := make(chan int, 100)
final_chan := make(chan int, 100)
go account(account_chan, final_chan)
account_chan <- 1
account_chan <- 2
account_chan <- 3
close(account_chan) // 关闭输入通道,以便account协程最终退出
// 从final_chan接收结果
for i := 0; i < 3; i++ {
fmt.Println("Final:", <-final_chan)
}
close(final_chan) // 关闭输出通道
}在上述代码中,account协程在处理每个数据项d时,首先向wa_in发送数据,然后立即阻塞等待wa_out的返回。只有workerA处理完毕并发送到wa_out后,account协程才能继续向wb_in发送数据,并再次阻塞等待wb_out的返回。这种模式导致workerA和workerB无法同时运行,极大地限制了并发性。
立即学习“go语言免费学习笔记(深入)”;
要解决上述问题,关键在于改变主协调协程中通道的发送和接收顺序。既然workerA和workerB是独立的,我们可以先将数据同时发送给它们,让它们并行开始工作,然后统一等待它们全部完成。
优化的实现如下:
package main
import "fmt"
func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
for d := range work_in_chan {
fmt.Println("A processing:", d)
// 模拟工作,可能耗时
work_out_chan <- d // 完成后发送信号
}
close(work_out_chan) // 当输入通道关闭时,关闭输出通道
}
func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
for d := range work_in_chan {
fmt.Println("B processing:", d)
// 模拟工作,可能耗时
work_out_chan <- d // 完成后发送信号
}
close(work_out_chan) // 当输入通道关闭时,关闭输出通道
}
func account(account_chan <-chan int, final_chan chan<- int) {
// 创建用于workerA和workerB的输入输出通道
// 注意:这里使用无缓冲通道,确保worker在准备好接收前不会阻塞发送
wa_in := make(chan int)
wa_out := make(chan int)
wb_in := make(chan int)
wb_out := make(chan int)
// 启动worker协程
go workerA(wa_in, wa_out)
go workerB(wb_in, wb_out)
// 遍历输入数据
for d := range account_chan {
// 1. 同时将数据发送给所有工作协程
// 假设worker协程已准备好接收,此操作是非阻塞的(对于无缓冲通道,worker必须已在接收端等待)
// 或如果通道有缓冲,则只要缓冲未满,发送就是非阻塞的
wa_in <- d
wb_in <- d
// 2. 阻塞等待所有工作协程完成
// 接收操作会阻塞,直到对应的worker完成其工作并发送信号
<-wa_out
<-wb_out
// 3. 所有工作完成后,将数据发送到最终通道
final_chan <- d
}
// 当account_chan关闭且所有数据处理完毕后,关闭worker的输入通道
// 这样worker协程才能从for range循环中退出
close(wa_in)
close(wb_in)
// 等待worker协程完成所有剩余工作并关闭其输出通道
// 确保在关闭final_chan之前所有数据都已处理
for range wa_out {} // 消费完所有wa_out中可能剩余的信号
for range wb_out {} // 消费完所有wb_out中可能剩余的信号
close(final_chan) // 所有工作完成后关闭最终输出通道
}
func main() {
account_chan := make(chan int, 100) // 带缓冲的输入通道
final_chan := make(chan int, 100) // 带缓冲的输出通道
go account(account_chan, final_chan)
// 发送数据
account_chan <- 1
account_chan <- 2
account_chan <- 3
close(account_chan) // 发送完毕,关闭输入通道
// 从final_chan接收结果
for res := range final_chan {
fmt.Println("Final result:", res)
}
}代码分析:
通过这种简单的通道操作重排,我们成功地让两个独立的worker协程实现了真正的并行处理,同时满足了所有数据项必须经过所有worker处理的同步要求,并且保持了固定数量的协程。
并发与并行:
通道缓冲:
sync.WaitGroup的替代方案:
在当前场景中,workerA和workerB的输出通道(wa_out, wb_out)仅用于发送完成信号,其传输的具体值并不重要。
如果工作协程的输出值确实不需要被主协调协程使用,那么使用sync.WaitGroup可能是一个更简洁、更高效的同步机制。sync.WaitGroup专门用于等待一组协程完成。
使用sync.WaitGroup的伪代码示例:
// ... (workerA和workerB不再需要work_out_chan,而是接收一个*sync.WaitGroup)
func workerA(work_in_chan <-chan int, wg *sync.WaitGroup) {
defer wg.Done() // 在函数退出时通知WaitGroup
for d := range work_in_chan {
// ... 处理数据
}
}
func account(account_chan <-chan int, final_chan chan<- int) {
// ...
var wg sync.WaitGroup
// ...
for d := range account_chan {
wg.Add(2) // 增加计数,表示有两个worker需要完成
wa_in <- d
wb_in <- d
wg.Wait() // 阻塞等待所有worker完成
final_chan <- d
}
// ...
}sync.WaitGroup的优势在于它更明确地表达了“等待一组任务完成”的意图,并且避免了创建不必要的通道。
优雅关闭:
通过对Go语言中通道操作顺序的细致调整,我们能够有效地将独立的任务从串行执行转变为并行执行,从而充分利用多核处理器的能力,提升程序的整体吞吐量。这种模式的核心思想是:先同时启动所有独立的工作任务(通过非阻塞发送),然后统一等待所有任务完成(通过阻塞接收)。在实际开发中,根据具体需求(是否需要传递结果、同步机制的简洁性等),可以选择使用通道进行同步,或者考虑使用sync.WaitGroup等更专业的同步原语。理解并熟练运用这些并发模式,是编写高性能Go语言应用的关键。
以上就是Go语言并发模式:优化独立工作协程的并行执行的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号