
Goroutines 是 Go 语言并发编程的核心。理解 Goroutines 的工作方式以及它们在主进程结束后的行为至关重要。本文将深入探讨 Goroutines 的生命周期,以及如何使用 sync.WaitGroup 来确保 Goroutines 完成任务。
Goroutines 的基本概念
Goroutines 本质上是轻量级的线程,由 Go 运行时(runtime)管理。与传统的线程相比,Goroutines 的创建和销毁开销更小,切换速度更快,因此可以在程序中创建大量的 Goroutines 来实现并发执行。
当程序启动时,会创建一个主 Goroutine 来执行 main 函数。可以通过 go 关键字来启动新的 Goroutines,每个 Goroutine 都会并发地执行指定的函数。
package main
import (
"fmt"
"time"
)
func worker(id int) {
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
for i := 1; i <= 3; i++ {
go worker(i)
}
// 暂停几秒钟,让 worker 线程有机会运行
time.Sleep(time.Second * 2)
}在这个例子中,main 函数启动了三个 Goroutines,每个 Goroutine 执行 worker 函数。worker 函数会打印一条消息,暂停一秒钟,然后再次打印一条消息。main 函数暂停了两秒钟,以便让 Goroutines 有机会完成执行。
Goroutines 的生命周期
Goroutines 的生命周期始于 go 关键字的调用,终于函数的执行完成。需要注意的是,当 main 函数返回时,程序会立即退出,不会等待其他 Goroutines 完成执行。这意味着,如果 main 函数在 Goroutines 完成之前退出,那么这些 Goroutines 就会被强制终止,可能导致数据丢失或程序状态不一致。
使用 sync.WaitGroup 管理 Goroutines
为了确保 Goroutines 完成任务后再退出程序,可以使用 sync.WaitGroup。sync.WaitGroup 提供了一种简单的机制来等待一组 Goroutines 完成执行。
sync.WaitGroup 的使用步骤如下:
- 创建 sync.WaitGroup 实例:在 main 函数中创建一个 sync.WaitGroup 实例。
- 增加计数器:在启动每个 Goroutine 之前,调用 wg.Add(1) 来增加计数器。
- 标记完成:在每个 Goroutine 的结束处,调用 wg.Done() 来标记完成,减少计数器。
- 等待完成:在 main 函数中,调用 wg.Wait() 来等待计数器归零,即所有 Goroutines 都已完成。
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // 确保在函数退出时调用 Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 3; i++ {
wg.Add(1) // 启动一个 worker 线程,增加计数器
go worker(i, &wg)
}
wg.Wait() // 阻塞直到计数器归零
fmt.Println("All workers done")
}在这个例子中,main 函数在启动每个 worker Goroutine 之前调用 wg.Add(1),在 worker 函数的结束处调用 wg.Done()。main 函数调用 wg.Wait() 来等待所有 worker Goroutines 完成执行。
示例:并发插入 MongoDB
以下代码演示了如何使用 Goroutines 和 sync.WaitGroup 并发地向 MongoDB 插入数据。
package main
import (
"fmt"
"labix.org/v2/mgo"
"strconv"
"sync"
"time"
)
// Reading 结构体
type Reading struct {
Id string
Name string
}
func main() {
// 设置计时器
startTime := time.Now()
// 设置集合
collection := getCollection("test", "readings")
fmt.Println("collection complete: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
// 准备 readings
readings := prepareReadings()
fmt.Println("readings prepared: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
var waitGroup sync.WaitGroup
// 插入 readings
for i := 1; i <= 1000000; i++ {
waitGroup.Add(1)
go insertReadings(collection, readings, &waitGroup)
if i%100000 == 0 {
fmt.Println("100000 readings queued for insert: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
}
waitGroup.Wait()
fmt.Println("all readings inserted: " + strconv.FormatFloat(time.Since(startTime).Seconds(), 'f', 2, 64))
}
func getCollection(databaseName string, tableName string) *mgo.Collection {
session, err := mgo.Dial("localhost")
if err != nil {
fmt.Println("error getCollection:", err)
panic(err)
}
collection := session.DB(databaseName).C(tableName)
return collection
}
func insertReadings(collection *mgo.Collection, readings []Reading, waitGroup *sync.WaitGroup) {
defer waitGroup.Done()
err := collection.Insert(readings)
if err != nil {
fmt.Println("error insertReadings:", err)
}
}
func prepareReadings() []Reading {
var readings []Reading
for i := 1; i <= 1; i++ {
readings = append(readings, Reading{Name: "Thing"})
}
return readings
}在这个例子中,insertReadings 函数接收一个 sync.WaitGroup 指针,并在函数结束时调用 waitGroup.Done()。main 函数在启动每个 insertReadings Goroutine 之前调用 waitGroup.Add(1),并使用 waitGroup.Wait() 等待所有 Goroutines 完成。
注意事项:
- 确保在每个 Goroutine 中都调用 defer wg.Done(),以避免 Goroutine 提前退出导致程序死锁。
- 在并发访问共享资源时,需要使用互斥锁(sync.Mutex)或其他同步机制来保护数据的一致性。
总结
Goroutines 是 Go 语言并发编程的强大工具。通过理解 Goroutines 的生命周期以及如何使用 sync.WaitGroup,可以编写出高效、可靠的并发程序。在实际开发中,需要根据具体情况选择合适的并发模型,并注意数据同步和资源竞争的问题。










