应使用信号量限流而非无限制启goroutine,避免OOM;批量写入需拼接多行SQL而非单条执行;优先切片分片而非channel传数据;错误需计数、重试、记录,不可忽略或fatal。

并发控制用 semaphore 而不是无限制启 goroutine
直接在循环里写 go process(item) 看似简单,但数据量一大就会触发 runtime: out of memory 或系统级资源耗尽。Go 的 goroutine 虽轻量,但每个仍占 2KB+ 栈空间,上万并发瞬间吃光内存。
正确做法是用信号量(如 golang.org/x/sync/semaphore)限流:
sem := semaphore.NewWeighted(int64(maxConcurrency))
var wg sync.WaitGroup
for _, item := range items {
wg.Add(1)
go func(i Item) {
defer wg.Done()
if err := sem.Acquire(context.Background(), 1); err != nil {
log.Printf("acquire failed: %v", err)
return
}
defer sem.Release(1)
process(i)
}(item)
}
wg.Wait()
-
maxConcurrency建议设为 CPU 核心数 × 2~5,具体看process是 CPU 密集还是 IO 密集 - 别漏掉
defer sem.Release(1),否则后续 goroutine 永远拿不到许可 -
sem.Acquire可能阻塞或超时,生产环境建议传带超时的context
批量写入数据库要避免逐条 INSERT
用 go 并发处理数据后直接调 db.Exec("INSERT ...", item.ID, item.Name),本质是把单条 SQL 放到多个连接里跑——不仅没提升吞吐,还放大了连接池压力和事务开销。
真正高效的批量写法是:先分组,再拼 INSERT INTO ... VALUES (...), (...), (...),一次提交多行:
立即学习“go语言免费学习笔记(深入)”;
const batchSize = 100
for i := 0; i < len(items); i += batchSize {
batch := items[i:min(i+batchSize, len(items))]
values := make([]interface{}, 0, len(batch)*2)
placeholders := make([]string, 0, len(batch))
for _, b := range batch {
placeholders = append(placeholders, "(?, ?)")
values = append(values, b.ID, b.Name)
}
query := "INSERT INTO users (id, name) VALUES " + strings.Join(placeholders, ", ")
_, err := db.Exec(query, values...)
if err != nil {
log.Printf("batch insert failed: %v", err)
}
}
- MySQL / SQLite 支持单语句多值插入;PostgreSQL 需用
UNNEST或pgx.Batch - 注意
values参数顺序必须和placeholders严格对应 - 过大的
batchSize(如 > 1000)可能触发 MySQL 的max_allowed_packet限制
channel 传数据不如切片传得快,别为了“看起来并发”硬套
常见反模式:itemsCh := make(chan Item, 1000),然后一个 goroutine 往里塞,多个 goroutine 从 channel 读。这引入了额外的同步开销和内存拷贝,实测比直接切片分段慢 15%~30%。
除非需要动态负载均衡(比如任务耗时差异极大),否则优先用预分片 + 启固定 goroutine:
func splitSlice[T any](s []T, n int) [][]T {
var chunks [][]T
for i := 0; i < len(s); i += n {
end := i + n
if end > len(s) {
end = len(s)
}
chunks = append(chunks, s[i:end])
}
return chunks
}
chunks := splitSlice(items, len(items)/runtime.NumCPU())
var wg sync.WaitGroup
for _, chunk := range chunks {
wg.Add(1)
go func(c []Item) {
defer wg.Done()
for _, item := range c {
process(item)
}
}(chunk)
}
wg.Wait()
- channel 适合做事件通知、结果聚合、或跨 goroutine 协作,不是通用数据搬运工
- 切片分片后传参,零拷贝(只传指针和长度),cache 局部性更好
- 如果下游要合并结果,用
sync.Map或预先分配好结果切片更可控
错误处理不能只靠 log.Fatal 或忽略 err
并发场景下,一个 goroutine panic 会终止整个程序;而静默忽略 err 会导致数据丢失却毫无感知。
推荐组合策略:失败计数 + 可恢复错误重试 + 不可恢复错误记录后跳过:
var failed atomic.Int64
var mu sync.RWMutex
var errors []error
for _, item := range items {
go func(i Item) {
defer func() {
if r := recover(); r != nil {
mu.Lock()
errors = append(errors, fmt.Errorf("panic on %v: %v", i.ID, r))
mu.Unlock()
failed.Add(1)
}
}()
if err := processWithRetry(i, 3); err != nil {
mu.Lock()
errors = append(errors, fmt.Errorf("fail on %v: %w", i.ID, err))
mu.Unlock()
failed.Add(1)
return
}
}(item)
}
// 等待结束后检查 failed.Load() > 0 再决定是否告警
- 别在 goroutine 里用
log.Fatal,它会杀掉 main goroutine - 网络类错误(如 DB timeout)适合指数退避重试;校验失败类错误(如字段为空)应直接跳过
- 所有错误必须进日志或监控,至少包含
item关键标识(如 ID、批次号)










