首页 > 后端开发 > Golang > 正文

Go语言并发模式:优化独立工作协程的并行执行

花韻仙語
发布: 2025-10-26 10:58:34
原创
510人浏览过

Go语言并发模式:优化独立工作协程的并行执行

本教程探讨go语言中如何优化独立工作协程的并行执行。针对传统顺序执行导致并发效率低下的问题,文章提出了一种通过巧妙重排通道操作的解决方案。该模式允许多个独立工作协程同时启动并并行处理数据,并通过通道接收操作实现同步,确保所有工作完成后再进行下一步处理,从而在保持固定协程数量的同时,显著提升系统吞吐量。

在Go语言中,利用协程(goroutine)和通道(channel)实现并发是其核心优势之一。然而,不恰当的通道操作顺序可能导致即使是独立的任务也无法真正并行执行,从而限制了程序的并发能力。本教程将深入探讨如何通过优化通道操作顺序,使得多个独立的工作协程能够高效并行处理数据,同时满足保持固定协程数量的约束。

挑战:独立工作协程的顺序执行

考虑一个常见的场景:一个主协调协程(例如account)需要将接收到的数据分发给多个独立的子工作协程(例如workerA和workerB)进行处理。要求是:

  1. workerA和workerB各自运行在一个独立的协程中,且这些协程数量固定,不随数据项的增加而动态创建。
  2. workerA和workerB对数据的处理是完全独立的,它们之间没有数据依赖,因此可以并行执行。
  3. 只有当所有相关的子工作协程都完成对当前数据项的处理后,主协调协程才能将该数据项传递给下一个阶段。

初始的实现可能如下所示,其中主协调协程account在处理每个数据项时,会先将数据发送给workerA并等待其完成,然后再发送给workerB并等待其完成。这种串行等待的方式,即使workerA和workerB是独立的,也无法实现真正的并行。

package main

import "fmt"

func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Println("A ", d)
        // 模拟工作
        work_out_chan <- d
    }
}

func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Println("B ", d)
        // 模拟工作
        work_out_chan <- d
    }
}

func account(account_chan <-chan int, final_chan chan<- int) {
    wa_in := make(chan int)
    wa_out := make(chan int)
    wb_in := make(chan int)
    wb_out := make(chan int)

    go workerA(wa_in, wa_out)
    go workerB(wb_in, wb_out)

    for d := range account_chan {
        // 初始实现:串行处理,无法并行
        wa_in <- d
        <-wa_out // 阻塞,等待workerA完成

        wb_in <- d
        <-wb_out // 阻塞,等待workerB完成

        final_chan <- d
    }
}

func main() {
    account_chan := make(chan int, 100)
    final_chan := make(chan int, 100)

    go account(account_chan, final_chan)

    account_chan <- 1
    account_chan <- 2
    account_chan <- 3
    close(account_chan) // 关闭输入通道,以便account协程最终退出

    // 从final_chan接收结果
    for i := 0; i < 3; i++ {
        fmt.Println("Final:", <-final_chan)
    }
    close(final_chan) // 关闭输出通道
}
登录后复制

在上述代码中,account协程在处理每个数据项d时,首先向wa_in发送数据,然后立即阻塞等待wa_out的返回。只有workerA处理完毕并发送到wa_out后,account协程才能继续向wb_in发送数据,并再次阻塞等待wb_out的返回。这种模式导致workerA和workerB无法同时运行,极大地限制了并发性。

立即学习go语言免费学习笔记(深入)”;

解决方案:重排通道操作实现并行

要解决上述问题,关键在于改变主协调协程中通道的发送和接收顺序。既然workerA和workerB是独立的,我们可以先将数据同时发送给它们,让它们并行开始工作,然后统一等待它们全部完成。

优化的实现如下:

package main

import "fmt"

func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Println("A processing:", d)
        // 模拟工作,可能耗时
        work_out_chan <- d // 完成后发送信号
    }
    close(work_out_chan) // 当输入通道关闭时,关闭输出通道
}

func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Println("B processing:", d)
        // 模拟工作,可能耗时
        work_out_chan <- d // 完成后发送信号
    }
    close(work_out_chan) // 当输入通道关闭时,关闭输出通道
}

func account(account_chan <-chan int, final_chan chan<- int) {
    // 创建用于workerA和workerB的输入输出通道
    // 注意:这里使用无缓冲通道,确保worker在准备好接收前不会阻塞发送
    wa_in := make(chan int)
    wa_out := make(chan int)
    wb_in := make(chan int)
    wb_out := make(chan int)

    // 启动worker协程
    go workerA(wa_in, wa_out)
    go workerB(wb_in, wb_out)

    // 遍历输入数据
    for d := range account_chan {
        // 1. 同时将数据发送给所有工作协程
        // 假设worker协程已准备好接收,此操作是非阻塞的(对于无缓冲通道,worker必须已在接收端等待)
        // 或如果通道有缓冲,则只要缓冲未满,发送就是非阻塞的
        wa_in <- d
        wb_in <- d

        // 2. 阻塞等待所有工作协程完成
        // 接收操作会阻塞,直到对应的worker完成其工作并发送信号
        <-wa_out
        <-wb_out

        // 3. 所有工作完成后,将数据发送到最终通道
        final_chan <- d
    }

    // 当account_chan关闭且所有数据处理完毕后,关闭worker的输入通道
    // 这样worker协程才能从for range循环中退出
    close(wa_in)
    close(wb_in)

    // 等待worker协程完成所有剩余工作并关闭其输出通道
    // 确保在关闭final_chan之前所有数据都已处理
    for range wa_out {} // 消费完所有wa_out中可能剩余的信号
    for range wb_out {} // 消费完所有wb_out中可能剩余的信号

    close(final_chan) // 所有工作完成后关闭最终输出通道
}

func main() {
    account_chan := make(chan int, 100) // 带缓冲的输入通道
    final_chan := make(chan int, 100)   // 带缓冲的输出通道

    go account(account_chan, final_chan)

    // 发送数据
    account_chan <- 1
    account_chan <- 2
    account_chan <- 3
    close(account_chan) // 发送完毕,关闭输入通道

    // 从final_chan接收结果
    for res := range final_chan {
        fmt.Println("Final result:", res)
    }
}
登录后复制

代码分析:

  1. 并行启动工作: wa_in <- d 和 wb_in <- d 这两个发送操作紧密相连。如果workerA和workerB协程都已启动并在等待从各自的输入通道接收数据,那么这两个发送操作将是非阻塞的(对于无缓冲通道,这意味着worker必须已在接收端等待;对于有缓冲通道,只要缓冲未满,发送就非阻塞)。这使得workerA和workerB几乎可以同时开始处理数据d。
  2. 同步等待完成: <-wa_out 和 <-wb_out 这两个接收操作将阻塞主协调协程,直到workerA和workerB分别完成它们的工作并将信号发送到各自的输出通道。由于接收操作的阻塞特性,account协程会等待两个工作协程都发出完成信号后才继续执行。
  3. 顺序不重要: 即使workerA比workerB先完成,或者反之,这种模式都能正确工作。因为account协程会同时等待两个接收操作,无论哪个先完成,它都会继续等待另一个,直到两者都完成为止。

通过这种简单的通道操作重排,我们成功地让两个独立的worker协程实现了真正的并行处理,同时满足了所有数据项必须经过所有worker处理的同步要求,并且保持了固定数量的协程。

关键概念与注意事项

  1. 并发与并行:

    行者AI
    行者AI

    行者AI绘图创作,唤醒新的灵感,创造更多可能

    行者AI100
    查看详情 行者AI
    • 并发(Concurrency) 是指程序设计结构能够处理多个任务。Go语言通过协程(goroutines)提供了优秀的并发原语。
    • 并行(Parallelism) 是指多个任务在同一时间点上物理地同时执行。本教程的优化正是为了在多核处理器上实现workerA和workerB的并行执行。
  2. 通道缓冲:

    • 在上述示例中,wa_in、wa_out、wb_in、wb_out通道默认是无缓冲的。这意味着发送操作会阻塞,直到有接收者准备好接收;接收操作会阻塞,直到有发送者发送数据。这种行为保证了严格的同步。
    • 如果将这些通道设置为带缓冲的(例如make(chan int, 1)),则发送操作在缓冲区未满时是非阻塞的。这可以减少协调协程与工作协程之间的紧密耦合,提高吞吐量,但需要注意缓冲区大小的选择,以避免死锁或资源耗尽。
  3. sync.WaitGroup的替代方案:

    • 在当前场景中,workerA和workerB的输出通道(wa_out, wb_out)仅用于发送完成信号,其传输的具体值并不重要。

    • 如果工作协程的输出值确实不需要被主协调协程使用,那么使用sync.WaitGroup可能是一个更简洁、更高效的同步机制。sync.WaitGroup专门用于等待一组协程完成。

    • 使用sync.WaitGroup的伪代码示例:

      // ... (workerA和workerB不再需要work_out_chan,而是接收一个*sync.WaitGroup)
      func workerA(work_in_chan <-chan int, wg *sync.WaitGroup) {
          defer wg.Done() // 在函数退出时通知WaitGroup
          for d := range work_in_chan {
              // ... 处理数据
          }
      }
      
      func account(account_chan <-chan int, final_chan chan<- int) {
          // ...
          var wg sync.WaitGroup
          // ...
          for d := range account_chan {
              wg.Add(2) // 增加计数,表示有两个worker需要完成
              wa_in <- d
              wb_in <- d
              wg.Wait() // 阻塞等待所有worker完成
              final_chan <- d
          }
          // ...
      }
      登录后复制
    • sync.WaitGroup的优势在于它更明确地表达了“等待一组任务完成”的意图,并且避免了创建不必要的通道。

  4. 优雅关闭:

    • 在main函数中,通过close(account_chan)来通知account协程不再有新的数据。
    • account协程在for range account_chan循环结束后,需要close(wa_in)和close(wb_in)来通知workerA和workerB不再有新的输入。
    • workerA和workerB在接收通道关闭后,也会退出其for range循环,并close其输出通道。
    • account协程在关闭其输入通道后,需要确保所有worker协程都已完成并关闭其输出通道后,才能安全地关闭final_chan。通过for range wa_out {}和for range wb_out {}来消费完所有可能的剩余信号,确保worker协程完全退出。这确保了整个数据流的完整性和程序的优雅终止。

总结

通过对Go语言中通道操作顺序的细致调整,我们能够有效地将独立的任务从串行执行转变为并行执行,从而充分利用多核处理器的能力,提升程序的整体吞吐量。这种模式的核心思想是:先同时启动所有独立的工作任务(通过非阻塞发送),然后统一等待所有任务完成(通过阻塞接收)。在实际开发中,根据具体需求(是否需要传递结果、同步机制的简洁性等),可以选择使用通道进行同步,或者考虑使用sync.WaitGroup等更专业的同步原语。理解并熟练运用这些并发模式,是编写高性能Go语言应用的关键。

以上就是Go语言并发模式:优化独立工作协程的并行执行的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号