fan-in fan-out是一种Go并发模式,先将任务分发给多个goroutine并行执行(fan-out),再从多个通道收集结果合并为单一输出(fan-in),提升处理效率。

在Go语言中,fan-in fan-out 是一种常见的并发设计模式,用于提升程序的处理效率。它通过多个goroutine并行处理任务(fan-out),再将结果汇总到一个通道中(fan-in),特别适合I/O密集型或可并行计算的场景。
该模式包含两个阶段:
这种结构能有效利用多核资源,提高吞吐量,同时保持代码简洁。
假设我们要处理一批URL请求,可以启动多个worker并发执行HTTP调用。
立即学习“go语言免费学习笔记(深入)”;
func fetch(url string, ch chan<- string) {
resp, _ := http.Get(url)
ch <- fmt.Sprintf("fetched %s: %d", url, resp.StatusCode)
}
<p>func main() {
urls := []string{"<a href="https://www.php.cn/link/374cad868cb62202053d308252bc4040">https://www.php.cn/link/374cad868cb62202053d308252bc4040</a>", "<a href="https://www.php.cn/link/5f46e3006c4072122784b2adcf7bb10e">https://www.php.cn/link/5f46e3006c4072122784b2adcf7bb10e</a>", "<a href="https://www.php.cn/link/44856cd0e9468bc2674a05c05210a144">https://www.php.cn/link/44856cd0e9468bc2674a05c05210a144</a>"}
resultCh := make(chan string, len(urls))</p><pre class='brush:php;toolbar:false;'>// Fan-out: 每个URL启动一个goroutine
for _, url := range urls {
go fetch(url, resultCh)
}
// 收集所有结果
for i := 0; i < len(urls); i++ {
fmt.Println(<-resultCh)
}}
这种方式简单直接,但无法动态控制worker数量,容易导致资源耗尽。
更实用的做法是固定worker数量,从任务通道读取输入。
func worker(tasks <-chan int, results chan<- int, id int) {
for num := range tasks {
time.Sleep(time.Millisecond * 100) // 模拟耗时操作
results <- num * num
fmt.Printf("worker %d processed %d\n", id, num)
}
}
<p>func main() {
tasks := make(chan int, 10)
results := make(chan int, 10)</p><pre class='brush:php;toolbar:false;'>// 启动3个worker
for i := 0; i < 3; i++ {
go worker(tasks, results, i)
}
// 发送任务
for i := 1; i <= 5; i++ {
tasks <- i
}
close(tasks)
// 收集结果
for i := 0; i < 5; i++ {
fmt.Println("result:", <-results)
}}
通过限制worker数,避免系统过载,适用于高并发任务调度。
当每个worker有自己的输出通道时,需要fan-in函数统一收集。
func fanIn(channels ...<-chan string) <-chan string {
out := make(chan string)
for _, ch := range channels {
go func(c <-chan string) {
for val := range c {
out <- val
}
}(ch)
}
<pre class='brush:php;toolbar:false;'>// 所有goroutine启动后关闭out(注意:此处简化处理)
go func() {
for _, ch := range channels {
for range ch {}
}
close(out)
}()
return out}
更安全的方式是使用wg sync.WaitGroup等待所有worker完成后再关闭通道。
结合以上思路,构建一个完整的流程:
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
<p>func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}</p><p>func main() {
// Fan-out: 分发任务
nums := generate(1, 2, 3, 4, 5)</p><pre class='brush:php;toolbar:false;'>// 多个worker并行处理
c1 := square(nums)
c2 := square(nums)
// Fan-in: 合并结果
merged := merge(c1, c2)
// 输出结果
for v := range merged {
fmt.Println(v)
}}
// merge函数合并多个通道 func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int)
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(cs))
for _, c := range cs {
go output(c)
}
go func() {
wg.Wait()
close(out)
}()
return out}
这个例子展示了典型的管道模式,数据流清晰,易于扩展和测试。
基本上就这些。fan-in fan-out的核心在于合理划分任务与结果收集,配合channel和goroutine实现高效并发。实际应用中可根据业务需求调整worker数量、缓冲大小和错误处理机制。
以上就是如何在Golang中实现fan-in fan-out模式_Golang fan-in fan-out模式实践方法汇总的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号