在go语言中控制goroutine并发数量的推荐方法是使用x/sync/semaphore信号量。1. semaphore.newweighted创建带容量的信号量;2. 每个goroutine执行前用acquire获取权重;3. 执行结束后用release释放权重;4. 结合context可实现超时控制和优雅退出。这种方法相比waitgroup和channel更灵活,能精确控制并发资源,避免系统过载。

Golang中要控制并发的goroutine数量,使用标准库扩展包
x/sync/semaphore

在Go语言中,如果你需要限制同时运行的goroutine数量,例如,你正在处理一个巨大的任务队列,但又不希望一次性启动成千上万个goroutine导致系统崩溃,那么
x/sync/semaphore
一个典型的使用场景是:你有一堆需要处理的图片,但你的服务器只能同时处理有限数量的图片压缩任务。
立即学习“go语言免费学习笔记(深入)”;

package main
import (
"context"
"fmt"
"log"
"runtime"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
func processImage(id int) {
fmt.Printf("处理图片 %d 开始...\n", id)
time.Sleep(time.Duration(id%3+1) * time.Second) // 模拟不同处理时间
fmt.Printf("处理图片 %d 完成。\n", id)
}
func main() {
maxConcurrency := int64(runtime.NumCPU()) // 通常我会设置为CPU核心数或根据实际负载测试决定
sem := semaphore.NewWeighted(maxConcurrency)
totalImages := 20
var wg sync.WaitGroup
fmt.Printf("开始处理 %d 张图片,最大并发数: %d\n", totalImages, maxConcurrency)
for i := 0; i < totalImages; i++ {
wg.Add(1)
go func(imageID int) {
defer wg.Done()
// 尝试获取一个权重(默认是1),如果容量不足则阻塞
// 在真实项目中,这里通常会结合 context.WithTimeout/WithCancel 来避免无限等待
if err := sem.Acquire(context.Background(), 1); err != nil {
log.Printf("获取信号量失败,图片 %d: %v\n", imageID, err)
return
}
defer sem.Release(1) // 处理完后释放权重
processImage(imageID)
}(i)
}
wg.Wait()
fmt.Println("所有图片处理完毕。")
}这段代码中,
semaphore.NewWeighted(maxConcurrency)
maxConcurrency
sem.Acquire(context.Background(), 1)
maxConcurrency
sem.Release(1)
说实话,Go语言的goroutine非常轻量级,启动几万甚至几十万个goroutine在理论上是可行的。但实际情况往往比理论复杂得多。我觉得,控制goroutine数量的核心原因在于:资源是有限的。

想象一下,你有一个程序需要从数据库读取大量数据,然后进行复杂的计算。如果每个数据项都启动一个goroutine去处理,而你的数据库连接池只有几十个连接,或者你的CPU核心数只有几个,那么:
在我看来,这是一种对系统负载的“自我保护”机制,确保你的程序在任何情况下都能保持稳定和高效。
Go语言提供了多种并发原语,它们各有侧重,有时候容易混淆。理解它们之间的差异,能帮助你做出正确的选择。
sync.WaitGroup
Add
Done
Wait
WaitGroup
WaitGroup
chan
x/sync/semaphore.Weighted
那么,何时选择信号量呢?
在我实际工作中,我倾向于在以下场景使用信号量:
Acquire
简单来说,如果你需要限制同时运行的“活动单元”的数量,并且这些活动单元可能消耗不同量的“资源配额”,那么信号量通常是最佳选择。
在实际生产环境中,仅仅使用
sem.Acquire(context.Background(), 1)
context.Background()
这就是
context
semaphore.Acquire
context.Context
package main
import (
"context"
"fmt"
"log"
"runtime"
"sync"
"time"
"golang.org/x/sync/semaphore"
)
func processTaskWithTimeout(id int) {
fmt.Printf("任务 %d 开始...\n", id)
time.Sleep(time.Duration(id%3+1) * time.Second) // 模拟不同处理时间
fmt.Printf("任务 %d 完成。\n", id)
}
func main() {
maxConcurrency := int64(runtime.NumCPU())
sem := semaphore.NewWeighted(maxConcurrency)
totalTasks := 10
var wg sync.WaitGroup
fmt.Printf("开始处理 %d 个任务,最大并发数: %d\n", totalTasks, maxConcurrency)
for i := 0; i < totalTasks; i++ {
wg.Add(1)
go func(taskID int) {
defer wg.Done()
// 设置一个超时Context,例如500毫秒
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel() // 确保Context被取消,释放资源
// 尝试获取信号量,带超时
if err := sem.Acquire(ctx, 1); err != nil {
// 错误处理:可能是超时,也可能是Context被取消
if err == context.DeadlineExceeded {
log.Printf("任务 %d: 获取信号量超时,放弃处理。\n", taskID)
} else if err == context.Canceled {
log.Printf("任务 %d: 获取信号量被取消,放弃处理。\n", taskID)
} else {
log.Printf("任务 %d: 获取信号量时发生未知错误: %v\n", taskID, err)
}
return
}
defer sem.Release(1) // 确保无论如何都释放信号量
processTaskWithTimeout(taskID)
}(i)
}
wg.Wait()
fmt.Println("所有任务处理完毕。")
}在这个例子中,我为每次
Acquire
context
sem.Acquire
context.DeadlineExceeded
处理错误时,一个常见的模式是使用
defer sem.Release(1)
processTaskWithTimeout
在实际应用中,你可能还会有一个全局的
context.Context
context
cancel
以上就是Golang如何控制并发goroutine数量 使用semaphore权重信号量的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号