
本文详解如何优化 go 中大规模 csv 文件的按列分流处理,避免内存爆炸与频繁文件重写,通过流式读取 + 并发写入 + csv.writer 复用显著提升性能。
Go 在处理大 CSV 文件时若采用 csv.NewReader.ReadAll() 全量加载到内存,再逐行拼接字符串并反复调用 os.Create() 和 WriteString() 写入文件,会导致严重性能瓶颈——这正是原代码运行缓慢的根本原因。其问题集中在三点:
- 内存冗余:ReadAll() 将整个 CSV(可能数百 MB)一次性载入内存,且 map[string]string 存储的是完整 CSV 内容字符串,造成指数级内存占用;
- I/O 浪费:每遇到一行就 createFile() 重写整个目标文件,导致同一文件被打开、覆盖、关闭数千次;
- 序列化低效:手动拼接带引号的 CSV 字符串(如 "\""+each[0]+"\",\""+...)既易出错,又绕过了 encoding/csv 的安全转义与性能优化。
✅ 正确解法是流式处理 + 分离关注点:
- 仅用 reader.Read() 单行迭代,不缓存原始数据;
- 为每个目标国家(如 "AT")维护一个专属 chan []string,用于异步传递待写入的记录;
- 启动独立 goroutine 负责该国家文件的创建、csv.Writer 初始化及批量写入,实现 I/O 并发;
- 头部(header)仅写入一次,在首个对应记录到达时发送给该 channel。
以下是优化后的完整可运行代码:
package main
import (
"encoding/csv"
"fmt"
"os"
"sync"
)
func main() {
input, err := os.Open("union_exp.csv")
if err != nil {
fmt.Printf("Error opening input file: %v\n", err)
return
}
defer input.Close()
reader := csv.NewReader(input)
reader.FieldsPerRecord = -1 // 允许变长字段(兼容不同行)
// 读取 header 行
headers, err := reader.Read()
if err != nil {
fmt.Printf("Error reading header: %v\n", err)
return
}
// 管理各输出文件的 channel 和 goroutine
files := make(map[string]chan []string)
var wg sync.WaitGroup
// 逐行处理数据行
for {
record, err := reader.Read()
if err == csv.ErrFieldCount {
fmt.Printf("Warning: skipping malformed line (field count mismatch)\n")
continue
}
if err == io.EOF {
break
}
if err != nil {
fmt.Printf("Error reading record: %v\n", err)
return
}
if len(record) == 0 {
continue // 跳过空行
}
country := record[0]
ch, exists := files[country]
if !exists {
ch = make(chan []string, 1024) // 缓冲 channel 减少 goroutine 阻塞
files[country] = ch
wg.Add(1)
go fileWriter(country+".csv", ch, &wg, headers)
}
ch <- record // 发送数据行(header 已在 goroutine 中写入)
}
// 关闭所有 channel,通知 writer 结束
for _, ch := range files {
close(ch)
}
wg.Wait()
fmt.Println("All files written successfully.")
}
func fileWriter(filename string, ch chan []string, wg *sync.WaitGroup, headers []string) {
defer wg.Done()
f, err := os.Create(filename)
if err != nil {
fmt.Printf("Error creating %s: %v\n", filename, err)
return
}
defer f.Close()
writer := csv.NewWriter(f)
defer writer.Flush() // 必须调用 Flush 才能写出缓冲内容
// 写入 header
if err := writer.Write(headers); err != nil {
fmt.Printf("Error writing header to %s: %v\n", filename, err)
return
}
// 写入所有数据行
for record := range ch {
if err := writer.Write(record); err != nil {
fmt.Printf("Error writing record to %s: %v\n", filename, err)
return
}
}
}⚠️ 关键注意事项:
- 不要省略 writer.Flush():csv.Writer 内部有缓冲,不显式调用将导致部分或全部数据丢失;
- channel 缓冲大小建议设为 1024 或更高:避免 writer goroutine 因消费慢而阻塞主流程;
- 错误处理需分层:I/O 错误应记录但不粗暴 os.Exit(1)(原答案中该行为会中断整个程序),应让其他文件继续生成;
- 内存友好性:峰值内存仅取决于 channel 缓冲区总和 + 单行数据,与文件总行数无关;
- 扩展性提示:若国家数量极大(如上万),可限制并发 goroutine 数量(使用 worker pool 模式),避免系统资源耗尽。
通过以上重构,处理 GB 级 CSV 文件的耗时可从分钟级降至秒级,真正发挥 Go 在高吞吐 I/O 场景下的优势。










