0

0

Go语言中并行独立工作协程的同步模式

霞舞

霞舞

发布时间:2025-10-26 08:22:06

|

991人浏览过

|

来源于php中文网

原创

Go语言中并行独立工作协程的同步模式

本文探讨在go语言中如何高效地实现独立工作协程的并行执行与同步。通过分析一个常见场景,即主协程需要等待多个独立工作协程完成对同一数据项的处理后才能继续,文章详细介绍了使用go通道(channel)进行输入分发和输出同步的正确模式,并提供了代码示例和最佳实践,确保在固定协程数量下实现真正的并发处理。

Go语言中并行独立工作协程的同步模式

在Go语言中,利用其强大的并发原语——Goroutine和Channel,可以优雅地构建复杂的并发系统。然而,正确地编排这些并发任务以实现真正的并行并确保数据同步,是Go并发编程中的一个核心挑战。本文将深入探讨一种常见的并发场景:一个主协程需要将数据分发给多个独立的子工作协程进行处理,并且必须等待所有子工作协程完成处理后才能继续其自身流程。

问题场景描述

假设我们有一个account协程,它从account_chan接收数据项。对于每个接收到的数据项,account协程需要委托给两个独立的子工作协程workerA和workerB进行处理。这两个worker协程的处理顺序不重要,但account协程必须确保workerA和workerB都已完成对当前数据项的处理,才能将该数据项发送到final_chan并继续处理下一个数据项。

此场景有以下关键要求:

  1. workerA和workerB是单例协程,即在程序生命周期内只启动一次。
  2. 系统中并发运行的协程数量应保持恒定,避免为每个数据项创建新的协程。
  3. workerA和workerB是完全独立的,它们可以并且应该并发执行。

初始“非并发”实现分析

考虑以下一种“直观”但错误的实现方式:

立即学习go语言免费学习笔记(深入)”;

package main

import "fmt"

func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Println("A ", d)
        work_out_chan <- d // 模拟工作完成并发送信号
    }
}

func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Println("B ", d)
        work_out_chan <- d // 模拟工作完成并发送信号
    }
}

func account(account_chan <-chan int, final_chan chan<- int) {
    wa_in := make(chan int)
    wa_out := make(chan int)
    wb_in := make(chan int)
    wb_out := make(chan int)

    go workerA(wa_in, wa_out)
    go workerB(wb_in, wb_out)

    for d := range account_chan {
        // 错误的实现方式:顺序执行
        wa_in <- d    // 发送数据给workerA
        <-wa_out      // 等待workerA完成

        wb_in <- d    // 发送数据给workerB
        <-wb_out      // 等待workerB完成

        final_chan <- d
    }
}

func main() {
    account_chan := make(chan int, 100)
    final_chan := make(chan int, 100)

    go account(account_chan, final_chan)

    account_chan <- 1
    account_chan <- 2
    account_chan <- 3
    close(account_chan) // 关闭输入通道,以便account协程最终退出

    for i := 0; i < 3; i++ {
        fmt.Println("Final:", <-final_chan)
    }
}

上述代码中的account协程在处理每个数据项时,首先将数据发送给workerA并立即等待其完成,然后才将数据发送给workerB并等待其完成。这种模式导致workerA和workerB实际上是顺序执行的,完全失去了并行处理的优势。这与我们希望它们并发执行的初衷相悖。

正确的并发模式:并行分发与同步等待

要实现workerA和workerB的真正并发,关键在于改变数据发送和完成信号接收的顺序。正确的做法是:首先将数据并行地发送给所有需要处理的子工作协程,然后并行地等待所有子工作协程的完成信号。

修改后的account协程中的循环逻辑如下:

启科网络PHP商城系统
启科网络PHP商城系统

启科网络商城系统由启科网络技术开发团队完全自主开发,使用国内最流行高效的PHP程序语言,并用小巧的MySql作为数据库服务器,并且使用Smarty引擎来分离网站程序与前端设计代码,让建立的网站可以自由制作个性化的页面。 系统使用标签作为数据调用格式,网站前台开发人员只要简单学习系统标签功能和使用方法,将标签设置在制作的HTML模板中进行对网站数据、内容、信息等的调用,即可建设出美观、个性的网站。

下载
// ... (workerA, workerB, channel声明部分同上)

func account(account_chan <-chan int, final_chan chan<- int) {
    wa_in := make(chan int)
    wa_out := make(chan int)
    wb_in := make(chan int)
    wb_out := make(chan int)

    go workerA(wa_in, wa_out)
    go workerB(wb_in, wb_out)

    for d := range account_chan {
        // 正确的实现方式:并行发送输入,并行等待输出
        wa_in <- d // 发送数据给workerA
        wb_in <- d // 发送数据给workerB (此时workerA和workerB可同时开始处理)

        <-wa_out   // 等待workerA完成
        <-wb_out   // 等待workerB完成 (这两个接收操作会阻塞,直到两个worker都发送了信号)

        final_chan <- d
    }
    // 当account_chan关闭且所有数据处理完毕后,关闭worker的输入通道
    // 这样worker协程也能优雅退出
    close(wa_in)
    close(wb_in)
    // 等待worker协程退出,或者确保它们处理完所有数据
    // 实际应用中可能需要更复杂的协调机制,例如WaitGroup
    close(wa_out) // 如果worker协程已退出,这些通道可能需要关闭
    close(wb_out)
}

代码解释:

  1. wa_in

这种模式确保了workerA和workerB能够真正地并发执行。当一个数据项被发送给它们时,它们会同时开始处理。account协程则会在两个worker都发出完成信号后,才继续处理下一个数据项。

关于完成顺序的思考

初次接触这种模式时,可能会担心“如果workerB比workerA先完成怎么办?”。实际上,这并不重要。

替代方案与注意事项

在上述示例中,worker协程的work_out_chan实际上只用于发送一个完成信号,其发送的具体值在account协程中并未被使用。在这种情况下,sync.WaitGroup是一个更简洁且推荐的替代方案,特别是当工作协程不需要返回任何处理结果,仅需通知完成时。

使用 sync.WaitGroup 的示例:

package main

import (
    "fmt"
    "sync"
    "time" // 引入time包用于模拟耗时操作
)

func workerA_wg(work_in_chan <-chan int, wg *sync.WaitGroup) {
    defer wg.Done() // 确保无论如何都调用Done
    for d := range work_in_chan {
        fmt.Println("A ", d)
        time.Sleep(100 * time.Millisecond) // 模拟耗时
        // workerA完成一个任务后,并不立即调用Done,而是在协程退出时调用一次
        // 如果是每个任务完成后都要通知,则需要每次循环内调用Done,并增加Add计数
    }
    fmt.Println("WorkerA exited.")
}

func workerB_wg(work_in_chan <-chan int, wg *sync.WaitGroup) {
    defer wg.Done() // 确保无论如何都调用Done
    for d := range work_in_chan {
        fmt.Println("B ", d)
        time.Sleep(150 * time.Millisecond) // 模拟耗时
    }
    fmt.Println("WorkerB exited.")
}

func account_wg(account_chan <-chan int, final_chan chan<- int) {
    wa_in := make(chan int)
    wb_in := make(chan int)

    // 注意:WaitGroup通常用于等待一组goroutine的完成。
    // 在本例中,worker协程是常驻的,每个数据项的处理需要单独同步。
    // 因此,WaitGroup的Add/Done操作需要针对每个数据项进行。

    go workerA_wg(wa_in, nil) // 这里的wg传入nil,因为workerA_wg的wg参数用于其自身退出,而非每次任务完成
    go workerB_wg(wb_in, nil) // 同上

    for d := range account_chan {
        var wg sync.WaitGroup
        wg.Add(2) // 为workerA和workerB各增加一个计数

        // 改进的worker函数,每次处理完一个数据项后调用wg.Done()
        go func(data int) {
            defer wg.Done()
            wa_in <- data
            // 在实际worker中处理,这里只是发送数据
            // 假设workerA接收到数据后会自己处理并发送一个信号
            // 但如果workerA是常驻的,它的Done应该由它自己控制
        }(d)
        go func(data int) {
            defer wg.Done()
            wb_in <- data
        }(d)

        // 这种模式下,如果workerA/B是常驻的,且每次处理一个数据后需要通知,
        // 那么workerA/B内部需要接收一个wg指针并在处理完数据后调用Done。
        // 这会使workerA/B的签名变得复杂,需要传递WaitGroup指针。

        // 更直接的WaitGroup使用方式,如果worker是短暂的:
        // 如果worker是常驻的,且每个数据项处理完后需要通知,
        // 那么原始的out_chan模式更清晰。
        // 如果要用WaitGroup,需要重构worker函数使其接收WaitGroup指针,并在处理完数据后调用Done。
        // 例如:
        // go func(data int, wg *sync.WaitGroup) {
        //     defer wg.Done()
        //     // 模拟workerA处理
        //     fmt.Println("A processing", data)
        //     time.Sleep(100 * time.Millisecond)
        // }(d, &wg)
        // go func(data int, wg *sync.WaitGroup) {
        //     defer wg.Done()
        //     // 模拟workerB处理
        //     fmt.Println("B processing", data)
        //     time.Sleep(150 * time.Millisecond)
        // }(d, &wg)

        // 如果worker是常驻的,并且每次处理一个数据后需要通知,
        // 那么每个worker需要一个输入通道和一个输出通道(或直接使用WaitGroup)。
        // 原始的channel方案在这里更直观。
        // 如果坚持使用WaitGroup,则每个worker需要一个输入通道,
        // 并且在处理完一个数据后,主协程(或一个协调协程)负责调用wg.Done()。
        // 这意味着worker的输出通道仍然是必要的,或者worker自己调用Done。

        // 鉴于原问题中workerA和workerB是单例协程,且每次处理一个数据后需要通知主协程,
        // 原始的输入/输出通道对模式是更直接且符合其设计意图的。
        // WaitGroup通常用于等待一组goroutine的启动和最终退出,
        // 而不是用于每次任务的同步。
        // 如果要用于每次任务同步,那么每个任务需要一个WaitGroup,这会比Channel复杂。

        // 因此,对于原问题描述,使用独立输出通道的模式是更合适的。
        // WaitGroup更适合于等待一组一次性任务的完成,或者等待常驻goroutine的最终退出。
        // 在这里,每个数据项的处理都是一个“任务”,需要等待两个worker完成,
        // 每次迭代都需要独立的同步。

        // 重新考虑:如果worker的out channel仅仅是信号,
        // 那么可以在account协程内部为每个数据项创建一个临时的WaitGroup。
        // workerA和workerB需要被改造,使其接收WaitGroup指针并在处理完成后调用Done。

        // 鉴于原始问题中的约束和代码结构,使用独立输出通道是最直接和符合Go惯用法的方式。
        // 让我们回到原始的Channel解决方案,因为它更贴合“固定数量Goroutine”和“每次任务同步”的需求。
    }
    close(wa_in)
    close(wb_in)
}

// 总结:对于“固定数量常驻worker协程,每次处理一个数据项后需要同步”的场景,
// 使用输入通道分发数据,输出通道接收完成信号,是最直接和符合Go语言习惯的模式。
// WaitGroup更适用于等待一组Goroutine的整体完成,而非每次任务的细粒度同步。

总结

在Go语言中,实现多个独立工作协程的并行执行和同步,关键在于合理地利用通道进行数据传输和信号协调。当主协程需要等待所有子工作协程完成对同一数据项的处理时,正确的模式是:

  1. 并行发送输入: 将数据项同时发送给所有相关的子工作协程的输入通道。
  2. 并行等待输出: 阻塞等待从所有子工作协程的输出通道接收完成信号。

这种模式不仅确保了真正的并发,而且利用Go通道的阻塞特性,自然地实现了“全部完成”的同步语义,而无需手动管理复杂的锁或条件变量。通过这种方式,我们可以构建出高效、健壮且易于理解的并发系统。

相关专题

更多
Go中Type关键字的用法
Go中Type关键字的用法

Go中Type关键字的用法有定义新的类型别名或者创建新的结构体类型。本专题为大家提供Go相关的文章、下载、课程内容,供大家免费下载体验。

234

2023.09.06

go怎么实现链表
go怎么实现链表

go通过定义一个节点结构体、定义一个链表结构体、定义一些方法来操作链表、实现一个方法来删除链表中的一个节点和实现一个方法来打印链表中的所有节点的方法实现链表。

446

2023.09.25

go语言编程软件有哪些
go语言编程软件有哪些

go语言编程软件有Go编译器、Go开发环境、Go包管理器、Go测试框架、Go文档生成器、Go代码质量工具和Go性能分析工具等。本专题为大家提供go语言相关的文章、下载、课程内容,供大家免费下载体验。

249

2023.10.13

0基础如何学go语言
0基础如何学go语言

0基础学习Go语言需要分阶段进行,从基础知识到实践项目,逐步深入。php中文网给大家带来了go语言相关的教程以及文章,欢迎大家前来学习。

699

2023.10.26

Go语言实现运算符重载有哪些方法
Go语言实现运算符重载有哪些方法

Go语言不支持运算符重载,但可以通过一些方法来模拟运算符重载的效果。使用函数重载来模拟运算符重载,可以为不同的类型定义不同的函数,以实现类似运算符重载的效果,通过函数重载,可以为不同的类型实现不同的操作。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

194

2024.02.23

Go语言中的运算符有哪些
Go语言中的运算符有哪些

Go语言中的运算符有:1、加法运算符;2、减法运算符;3、乘法运算符;4、除法运算符;5、取余运算符;6、比较运算符;7、位运算符;8、按位与运算符;9、按位或运算符;10、按位异或运算符等等。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

229

2024.02.23

go语言开发工具大全
go语言开发工具大全

本专题整合了go语言开发工具大全,想了解更多相关详细内容,请阅读下面的文章。

282

2025.06.11

go语言引用传递
go语言引用传递

本专题整合了go语言引用传递机制,想了解更多相关内容,请阅读专题下面的文章。

158

2025.06.26

Golang 性能分析与pprof调优实战
Golang 性能分析与pprof调优实战

本专题系统讲解 Golang 应用的性能分析与调优方法,重点覆盖 pprof 的使用方式,包括 CPU、内存、阻塞与 goroutine 分析,火焰图解读,常见性能瓶颈定位思路,以及在真实项目中进行针对性优化的实践技巧。通过案例讲解,帮助开发者掌握 用数据驱动的方式持续提升 Go 程序性能与稳定性。

9

2026.01.22

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 4万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号