
引言:大型CSV文件合并的挑战与流式解决方案
在数据处理领域,合并大型文件是常见的需求。例如,您可能需要每周将一个新增的CSV文件与一个已有的、高达50GB的CSV归档文件进行合并。传统的做法是尝试将文件全部加载到内存中进行处理,但这对于数十GB甚至更大的文件来说,会导致严重的内存溢出问题,甚至使程序崩溃。
为了解决这一挑战,我们可以借鉴归并排序算法中的“合并”步骤,采用一种流式处理的方法。这种方法的核心思想是逐行读取两个已排序的输入文件,比较当前行并按序写入到输出文件,而不是一次性加载所有数据。Go语言凭借其强大的并发原语和高效的I/O操作,非常适合实现这种流式合并方案。
核心原理:基于归并排序的流式处理
流式合并的效率源于其对内存的极低占用。它不需要将整个文件读入内存,而是每次只在内存中保留两个输入文件各一行的数据,以及一个用于写入的缓冲区。其工作流程如下:
- 逐行读取: 同时从两个已排序的输入文件中各读取一行数据。
- 比较与写入: 根据预定义的比较规则(例如,CSV行的第一个字段作为键),判断哪一行应该先写入输出文件。
- 循环迭代: 将较小(或相等)的那一行写入输出文件后,从对应的输入文件中读取下一行,然后重复比较和写入过程。
- 文件耗尽处理: 当其中一个输入文件的数据全部写入输出文件后,将另一个输入文件中剩余的所有行直接复制到输出文件。
这种方法确保了输出文件也是排序的,并且整个过程是高度内存效率的。
立即学习“go语言免费学习笔记(深入)”;
Go语言实现详解
以下是使用Go语言实现流式合并的详细代码及解释。
1. 程序入口与参数校验
main函数是程序的入口点,负责处理命令行参数、文件操作和协调合并流程。
package main
import (
"encoding/csv"
"io"
"log"
"os"
"fmt" // 用于示例中的日志输出
)
const outFile = "merged_output.csv" // 定义输出文件名
func main() {
// 确保程序接收到两个输入文件路径作为命令行参数
if len(os.Args) != 3 {
log.Fatalf("\nUsage: %s \nExample: %s archive.csv weekly_update.csv", os.Args[0], os.Args[0])
}
// 打开第一个输入文件
f1, err := os.Open(os.Args[1])
if err != nil {
log.Fatalf("\nError opening first file %s: %v", os.Args[1], err)
}
defer f1.Close() // 确保文件在函数结束时关闭
// 打开第二个输入文件
f2, err := os.Open(os.Args[2])
if err != nil {
log.Fatalf("\nError opening second file %s: %v", os.Args[2], err)
}
defer f2.Close() // 确保文件在函数结束时关闭
// 创建输出文件
w, err := os.Create(outFile)
if err != nil {
log.Fatalf("\nError creating output file %s: %v", outFile, err)
}
defer w.Close() // 确保文件在函数结束时关闭
// 包装文件读取器为CSV读取器
cr1 := csv.NewReader(f1)
cr2 := csv.NewReader(f2)
// 包装输出文件写入器为CSV写入器
cw := csv.NewWriter(w)
defer cw.Flush() // 确保所有缓冲数据在程序退出前写入文件 说明:
- os.Args 用于获取命令行参数,os.Args[0] 是程序名,os.Args[1] 和 os.Args[2] 是输入文件路径。
- os.Open 和 os.Create 分别用于打开现有文件和创建新文件。
- defer f.Close() 是一种Go语言的惯用模式,确保文件句柄在函数返回前被正确关闭,即使发生错误。
- csv.NewReader 和 csv.NewWriter 是 encoding/csv 包提供的函数,用于方便地读写CSV格式数据。
- cw.Flush() 对于 csv.Writer 至关重要,它会强制将缓冲区中的所有数据写入到底层 io.Writer。
2. 初始化读取与核心合并逻辑
在进入主循环之前,我们需要从两个文件中各读取第一行数据。然后,在一个无限循环中,根据 compare 函数的结果,决定写入哪一行,并从对应的文件读取下一行。
// 初始化读取两行数据
line1, b1 := readline(cr1)
if !b1 {
// 如果第一个文件为空或无CSV行,直接复制第二个文件剩余内容
log.Printf("File 1 (%s) is empty or has no CSV lines. Copying remaining lines from File 2.", os.Args[1])
copyRemaining(cr2, cw)
return // 结束程序
}
line2, b2 := readline(cr2)
if !b2 {
// 如果第二个文件为空或无CSV行,直接复制第一个文件剩余内容
log.Printf("File 2 (%s) is empty or has no CSV lines. Copying remaining lines from File 1.", os.Args[2])
writeline(cw, line1) // 写入之前读取的line1
copyRemaining(cr1, cw)
return // 结束程序
}
// 核心合并逻辑
for {
// 比较两行数据,决定哪一行应该先写入
if compare(line1, line2) {
writeline(cw, line1)
line1, b1 = readline(cr1) // 读取下一个line1
if !b1 {
// 如果文件1已读完,将文件2的剩余内容全部复制
writeline(cw, line2) // 写入最后读取的line2
copyRemaining(cr2, cw)
break // 退出循环
}
} else {
writeline(cw, line2)
line2, b2 = readline(cr2) // 读取下一个line2
if !b2 {
// 如果文件2已读完,将文件1的剩余内容全部复制
writeline(cw, line1) // 写入最后读取的line1
copyRemaining(cr1, cw)
break // 退出循环
}
}
}
log.Printf("CSV files merged successfully to %s", outFile)
}说明:
- readline 函数用于从 csv.Reader 中读取一行数据。它返回一个 []string 代表一行字段,以及一个 bool 指示是否成功读取。
- 在主循环开始前,我们先尝试读取两个文件的第一行。如果任何一个文件为空,则直接将另一个文件的所有内容复制到输出文件。
- compare(line1, line2) 是决定合并顺序的关键,它根据您的业务逻辑比较两行数据。










