直接开大量goroutine解析CSV会崩溃,因无并发控制导致文件偏移量冲突、内存溢出及数据库连接超限;应通过chan分发任务、独立csv.Reader、带行号的错误定位和资源限制来解决。

为什么直接开一堆 goroutine 解析 CSV 会崩
因为没加并发控制,runtime.GOMAXPROCS 默认是 CPU 核数,但文件解析本身不耗 CPU,主要卡在 I/O 和内存分配上。开几百个 goroutine 读同一个 *os.File,会出现 seek: invalid argument 或数据错乱——多个 goroutine 共享文件偏移量,互相覆盖读取位置。
- 别用
bufio.Scanner在多个 goroutine 里共用一个bufio.Reader - 别把整个大文件一次性
ReadAll进内存再切分,OOM 风险高 - 每行解析后若要写入数据库,必须控制写入并发度,否则 MySQL 报
Too many connections
用 sync.WaitGroup + chan 拆分文件并行处理
核心思路:主线程按行或按块(如每 1000 行)切分文件,把行内容发给 worker channel;worker goroutine 从 channel 拿数据、解析、组装结构体、提交到下游(DB / slice / channel)。
file, _ := os.Open("data.csv")
defer file.Close()
reader := bufio.NewReader(file)
// 启动 4 个 worker
jobs := make(chan []string, 100)
results := make(chan error, 100)
var wg sync.WaitGroup
for w := 0; w < 4; w++ {
wg.Add(1)
go func() {
defer wg.Done()
for line := range jobs {
// 解析 CSV 行:跳过空行、处理引号、转义等
if len(line) == 0 { continue }
record := parseCSVLine(line) // 自定义函数
if err := insertToDB(record); err != nil {
results <- err
return
}
}
}()
}
// 主线程逐行读,分发 job
for {
line, err := reader.ReadString('\n')
if err == io.EOF { break }
if err != nil { panic(err) }
fields, _ := csv.NewReader(strings.NewReader(line)).Read()
select {
case jobs <- fields:
default:
// channel 满了就阻塞,避免内存暴涨
time.Sleep(10 * time.Millisecond)
jobs <- fields
}
}
close(jobs)
wg.Wait()
close(results)
csv.Reader 本身不是并发安全的,但可以每个 goroutine 持有独立实例
如果你确定文件可随机访问(比如已按字节范围切好),更稳妥的做法是:预读文件,按 \n 找出行边界,把 []byte 切片传给每个 goroutine,各自 new 一个 csv.Reader 解析——这样完全隔离,无共享状态。
- 用
bytes.Split(data, []byte{'\n'})切分比用bufio.Scanner更可控 - 注意最后一行可能没换行符,需单独处理
-
csv.Reader的FieldsPerRecord和TrailingComma要按实际数据设,否则解析失败静默丢数据
导入失败时如何定位哪一行出错
不要只返回 error,要把原始行号、原始字符串、解析上下文一起打包。否则日志里看到 parse int: invalid syntax 完全不知道是第几行、什么内容。
立即学习“go语言免费学习笔记(深入)”;
type ParseError struct {
LineNum int
RawLine string
Err error
}
func parseCSVLine(fields []string) (Record, error) {
if len(fields) < 3 {
return Record{}, &ParseError{LineNum: currentLine, RawLine: strings.Join(fields, ","), Err: fmt.Errorf("too few fields")}
}
id, err := strconv.Atoi(fields[0])
if err != nil {
return Record{}, &ParseError{LineNum: currentLine, RawLine: strings.Join(fields, ","), Err: err}
}
return Record{ID: id, Name: fields[1], Email: fields[2]}, nil
}
真正难的不是起 goroutine,而是让每条错误能反查到原始输入位置,以及控制住内存和连接数。文件越大数据越容易在第 3 万行突然卡住——那往往不是代码逻辑问题,是没设 context.WithTimeout 或 DB 连接池 SetMaxOpenConns。










