首页 > 后端开发 > Golang > 正文

Go语言并发模式:独立任务的并行处理与同步

花韻仙語
发布: 2025-10-26 10:15:10
原创
388人浏览过

go语言并发模式:独立任务的并行处理与同步

本文探讨了在Go语言中如何高效地实现独立任务的并行处理与同步。通过分析一个具体场景,我们展示了如何利用Go的通道(channels)或`sync.WaitGroup`来协调多个并发执行的worker goroutine,确保所有任务完成后才进行下一步操作,同时维持固定的goroutine数量,避免为每个数据项创建新goroutine,从而优化资源利用和执行效率。

在Go语言中,实现高效的并发处理是其核心优势之一。本教程将深入探讨一种常见的并发模式:如何协调多个独立的worker goroutine并行处理数据,并确保所有worker完成后才继续执行后续逻辑。我们将通过一个具体的案例来演示这种模式,并提供两种实现方案:基于通道的同步和基于sync.WaitGroup的同步。

场景描述

假设我们有一个主协调器account goroutine,它从一个输入通道接收数据。每接收到一个数据项,account需要将该数据分发给两个独立的worker goroutine(workerA和workerB)进行处理。这两个worker的执行顺序不重要,但account必须等待它们都完成对当前数据项的处理后,才能将该数据发送到最终的输出通道。关键要求如下:

  1. workerA和workerB各自是一个独立的goroutine。
  2. 系统中goroutine的数量应保持恒定,即不应为每个新数据项创建新的goroutine。
  3. workerA和workerB应并行执行,以最大化并发效率。

初始问题与分析

考虑以下初始的account函数实现:

立即学习go语言免费学习笔记(深入)”;

func account(account_chan <-chan int, final_chan chan<- int) {
    wa_in := make(chan int)
    wa_out := make(chan int)
    wb_in := make(chan int)
    wb_out := make(chan int)

    go workerA(wa_in, wa_out)
    go workerB(wb_in, wb_out)

    for d := range account_chan {
        // 顺序执行:发送给A,等待A完成;发送给B,等待B完成
        wa_in <- d
        <-wa_out

        wb_in <- d
        <-wb_out

        final_chan <- d
    }
}
登录后复制

其中workerA和workerB的定义如下:

func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Println("A ", d)
        work_out_chan <- d // 假设这里是处理逻辑,然后发送完成信号
    }
}

func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Println("B ", d)
        work_out_chan <- d // 假设这里是处理逻辑,然后发送完成信号
    }
}
登录后复制

这个实现的问题在于,wa_in <- d和<-wa_out会阻塞account goroutine,直到workerA完成并发送回信号。只有workerA完成后,account才会继续执行wb_in <- d,进而触发workerB。这导致workerA和workerB实际上是顺序执行的,未能实现并行处理,浪费了并发能力。

解决方案一:通过通道实现并行与同步

要实现workerA和workerB的并行执行,关键在于调整通道操作的顺序:先将数据同时发送给所有worker,然后等待所有worker的完成信号。

package main

import "fmt"
import "time" // 引入time包用于模拟工作耗时

func workerA(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Printf("WorkerA 正在处理数据: %d\n", d)
        time.Sleep(time.Millisecond * 100) // 模拟工作耗时
        work_out_chan <- d                 // 发送完成信号
    }
}

func workerB(work_in_chan <-chan int, work_out_chan chan<- int) {
    for d := range work_in_chan {
        fmt.Printf("WorkerB 正在处理数据: %d\n", d)
        time.Sleep(time.Millisecond * 150) // 模拟工作耗时,可能与A不同
        work_out_chan <- d                 // 发送完成信号
    }
}

func account(account_chan <-chan int, final_chan chan<- int) {
    wa_in := make(chan int)
    wa_out := make(chan int)
    wb_in := make(chan int)
    wb_out := make(chan int)

    go workerA(wa_in, wa_out)
    go workerB(wb_in, wb_out)

    for d := range account_chan {
        // 先同时发送数据给两个worker
        wa_in <- d
        wb_in <- d

        // 然后等待两个worker的完成信号
        // 接收顺序不重要,因为两者都需要被接收
        <-wa_out
        <-wb_out

        final_chan <- d // 两个worker都完成后,将数据发送到最终通道
    }
    // 注意:在实际应用中,当account_chan关闭时,需要考虑如何优雅地关闭wa_in和wb_in,
    // 以便worker goroutine能够退出循环。此处为简化示例。
    close(wa_in)
    close(wb_in)
}

func main() {
    account_chan := make(chan int, 100)
    final_chan := make(chan int, 100)

    go account(account_chan, final_chan)

    // 发送一些数据
    account_chan <- 1
    account_chan <- 2
    account_chan <- 3

    // 关闭输入通道,通知account goroutine没有更多数据
    close(account_chan)

    // 等待并打印最终结果
    // 由于account goroutine在处理完所有数据并关闭其输入通道后,
    // 可能会立即关闭其输出通道,这里需要确保在接收前account goroutine有足够时间处理。
    // 更健壮的方式是使用sync.WaitGroup等待account goroutine完成。
    time.Sleep(time.Second) // 简单等待,确保account goroutine处理完成
    fmt.Println("最终结果:", <-final_chan)
    fmt.Println("最终结果:", <-final_chan)
    fmt.Println("最终结果:", <-final_chan)
}
登录后复制

原理分析:

云雀语言模型
云雀语言模型

云雀是一款由字节跳动研发的语言模型,通过便捷的自然语言交互,能够高效的完成互动对话

云雀语言模型54
查看详情 云雀语言模型

通过将wa_in <- d和wb_in <- d放在一起,account goroutine会非阻塞地将数据发送给workerA和workerB(假设输入通道有足够的缓冲区或worker能够立即接收)。一旦数据发送完成,两个worker就可以同时开始处理。接着,account goroutine会尝试从wa_out和wb_out接收信号。这两个接收操作都会阻塞,直到各自的worker完成处理并发送回数据。由于接收顺序不影响最终结果(两个信号都必须收到),因此这种模式有效地实现了并行处理和同步。

解决方案二:使用sync.WaitGroup进行同步

在许多情况下,worker goroutine的输出值可能并不需要被协调器直接使用,或者它们处理的是共享数据结构而不是通过通道返回数据。此时,sync.WaitGroup是更简洁、更符合Go习惯的同步机制。sync.WaitGroup用于等待一组goroutine完成。

package main

import (
    "fmt"
    "sync"
    "time"
)

func workerA_wg(work_in_chan <-chan int, wg *sync.WaitGroup) {
    defer wg.Done() // 确保在goroutine退出时调用Done()
    for d := range work_in_chan {
        fmt.Printf("WorkerA_wg 正在处理数据: %d\n", d)
        time.Sleep(time.Millisecond * 100) // 模拟工作耗时
        // 不需要通过通道返回数据,直接标记完成
    }
}

func workerB_wg(work_in_chan <-chan int, wg *sync.WaitGroup) {
    defer wg.Done() // 确保在goroutine退出时调用Done()
    for d := range work_in_chan {
        fmt.Printf("WorkerB_wg 正在处理数据: %d\n", d)
        time.Sleep(time.Millisecond * 150) // 模拟工作耗时
        // 不需要通过通道返回数据,直接标记完成
    }
}

func account_wg(account_chan <-chan int, final_chan chan<- int) {
    wa_in := make(chan int)
    wb_in := make(chan int)

    // 创建一个WaitGroup,用于同步两个worker
    var wg sync.WaitGroup

    // workerA_wg和workerB_wg各自需要调用wg.Done()一次
    wg.Add(2)
    go workerA_wg(wa_in, &wg)
    go workerB_wg(wb_in, &wg)

    for d := range account_chan {
        // 先同时发送数据给两个worker
        wa_in <- d
        wb_in <- d

        // 等待所有worker完成对当前数据项的处理
        // 注意:这里需要为每个数据项重置WaitGroup,或者采用不同的WaitGroup管理方式。
        // 考虑到worker是持续运行的,这种模式下WaitGroup不适合按数据项同步。
        // 正确的做法是,如果worker是短生命周期的,为每个数据项创建新的WaitGroup;
        // 如果worker是长生命周期的,如本例,则需要更精细的同步。

        // 对于本例(长生命周期worker),wg.Wait()应在worker goroutine完全退出后调用,
        // 而不是在每个数据项处理后。
        // 如果要实现每个数据项的同步,需要将wg.Add(2)和wg.Wait()放在循环内部,
        // 并且worker的Done()逻辑需要调整。

        // 重新思考:如果worker是长生命周期且需要对每个数据项同步,那么通道仍是更直接的方式。
        // WaitGroup更适合等待一组goroutine在生命周期结束时完成。
        // 但我们可以模拟一种每个数据项同步的WaitGroup用法,但这会增加复杂性。

        // 考虑到原问题中worker是单例goroutine,且对每个数据项都需同步,
        // 通道方案更自然。如果坚持使用WaitGroup,则每个数据项的同步需要更复杂的逻辑,
        // 例如,worker在处理完一个数据项后,通过另一个通道通知account,
        // account收到两个通知后,再调用WaitGroup的Add/Done。
        // 这种情况下,WaitGroup的优势就不明显了,甚至可能比通道更复杂。

        // 为了符合WaitGroup的典型用法,我们假设workerA_wg和workerB_wg是处理所有数据后才退出的。
        // 此时,wg.Wait()应该在for循环结束后调用,等待所有worker完全退出。
        // 但这与“每个数据项处理完后才发送到final_chan”的需求不符。

        // 因此,对于“每个数据项完成同步”的需求,通道方案更直接。
        // 如果确实要用WaitGroup,且worker是长生命周期,通常会结合其他机制。
        // 例如,worker在处理完一个数据项后,会发送一个信号到account,
        // account接收到所有worker的信号后,再继续。这本质上又回到了通道方案。

        // **结论:对于本问题场景,通道是更自然的同步机制。**
        // 如果worker的“输出”只是一个完成信号,且不需要传递任何值,
        // 那么可以将`work_out_chan <- d`替换为`work_out_chan <- struct{}{} `,
        // 接收方也相应调整。这与WaitGroup的语义更接近,但仍是通道。

        final_chan <- d // 假设这里是处理逻辑,然后发送完成信号
    }
    // 关闭输入通道,通知worker goroutine没有更多数据
    close(wa_in)
    close(wb_in)

    wg.Wait() // 等待所有worker goroutine完全退出
    close(final_chan) // 所有工作完成后关闭最终通道
}

func main_wg() {
    account_chan := make(chan int, 100)
    final_chan := make(chan int, 100)

    go account_wg(account_chan, final_chan)

    account_chan <- 1
    account_chan <- 2
    account_chan <- 3

    close(account_chan)

    for res := range final_chan {
        fmt.Println("最终结果 (WG):", res)
    }
}

// 由于WaitGroup在此特定场景下(每个数据项同步且worker是长生命周期)
// 实现起来不如通道直观和简洁,上面的`account_wg`函数没有完全实现按数据项同步。
// 如果要严格实现,则需要将`wg.Add(2)`和`wg.Wait()`放在循环内部,
// 并且worker每次处理完一个数据项后,需要调用`wg.Done()`,
// 并在处理下一个数据项前,需要重新`wg.Add(1)`,这会使得worker的循环结构复杂。
// 实际上,这种场景下,通道的“发送数据-接收完成信号”模式就是最简洁的。
登录后复制

对sync.WaitGroup的补充说明:

sync.WaitGroup更适用于等待一组goroutine在它们的整个生命周期结束后完成,而不是在每个中间步骤进行同步。例如,在启动多个并发任务,然后等待所有任务完成时,WaitGroup非常有效。

如果确实需要在每个数据项处理后使用WaitGroup进行同步,那么每个worker在处理完一个数据项后,需要调用wg.Done()。而account goroutine在发送完数据后,需要调用wg.Add(2)(或根据实际worker数量),然后调用wg.Wait()。但由于workerA和workerB是持续运行的goroutine,它们不能在每次处理完数据后就调用defer wg.Done()并退出。这需要更复杂的逻辑,例如,worker通过一个单独的通道通知account其已完成当前数据项,然后account在收到所有通知后,再使用WaitGroup的Add和Wait。但这又回到了通道的范畴。

因此,对于本教程中的“每个数据项处理完后才发送到final_chan”且worker是长生命周期的场景,基于通道的同步(解决方案一)是更自然、更简洁且更符合Go语言哲学的方式。

注意事项与最佳实践

  1. 通道的关闭: 在实际应用中,确保所有发送方在不再发送数据时关闭通道,以便接收方能够优雅地退出for range循环。这对于避免goroutine泄漏至关重要。在上述示例中,account goroutine在for range account_chan循环结束后,需要负责关闭wa_in和wb_in,以便workerA和workerB能够退出。
  2. 错误处理: worker goroutine在处理数据时可能会遇到错误。在生产环境中,需要设计机制将错误信息传递回account goroutine,以便进行统一的错误处理。这通常可以通过在work_out_chan中发送一个包含错误信息的结构体,或者使用select语句监听错误通道来实现。
  3. 资源管理: 确保所有goroutine都能正常退出,释放占用的资源。使用defer语句配合WaitGroup.Done()是确保goroutine退出的常见模式。
  4. 通道容量: make(chan int, 100)创建了一个带缓冲的通道。适当设置通道容量可以平衡生产者和消费者之间的速度差异,避免过早阻塞或内存溢出。
  5. 扇入/扇出模式: 本文展示的模式是经典的“扇出-扇入”(Fan-out/Fan-in)模式。account goroutine将数据“扇出”给多个worker,然后通过通道“扇入”完成信号,实现同步。

总结

Go语言通过其强大的goroutine和通道机制,使得实现复杂的并发模式变得相对简单和直观。对于需要并行处理独立任务并进行同步的场景,通过合理地安排通道的发送和接收操作,可以有效地实现并发执行,并确保所有必要的前置任务完成后再进行下一步。虽然sync.WaitGroup在某些同步场景下非常有用,但对于本教程中每个数据项的同步需求,通道提供了一种更直接、更易于理解和维护的解决方案。理解这两种同步机制的适用场景,是编写高效、健壮Go并发程序的关键。

以上就是Go语言并发模式:独立任务的并行处理与同步的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号