
go 语言以其独特的并发模型而闻名,该模型基于 goroutine 和通道(channel)。goroutine 是一种轻量级的线程,由 go 运行时管理,而通道则是 goroutine 之间进行通信和同步的主要方式。通过通道,goroutine 可以安全地发送和接收数据,避免了传统共享内存并发模型中常见的竞态条件问题。
扇入(Fan-In)是一种常见的并发模式,其核心思想是将多个输入通道的数据汇聚到一个单一的输出通道中。这在需要合并来自不同源的数据流、实现负载均衡或简化消费者逻辑的场景中非常有用。通过扇入模式,消费者无需关心数据来源于哪个具体的 goroutine,只需从一个统一的通道中获取数据即可。
我们通过一个经典的“无聊对话”示例来演示扇入模式。在这个例子中,两个独立的 goroutine(Ann 和 Joe)会周期性地发送消息,而 fanIn 函数则负责将它们的消息合并到一个通道中。
package main
import (
"fmt"
"math/rand"
"time"
)
// boring 函数模拟一个 goroutine 持续发送消息,并带有随机延迟
func boring(msg string) <-chan string {
c := make(chan string)
go func() { // 在函数内部启动一个 goroutine
for i := 0; ; i++ {
c <- fmt.Sprintf("%s %d", msg, i)
time.Sleep(time.Duration(rand.Intn(1e3)) * time.Millisecond) // 引入 0-999ms 的随机延迟
}
}()
return c
}
// fanIn 函数将两个输入通道的数据汇聚到一个输出通道
func func fanIn(input1, input2 <-chan string) <-chan string {
c := make(chan string)
go func() {
for {
c <- <-input1 // 从 input1 读取并发送到 c
}
}()
go func() {
for {
c <- <-input2 // 从 input2 读取并发送到 c
}
}()
return c
}
func main() {
c := fanIn(boring("Joe"), boring("Ann")) // 启动两个 boring goroutine 并扇入其输出
for i := 0; i < 10; i++ { // 初始的循环次数较少
fmt.Println(<-c)
}
fmt.Printf("You're both boring, I'm leaving...\n")
}上述代码的 boring 函数创建了一个 goroutine,它会无限循环地发送带有序号的消息,并在每次发送后引入一个 0 到 999 毫秒的随机延迟。fanIn 函数则接收两个 boring 函数返回的通道,并创建两个新的 goroutine,分别负责从这两个输入通道中读取消息,然后将其发送到 fanIn 返回的单一输出通道 c。main 函数通过调用 fanIn 来启动整个流程,并尝试从合并后的通道 c 中读取 10 条消息。
当我们运行上述代码时,可能会观察到如下输出:
Joe 0 Ann 0 Joe 1 Ann 1 Joe 2 Ann 2 Joe 3 Ann 3 Joe 4 Ann 4 You're both boring, I'm leaving...
这种输出结果似乎表明 Joe 和 Ann 的消息是严格同步交替出现的,与我们期望的“非同步”行为有所出入。这导致了许多初学者对 Go 并发模型产生疑问:既然每个 boring goroutine 都引入了随机延迟,为什么它们还会步调一致地输出?
造成这种现象的原因在于,尽管每个 boring goroutine 都引入了随机延迟,但在程序运行的初期,尤其是在只读取少量消息(例如 10 条)的情况下,这些随机延迟可能尚未积累出足够大的差异。Go 调度器在短时间内可能会以相对稳定的顺序调度这些 goroutine,加上 main 函数快速地从扇入通道中消费消息,使得随机延迟的去同步效果不明显。简单来说,在程序刚启动时,两个 goroutine 几乎同时开始,它们的第一个消息可能也几乎同时准备好,而 fanIn 机制会从先准备好的通道中取出消息。如果两者准备时间相近,或者调度器倾向于某个顺序,就会出现这种看似同步的现象。
要真正观察到 boring goroutine 之间的非同步行为,我们需要延长程序的运行时间,让随机延迟有足够的机会积累并产生显著的差异。当这些差异足够大时,fanIn 机制将自然地反映出哪一个 boring goroutine 的消息先到达,从而打破“步调一致”的假象。
我们可以通过简单地增加 main 函数中循环的次数来达到这个目的:
func main() {
c := fanIn(boring("Joe"), boring("Ann"))
for i := 0; i < 20; i++ { // 将循环次数增加到 20
fmt.Println(<-c)
}
fmt.Printf("You're both boring, I'm leaving...\n")
}当我们将循环次数从 10 增加到 20 甚至更多时,通常会观察到如下的输出结果(具体顺序会因每次运行的随机性而异):
Joe 0 Ann 0 Joe 1 Ann 1 Joe 2 Ann 2 Joe 3 Ann 3 Joe 4 Ann 4 Joe 5 Ann 5 Joe 6 Ann 6 Ann 7 // Ann 的消息比 Joe 的先到达 Joe 7 Joe 8 Joe 9 Ann 8 Ann 9 Ann 10 Joe 10 ...
从上述输出中可以看出,在 Ann 7 和 Joe 7 之后,Ann 的消息开始比 Joe 的消息更早地到达。这正是我们所期望的非同步行为,它证明了两个 boring goroutine 确实是独立运行的,并且它们的随机延迟最终导致了它们输出顺序的错位。fanIn 模式在这种情况下完美地履行了其职责,将先到达的消息优先转发到输出通道。
通过这个例子,我们不仅理解了 Go 语言中扇入模式的实现和作用,更重要的是,我们学会了如何正确地观察和理解并发程序中的非同步行为。在设计和实现 Go 并发应用时,认识到随机性和观察时间对并发行为呈现的影响至关重要。
以上就是Go 并发模式:理解扇入(Fan-In)与通道非同步行为的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号