
我是 GO 新手,我有一个关于使用通道信号停止 goroutine 的问题。
我有一个长期运行的 goroutine(超过 1000 个)和管理器来管理它:
func myThreadFunc(stop chan bool) {
for {
select {
case <- stop:
log.Debug("Stopping thread")
return
default:
callClientTask()
}
}
}
func callClientTask() {
// This can take long time up to 30 seconds - this is external HTTP API call
time.Sleep(5 * time.Second)
}
func manager() {
var cancelChannelSlice []chan bool
for i := 0; i < 1000; i++ {
cancelChannel := make(chan bool)
cancelChannelSlice = append(cancelChannelSlice, cancelChannel)
go myThreadFunc(cancelChannel)
}
var stopTest = func() {
for _, c := range cancelChannelSlice {
c <- true
}
}
timeout := time.After(time.Duration(300) * time.Second)
for {
select {
case <-timeout:
stopTest()
default:
time.Sleep(time.Second)
}
}
}在这种情况下,每次我调用 c <- true 管理器都会等待 callClientTask() 完成,然后转到下一个 cancelChannel
我希望所有 goroutine 在 callClientTask() 的 1 次迭代中停止(不超过 30 秒)
我尝试的唯一方法是像这样投射新的 goroutine:
var stopTest = func() {
for _, c := range cancelChannelSlice {
go func(c chan bool) {
c <- true
close(c)
}(c)
}
}我这是正确的方法吗?
据我从您的问题中了解到,“您希望所有 goroutine 在 callClientTask() 的 1 次迭代中停止(不超过 30 秒)”并且工作线程同时运行而不会出现同步问题。
我重新组织了与等待组同时运行的代码。
示例代码:
package main
import (
"log"
"sync"
"time"
)
func worker(stop <-chan struct{}, wg *sync.WaitGroup) {
defer wg.Done()
for {
select {
case <-stop:
log.Println("Stopping thread")
return
default:
callClientTask()
}
}
}
func callClientTask() {
time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes)
}
func main() {
var wg sync.WaitGroup
stop := make(chan struct{})
for i := 0; i < 1000; i++ {
wg.Add(1)
go worker(stop, &wg)
}
time.Sleep(5 * time.Second) // allow workers to run for a while
close(stop) // stop all workers, close channel
wg.Wait() // wait for all workers
}输出:
2023/10/26 10:40:44 Stopping thread 2023/10/26 10:40:44 Stopping thread .... 2023/10/26 10:40:49 Stopping thread 2023/10/26 10:40:49 Stopping thread
编辑:
如果您想停止某些工作人员,则必须更新工作人员。以下代码包括具有“停止”和“停止”通道的工作人员以及启动/停止功能。
示例代码:
package main
import (
"log"
"sync"
"time"
)
type Worker struct {
stop chan struct{}
stopped chan struct{}
}
func NewWorker() *Worker {
return &Worker{
stop: make(chan struct{}),
stopped: make(chan struct{}),
}
}
func (w *Worker) Start(wg *sync.WaitGroup) {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-w.stop:
log.Println("Stopping thread")
close(w.stopped)
return
default:
callClientTask()
}
}
}()
}
func (w *Worker) Stop() {
close(w.stop)
<-w.stopped
}
func callClientTask() {
time.Sleep(2 * time.Second) // simulate a long-running task (for testing purposes)
}
func main() {
var wg sync.WaitGroup
workers := make([]*Worker, 1000)
for i := 0; i < 1000; i++ {
workers[i] = NewWorker()
workers[i].Start(&wg)
}
time.Sleep(5 * time.Second) // allow workers to run for a while
for i := 0; i < 100; i++ { // stop first 100 workers
workers[i].Stop()
}
for i := 100; i < 1000; i++ { // wait other workers to finish
workers[i].Stop()
}
wg.Wait()
}输出:
2023/10/26 12:51:26 Stopping thread 2023/10/26 12:51:28 Stopping thread 2023/10/26 12:51:30 Stopping thread ....
以上就是使用通道更快地关闭 goroutine的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号