
本文探讨了go语言中如何有效地协调多个独立worker goroutine并行处理数据流的并发模式。通过优化通道操作顺序,实现数据项在多个worker间的并发分发与同步等待,确保所有worker完成处理后才进行下一步操作,同时维持固定的goroutine数量,避免了不必要的资源开销。
在Go语言的并发编程中,我们经常面临需要协调多个独立工作单元(Worker)来处理同一批数据的情况。一个常见的挑战是,如何在保证数据项按序处理的同时,让这些独立的Worker实现真正的并行执行,而非串行等待。本文将深入探讨一种简洁而高效的Go语言并发模式,以解决此类问题。
假设有一个主协调器(account goroutine)负责从一个输入通道接收数据,并需要将每个数据项分发给两个独立的Worker(workerA和workerB)进行处理。要求是:
一个初级的、但存在性能瓶颈的实现方式可能如下:
package main
import "fmt"
func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Println("A ", d)
        work_out_chan <- d // 假设这里是实际工作
    }
}
func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Println("B ", d)
        work_out_chan <- d // 假设这里是实际工作
    }
}
func account(account_chan <-chan int, final_chan chan<- int) {
    wa_in := make(chan int)
    wa_out := make(chan int)
    wb_in := make(chan int)
    wb_out := make(chan int)
    go workerA(wa_in, wa_out)
    go workerB(wb_in, wb_out)
    for d := range account_chan {
        // 初始的“低效”实现
        wa_in <- d // 发送数据给WorkerA
        <-wa_out   // 等待WorkerA完成
        wb_in <- d // 发送数据给WorkerB
        <-wb_out   // 等待WorkerB完成
        final_chan <- d
    }
}
func main() {
    account_chan := make(chan int, 100)
    final_chan := make(chan int, 100)
    go account(account_chan, final_chan)
    account_chan <- 1
    account_chan <- 2
    account_chan <- 3
    close(account_chan) // 关闭输入通道,以便account goroutine能退出
    // 从final_chan接收并打印结果
    for i := 0; i < 3; i++ {
        fmt.Println("Final:", <-final_chan)
    }
}上述实现中,account goroutine在处理每个数据项时,会先将数据发送给workerA并等待其完成,然后才发送给workerB并等待其完成。这导致workerA和workerB实际上是串行执行的,未能发挥出它们之间独立性带来的并行优势。
立即学习“go语言免费学习笔记(深入)”;
要实现workerA和workerB的并行执行,关键在于调整数据分发和结果等待的顺序。我们可以先将数据同时分发给所有Worker,然后再并行等待所有Worker的完成信号。
优化的account函数实现如下:
func account(account_chan <-chan int, final_chan chan<- int) {
    wa_in := make(chan int)
    wa_out := make(chan int)
    wb_in := make(chan int)
    wb_out := make(chan int)
    go workerA(wa_in, wa_out)
    go workerB(wb_in, wb_out)
    for d := range account_chan {
        // 优化后的并发实现
        wa_in <- d // 并发地发送数据给WorkerA
        wb_in <- d // 并发地发送数据给WorkerB
        <-wa_out // 等待WorkerA完成
        <-wb_out // 等待WorkerB完成
        final_chan <- d
    }
    close(wa_in) // 当account_chan关闭时,确保关闭worker的输入通道
    close(wb_in)
    // 注意:这里需要确保wa_out和wb_out也被正确关闭,
    // 或者通过其他机制(如WaitGroup)来安全退出worker。
    // 为简化示例,此处省略了更复杂的退出逻辑。
}通过这种调整,当account goroutine接收到一个数据项d时,它会立即尝试将d发送给wa_in和wb_in。由于通道发送操作是阻塞的,但如果接收方(workerA和workerB)已经准备好接收,则发送会立即完成。之后,account goroutine会阻塞等待从wa_out和wb_out接收完成信号。因为发送操作是并发进行的,workerA和workerB可以同时开始处理数据,从而实现真正的并行。
值得注意的是,从wa_out和wb_out接收完成信号的顺序并不重要。无论哪个Worker先完成,account goroutine都会等待直到从两个通道都接收到信号,才将数据发送到final_chan。
package main
import (
    "fmt"
    "time" // 引入time包用于模拟工作耗时
)
func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Printf("Worker A processing: %d\n", d)
        time.Sleep(100 * time.Millisecond) // 模拟工作耗时
        work_out_chan <- d
    }
    fmt.Println("Worker A exited.")
}
func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Printf("Worker B processing: %d\n", d)
        time.Sleep(150 * time.Millisecond) // 模拟工作耗时,比A稍长
        work_out_chan <- d
    }
    fmt.Println("Worker B exited.")
}
func account(account_chan <-chan int, final_chan chan<- int) {
    wa_in := make(chan int)
    wa_out := make(chan int)
    wb_in := make(chan int)
    wb_out := make(chan int)
    go workerA(wa_in, wa_out)
    go workerB(wb_in, wb_out)
    for d := range account_chan {
        // 并发发送数据
        wa_in <- d
        wb_in <- d
        // 并行等待完成
        <-wa_out
        <-wb_out
        final_chan <- d
    }
    // 当account_chan关闭且所有数据处理完毕后,关闭worker的输入通道
    close(wa_in)
    close(wb_in)
    // 为了确保main goroutine能接收到所有final_chan的数据,这里不关闭final_chan,
    // 而是依赖main函数在接收完预期数量的数据后自行结束。
    // 在实际应用中,可能需要更健壮的退出机制,例如使用sync.WaitGroup。
}
func main() {
    account_chan := make(chan int, 100)
    final_chan := make(chan int, 100)
    go account(account_chan, final_chan)
    // 模拟发送数据
    for i := 1; i <= 3; i++ {
        account_chan <- i
    }
    close(account_chan) // 关闭输入通道,通知account goroutine没有更多数据
    // 从final_chan接收并打印结果
    // 由于不知道account何时关闭final_chan,这里我们根据发送的数据量来接收
    for i := 0; i < 3; i++ {
        fmt.Println("Final processed data:", <-final_chan)
    }
    // 给予goroutine一些时间来打印退出信息
    time.Sleep(500 * time.Millisecond)
}运行上述代码,你将观察到Worker A processing和Worker B processing的输出是交错出现的,这证明了它们正在并行执行。
通道的职责划分: 在本模式中,work_in_chan用于将数据传递给Worker,而work_out_chan则仅用于发送一个完成信号(其内容通常不重要,因为account goroutine只关心接收到信号)。这种设计清晰地分离了数据传输和同步通知的职责。
sync.WaitGroup的替代方案: 如果Worker goroutine在完成工作后不需要向account goroutine返回任何具体数据,仅仅是通知完成,那么使用sync.WaitGroup会是更简洁和推荐的同步机制。 例如,account函数可以改写为:
import "sync"
func accountWithWaitGroup(account_chan <-chan int, final_chan chan<- int) {
    wa_in := make(chan int)
    wb_in := make(chan int)
    var wg sync.WaitGroup // 声明WaitGroup
    go func() { // WorkerA
        for d := range wa_in {
            fmt.Printf("Worker A processing: %d (via WaitGroup)\n", d)
            time.Sleep(100 * time.Millisecond)
            wg.Done() // 通知WaitGroup完成
        }
        fmt.Println("Worker A exited.")
    }()
    go func() { // WorkerB
        for d := range wb_in {
            fmt.Printf("Worker B processing: %d (via WaitGroup)\n", d)
            time.Sleep(150 * time.Millisecond)
            wg.Done() // 通知WaitGroup完成
        }
        fmt.Println("Worker B exited.")
    }()
    for d := range account_chan {
        wg.Add(2) // 每次处理一个数据项,需要等待两个Worker
        wa_in <- d
        wb_in <- d
        wg.Wait() // 等待两个Worker都完成
        final_chan <- d
    }
    close(wa_in)
    close(wb_in)
}使用sync.WaitGroup可以避免创建额外的输出通道,使代码更专注于同步而非数据传递。
资源管理与优雅退出: 在实际应用中,确保所有goroutine在程序结束时能够优雅地退出至关重要。当account_chan关闭时,account goroutine会停止循环并关闭wa_in和wb_in。Worker goroutine在接收到wa_in或wb_in关闭的信号后,也会退出其循环。对于final_chan,通常由发送方负责关闭,或者通过sync.WaitGroup来确保所有数据处理完毕后再关闭。
数据共享安全性: 如果Worker goroutine需要修改传入的数据项d,并且这些修改需要被其他Worker或后续处理可见,那么需要考虑数据竞争问题。在这种情况下,传入的数据应是不可变的副本,或者使用互斥锁(sync.Mutex)等机制来保护共享数据。在本例中,数据项d是int类型,按值传递,因此不存在共享修改问题。
通过简单地调整通道操作的顺序——先并发地将数据分发给所有独立的Worker,然后等待所有Worker的完成信号——我们可以在Go语言中实现高效的并行处理。这种模式在保持固定goroutine数量的同时,最大化了独立工作单元的并行度。在选择同步机制时,应根据Worker是否需要返回数据来决定使用通道还是sync.WaitGroup,以编写出更清晰、更符合意图的并发代码。
以上就是Go语言并发模式:优化独立Worker的并行执行策略的详细内容,更多请关注php中文网其它相关文章!
 
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
 
                Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号