
本文深入探讨了 Go 语言中 Goroutines 的工作机制,包括 Goroutines 的生命周期以及主进程结束后 Goroutines 的处理方式。通过分析一个向 MongoDB 插入大量数据的并发示例,解释了如何使用 sync.WaitGroup 来确保所有 Goroutines 完成后再退出程序。同时,提供了一个精简的可运行示例,帮助读者理解 Goroutines 的基本用法,并指导读者逐步构建更复杂的并发程序。
Goroutines 的基本概念
Goroutines 是 Go 语言中实现并发的核心机制。它们本质上是轻量级的线程,由 Go 运行时环境(runtime)进行管理。与传统的操作系统线程相比,Goroutines 的创建和销毁开销更小,上下文切换速度更快,因此可以轻松地创建成千上万个 Goroutines,从而实现高并发。
在 Go 语言中,启动一个 Goroutine 非常简单,只需要在函数调用前加上 go 关键字即可。例如:
go myFunction()
这将会创建一个新的 Goroutine 并并发执行 myFunction 函数。
Goroutines 的生命周期
Goroutines 的生命周期从创建开始,到函数执行完毕或发生 panic 结束。需要特别注意的是,当 main 函数返回时,程序会立即退出,而不会等待其他 Goroutines 完成。 这也是导致并发程序出现问题的常见原因。
使用 sync.WaitGroup 管理 Goroutines
为了确保所有 Goroutines 在 main 函数退出前完成,可以使用 sync.WaitGroup。 sync.WaitGroup 提供了一种简单的机制来等待一组 Goroutines 完成。
sync.WaitGroup 的使用方法如下:
- Add(delta int): 在启动 Goroutine 之前,调用 Add 方法,增加计数器的值,表示需要等待的 Goroutine 的数量。
- Done(): 在 Goroutine 完成后,调用 Done 方法,减少计数器的值。
- Wait(): 在 main 函数中,调用 Wait 方法,阻塞当前 Goroutine(通常是 main 函数),直到计数器的值为 0,表示所有 Goroutines 都已完成。
以下是一个使用 sync.WaitGroup 的示例:
package main
import (
"fmt"
"sync"
"time"
)
var waitGroup sync.WaitGroup
func worker(id int) {
defer waitGroup.Done() // 确保 Goroutine 结束后调用 Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second) // 模拟耗时操作
fmt.Printf("Worker %d done\n", id)
}
func main() {
for i := 1; i <= 3; i++ {
waitGroup.Add(1) // 启动一个 Goroutine 前,增加计数器
go worker(i)
}
waitGroup.Wait() // 等待所有 Goroutines 完成
fmt.Println("All workers done")
}在这个例子中,我们启动了 3 个 Goroutines 来执行 worker 函数。waitGroup.Add(1) 在每次启动 Goroutine 之前将计数器加 1,waitGroup.Done() 在每个 Goroutine 结束后将计数器减 1。waitGroup.Wait() 会阻塞 main 函数,直到计数器的值为 0,即所有 Goroutines 都已完成。
并发插入 MongoDB 的示例分析
以下是一个向 MongoDB 并发插入数据的示例(基于原问题中的代码进行简化和修正):
package main
import (
"fmt"
"labix.org/v2/mgo"
"strconv"
"sync"
"time"
)
// Reading 结构体
type Reading struct {
Id string
Name string
}
var waitGroup sync.WaitGroup
func main() {
startTime := time.Now()
// 连接 MongoDB
session, err := mgo.Dial("localhost")
if err != nil {
panic(err)
}
defer session.Close()
collection := session.DB("test").C("readings")
readings := prepareReadings()
fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
// 并发插入数据
numReadings := 1000000
for i := 1; i <= numReadings; i++ {
waitGroup.Add(1)
go insertReading(collection, readings)
if i%100000 == 0 {
fmt.Println("100000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
}
waitGroup.Wait()
fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
func insertReading(collection *mgo.Collection, readings []Reading) {
defer waitGroup.Done() // 确保 Goroutine 结束后调用 Done()
err := collection.Insert(readings...) // 插入 readings 切片中的所有元素
if err != nil {
fmt.Println("error insertReadings:", err)
}
}
func prepareReadings() []Reading {
var readings []Reading
for i := 1; i <= 10; i++ { // 创建 10 个 Reading 对象
readings = append(readings, Reading{Name: "Thing " + strconv.Itoa(i)})
}
return readings
}注意事项:
- 连接复用: 在并发环境下,尽量复用 MongoDB 连接,避免频繁创建和销毁连接,以提高性能。可以将 mgo.Session 对象传递给 Goroutines,或者使用连接池。
- 错误处理: 在 Goroutines 中进行错误处理非常重要。需要检查 MongoDB 操作是否成功,并记录或处理错误。
- 批量插入: 为了提高插入效率,可以考虑使用 MongoDB 的批量插入功能,一次性插入多个文档。
- 数据竞争: 如果多个 Goroutines 同时访问和修改共享数据,需要使用互斥锁(sync.Mutex)或其他同步机制来避免数据竞争。
总结
Goroutines 是 Go 语言强大的并发特性,可以轻松地构建高性能的并发应用程序。理解 Goroutines 的生命周期以及如何使用 sync.WaitGroup 来管理 Goroutines 是编写并发程序的基础。在实际应用中,还需要注意连接复用、错误处理、批量插入和数据竞争等问题,以确保程序的正确性和性能。通过逐步构建和测试,可以更好地理解和掌握 Goroutines 的使用。










