
在实际数据处理中,我们经常会遇到 csv 文件中包含除了表格数据之外的额外文本内容,例如文件标题、报告生成信息、脚注等。这些冗余信息会干扰 pandas 的 read_csv 函数,导致数据加载失败或数据格式错误。本教程将探讨几种有效策略,帮助您在加载这些“不规范”的 csv 文件时,准确地提取出所需的表格数据。
识别问题 CSV 文件的特征
假设我们有一个名为 students.csv 的文件,其内容示例如下:
SAMPLE FILE LTD STUDENT NUMBERS INFO OF ALL STUDENTS No : from 27-Mar-2023 00:00:00 to 04-Apr-2023 00:00:00 and from 05-Oct-2023 00:00:00 to 13-Oct-2023 00:00:00 Student,id,add,div,rank ABC,12,USA,A,1 DEF,13,IND,C,2 XYZ,14,UK,E,3 PQR,15,DE,F,4 This is System generated report, and needs no signature. 14-Oct-2023 18:14:12
从上述示例中可以看出,文件顶部有多行描述性文本,底部也有报告生成信息。真正的列标题 Student,id,add,div,rank 位于文件中间,且数据行紧随其后。直接使用 pd.read_csv('students.csv') 会因为这些非表格数据而报错。
策略一:预先确定跳过行数并加载
这种方法适用于列标题行内容固定或可预测的情况。我们可以通过编程方式读取文件,找到列标题所在的行,然后计算出需要跳过的行数,再将这个行数传递给 pd.read_csv 的 skiprows 参数。
实现步骤:
- 打开文件并逐行读取。
- 在每一行中查找预期的列标题(例如,包含“rank”字符串的行)。
- 一旦找到,记录当前行号(即需要跳过的行数)。
- 使用 pd.read_csv 加载数据,并通过 skiprows 参数跳过冗余行。
- 处理可能存在的尾部冗余行,通常通过检查关键列的 NaN 值来删除。
示例代码:
import pandas as pd
def get_rows_to_skip(file_name, filter_text):
"""
计算需要跳过的行数,直到找到包含指定文本的行。
Args:
file_name (str): CSV 文件路径。
filter_text (str): 用于识别列标题行的关键词。
Returns:
int: 需要跳过的行数。
"""
rows = 0
with open(file_name, 'r', encoding='utf-8') as file:
while True:
line = file.readline()
if not line: # 文件结束
break
if filter_text in line:
return rows
rows += 1
return 0 # 如果未找到,默认不跳过
def read_cleaned_csv(file_name, header_filter_text):
"""
读取并清理包含冗余文本的 CSV 文件。
Args:
file_name (str): CSV 文件路径。
header_filter_text (str): 用于识别列标题行的关键词。
Returns:
pd.DataFrame: 清理后的数据框。
"""
skip_rows_count = get_rows_to_skip(file_name, header_filter_text)
# 使用 skiprows 加载数据
df = pd.read_csv(file_name, skiprows=skip_rows_count)
# 清理尾部可能存在的 NaN 行(如果文件底部也有冗余文本)
# 假设 'rank' 是数据中一定会存在的列
df = df[df[header_filter_text].notna()]
return df
# 创建一个模拟的 students.csv 文件
sample_content = """SAMPLE FILE LTD
STUDENT NUMBERS
INFO OF ALL STUDENTS No : from 27-Mar-2023 00:00:00 to 04-Apr-2023 00:00:00 and from 05-Oct-2023 00:00:00 to 13-Oct-2023 00:00:00
Student,id,add,div,rank
ABC,12,USA,A,1
DEF,13,IND,C,2
XYZ,14,UK,E,3
PQR,15,DE,F,4
This is System generated report, and needs no signature.
14-Oct-2023 18:14:12
"""
with open('students.csv', 'w', encoding='utf-8') as f:
f.write(sample_content)
# 调用函数读取文件
df_cleaned = read_cleaned_csv('students.csv', "rank")
print(df_cleaned)输出:
Student id add div rank 0 ABC 12 USA A 1.0 1 DEF 13 IND C 2.0 3 XYZ 14 UK E 3.0 4 PQR 15 DE F 4.0
注意: 这里的输出中,XYZ 和 PQR 的索引是 3 和 4,因为在原始文件中,DEF 后面跟着一个空行,导致 pd.read_csv 在默认情况下将空行也作为数据行加载,然后因为 rank 列为 NaN 而被 df[df['rank'].notna()] 过滤掉。如果需要连续索引,可以添加 df.reset_index(drop=True, inplace=True)。
策略二:将整个文件作为文本读取,然后分割、清理并转换
这种方法不依赖于 skiprows 参数,而是将整个文件内容作为字符串读取,然后手动进行分割和清理,最后再构建 DataFrame。
实现步骤:
- 打开文件,将所有内容读取为一个字符串。
- 将字符串按换行符分割成行列表。
- 将行列表转换为一个单列的 DataFrame。
- 使用 str.split(',') 将单列拆分为多列,并 expand=True。
- 删除所有值为 NaN 的行,以去除头部和尾部的冗余文本以及空行。
- 识别并提取列标题行,将其设置为 DataFrame 的列名。
- 删除原始标题行,并重置索引。
示例代码:
import pandas as pd
with open('students.csv', 'r', encoding='utf-8') as file:
# 将整个文件读取为一个字符串,然后按换行符分割成行
df_raw = pd.DataFrame(file.read().split('\n'))
# 将单列拆分为多列,以逗号为分隔符
df_split = df_raw[0].str.split(',', expand=True)
# 删除所有包含 NaN 值的行,这会有效地去除头部和尾部的非数据行以及空行
df_cleaned = df_split.dropna()
# 假设第一行是列标题
# 提取列标题
new_columns = df_cleaned.iloc[0].values
# 将第一行数据设置为列名
df_cleaned.columns = new_columns
# 删除作为列名使用的第一行数据
df_final = df_cleaned.iloc[1:].reset_index(drop=True)
print(df_final)输出:
Student id add div rank 0 ABC 12 USA A 1 1 DEF 13 IND C 2 2 XYZ 14 UK E 3 3 PQR 15 DE F 4
这种方法在处理列标题位置不固定,但数据行结构相对规整的场景下非常有效。
策略三:迭代读取文件直到找到标题,然后使用 read_csv
此方法结合了文件流式读取和 Pandas 的强大功能,尤其适用于处理非常大的文件,因为它避免了一次性将整个文件加载到内存中。
实现步骤:
- 打开文件。
- 逐行读取,直到找到包含预期列标题的行。
- 将找到的标题行解析为列名。
- 将文件句柄的剩余部分直接传递给 pd.read_csv,并指定列名。
- 对加载的数据进行进一步的清理(例如,删除尾部可能存在的 NaN 行)。
示例代码:
import pandas as pd
with open('students.csv', 'r', encoding='utf-8') as file:
line = file.readline()
# 循环读取行,直到找到以“Student”开头的行(我们的列标题行)
while not line.startswith('Student'):
line = file.readline()
if not line: # 防止文件末尾未找到标题而陷入死循环
raise ValueError("Header line not found in the file.")
# 解析找到的标题行作为列名
column_names = line.strip().split(',')
# 将文件句柄的剩余部分传递给 pd.read_csv
# names 参数用于指定列名,因为我们已经手动解析了标题行
df = pd.read_csv(file, names=column_names)
# 清理尾部可能存在的 NaN 行(如果文件底部也有冗余文本)
# 假设 'rank' 是数据中一定会存在的列
df_cleaned = df.dropna(subset=['rank']) # 只检查关键列
print(df_cleaned)输出:
Student id add div rank 0 ABC 12 USA A 1.0 1 DEF 13 IND C 2.0 2 XYZ 14 UK E 3.0 3 PQR 15 DE F 4.0
这种方法特别高效,因为它只在找到标题行之后才开始由 Pandas 进行解析,并且可以处理文件末尾的额外文本,因为 dropna() 会将其清除。
注意事项与最佳实践
-
选择合适的识别策略: 根据您的 CSV 文件特点(头部/尾部冗余模式、文件大小、列标题的稳定性)选择最适合的清理策略。
- 如果头部冗余行数固定或标题行模式稳定,skiprows 和迭代读取是高效的选择。
- 如果文件大小适中且结构复杂,read().split('\n') 后手动处理可能更灵活。
- 编码问题: 在打开文件时,务必指定正确的编码(如 encoding='utf-8'),以避免乱码问题。
- 错误处理: 在实际应用中,应增加健壮的错误处理机制,例如当预期的标题行未找到时抛出异常或记录日志。
- 通用性: 尽量使您的清理逻辑具有通用性,能够适应文件内容的一些微小变化。例如,使用 startswith() 或 in 关键字来查找标题行,而不是精确匹配整个行。
- 性能考量: 对于非常大的文件(GB 级别),迭代读取文件或使用 chunksize 参数分块读取会是更优的选择,以避免内存溢出。
- 尾部清理: 文件尾部的冗余文本通常可以通过 dropna() 或根据数据特性进行行切片来处理。选择一个关键列(如 rank)来判断一行是否为有效数据,是一个常见的做法。
总结
处理含有冗余文本的 CSV 文件是数据预处理中的常见挑战。通过结合 Python 的文件 I/O 操作和 Pandas 强大的数据处理能力,我们可以灵活高效地解决这类问题。无论是通过预计算跳过行数、将文件作为文本整体处理,还是通过迭代读取文件流,核心思想都是在将数据传递给 Pandas 之前,准确地识别并隔离出真正的表格数据区域。掌握这些技巧将显著提高您数据清洗工作的效率和准确性。










