
引言:处理非标准CSV文件的挑战
在日常数据分析工作中,我们经常会遇到格式不规范的csv文件。这些文件可能由系统自动生成,包含了大量与实际数据无关的页眉信息(如报告名称、生成日期、筛选条件等)和页脚信息(如免责声明、生成时间戳等)。当需要使用pandas等数据处理库加载这些文件时,这些冗余文本会干扰正常的csv解析过程,导致数据加载失败或生成错误的dataframe。
例如,一个典型的系统报告CSV文件可能如下所示:
SAMPLE FILE LTD STUDENT NUMBERS INFO OF ALL STUDENTS No : from 27-Mar-2023 00:00:00 to 04-Apr-2023 00:00:00 and from 05-Oct-2023 00:00:00 to 13-Oct-2023 00:00:00 Student,id,add,div,rank ABC,12,USA,A,1 DEF,13,IND,C,2 XYZ,14,UK,E,3 PQR,15,DE,F,4 This is System generated report, and needs no signature. 14-Oct-2023 18:14:12
直接使用pd.read_csv()读取此类文件通常会失败,或者将非数据行错误地解析为数据。本教程将介绍两种有效的策略来解决这一问题,帮助您清理并准确地读取这类CSV文件。
方法一:整体读取与后处理
这种方法的核心思想是将整个CSV文件作为纯文本内容读取,然后利用Pandas强大的字符串处理功能,从文本中提取出结构化的数据。它适用于那些数据头(列名)位置相对固定,但页眉和页脚内容不规则的场景。
工作原理
- 读取整个文件: 将CSV文件的全部内容读取为一个长字符串。
- 按行分割: 将长字符串按换行符分割成行的列表。
- 转换为DataFrame: 将行的列表转换为一个单列的Pandas DataFrame。
- 拆分列与清理: 对单列进行字符串操作,按CSV分隔符(通常是逗号)拆分成多列,并移除所有包含NaN值的行,以剔除非数据行。
- 设置列名: 识别出包含列名的那一行,并将其设置为DataFrame的列名,同时重置索引以确保数据从第一行开始。
示例代码
假设我们的CSV文件名为 students.csv:
import pandas as pd
file_name = 'students.csv'
# 1. 读取整个文件内容到DataFrame的单列中
with open(file_name, 'r') as file:
df = pd.DataFrame(file.read().split('\n'))
# 2. 将单列按逗号分隔,并扩展为多列,然后删除所有包含NaN的行
# 这一步会清理掉页眉和页脚中的非数据行
df = df[0].str.split(',', expand=True).dropna()
# 3. 将第一行(即原文件中的列头行)设置为新的列名
# 同时,将实际数据从第二行开始,并重置索引
df, df.columns = df.iloc[1:].reset_index(drop=True), df.iloc[0].values
print(df)输出结果
Student id add div rank 0 ABC 12 USA A 1 1 DEF 13 IND C 2 2 XYZ 14 UK E 3 3 PQR 15 DE F 4
优点与注意事项
- 优点: 灵活性高,对于页眉和页脚内容非常不规则的情况,这种方法能够通过dropna()有效地移除它们。
- 注意事项: 对于非常大的文件,将整个文件内容加载到内存中可能会消耗大量资源。此外,dropna()会移除任何包含NaN的行,如果您的数据中本身就可能存在NaN值,需要更精细的过滤策略。
方法二:预解析文件流定位数据起始点
这种方法更高效,尤其适用于大型文件,因为它避免了一次性加载所有文件内容到内存中进行字符串处理。其核心思想是在将文件对象传递给pd.read_csv()之前,手动读取文件流,直到找到数据头所在的行。
工作原理
- 打开文件: 以只读模式打开CSV文件。
- 逐行查找数据头: 循环读取文件的每一行,直到找到包含预期列名的行。例如,如果您的列名总是以"Student"开头,就查找以"Student"开头的行。
- 使用read_csv接管: 一旦找到数据头行,将其解析为列名,并从当前文件指针位置(即数据头行之后)开始,将文件对象传递给pd.read_csv()。read_csv会从文件流的当前位置继续读取。
- 清理页脚: pd.read_csv()读取完成后,使用dropna()移除末尾可能存在的页脚行。
示例代码
import pandas as pd
file_name = 'students.csv'
# 1. 打开文件并逐行读取,直到找到数据头
with open(file_name, 'r') as file:
line = file.readline()
while not line.startswith('Student'): # 假设数据头总是以'Student'开头
line = file.readline()
# 2. 找到数据头后,解析列名
column_names = line.strip().split(',')
# 3. 使用pd.read_csv从当前文件指针位置开始读取,并指定列名
# 文件对象会从上一次readline()结束的位置继续读取
df = pd.read_csv(file, names=column_names)
# 4. 删除所有包含NaN的行,以清理可能存在的页脚
df = df.dropna()
print(df)输出结果
Student id add div rank 0 ABC 12 USA A 1.0 1 DEF 13 IND C 2.0 2 XYZ 14 UK E 3.0 3 PQR 15 DE F 4.0
优点与注意事项
- 优点: 内存效率高,因为pd.read_csv可以直接从文件流的正确位置开始读取,无需预先加载整个文件。对于大型CSV文件,性能表现更佳。
- 注意事项: 要求能够准确识别数据头行的特征(如特定的起始字符串)。如果数据头行的模式不固定,可能需要更复杂的逻辑来定位。dropna()同样会移除数据中可能存在的NaN值。
注意事项与最佳实践
- 动态查找数据头: 如果数据头的起始字符串不总是固定,可以考虑使用正则表达式 (re模块) 或更灵活的字符串匹配逻辑来定位数据头行。例如,可以查找包含所有预期列名(如"Student", "id", "add")的行。
- 处理复杂页脚: 如果页脚内容非常复杂,或者dropna()可能会误删有效数据,您可能需要更精确地确定数据结束的位置。例如,可以查找文件末尾的特定标记行,然后仅读取该标记行之前的数据。
- 性能考量: 对于少量文件或小文件,两种方法性能差异不明显。但对于大量文件(如问题中提到的10K+文件)或超大型文件,方法二(预解析文件流)通常是更优的选择,因为它更好地利用了pd.read_csv的内部优化和流式读取能力。
- 错误处理: 在实际应用中,应考虑文件不存在、数据头未找到、文件内容为空等异常情况,并添加相应的错误处理机制(如try-except块)。
- 通用性封装: 建议将这些逻辑封装成一个可复用的函数,接受文件路径和数据头识别模式作为参数,提高代码的模块化和可维护性。
def read_messy_csv(file_path, header_start_string=None, column_names=None):
"""
读取包含冗余页眉和页脚的CSV文件。
参数:
file_path (str): CSV文件路径。
header_start_string (str, optional): 数据头行开始的字符串。如果提供,将使用方法二。
例如: 'Student'。
column_names (list, optional): 如果已知列名,可以提前提供。
如果使用方法一,则不需要。
如果使用方法二且header_start_string未提供,则此参数用于定位。
返回:
pd.DataFrame: 清理后的数据DataFrame。
"""
if header_start_string:
# 方法二:预解析文件流
with open(file_path, 'r') as file:
line = file.readline()
while not line.startswith(header_start_string):
line = file.readline()
if not line: # 防止文件末尾没有找到匹配项
raise ValueError(f"在文件 {file_path} 中未找到以 '{header_start_string}' 开头的数据头。")
# 如果未提供column_names,则从找到的行中解析
if column_names is None:
parsed_column_names = line.strip().split(',')
else:
parsed_column_names = column_names
df = pd.read_csv(file, names=parsed_column_names)
return df.dropna().reset_index(drop=True)
else:
# 方法一:整体读取与后处理 (适用于header_start_string不固定或未知的情况)
with open(file_path, 'r') as file:
df = pd.DataFrame(file.read().split('\n'))
df = df[0].str.split(',', expand=True).dropna()
# 尝试从数据中识别列名,或者使用提供的column_names
if column_names is None:
# 假设第一行非空且包含多个逗号分隔的元素是列名
if not df.empty and len(df.iloc[0].dropna()) > 1:
df, df.columns = df.iloc[1:].reset_index(drop=True), df.iloc[0].values
else:
raise ValueError("无法自动识别列名,请提供 'column_names' 或 'header_start_string'。")
else:
# 强制设置列名,并尝试匹配数据
# 这种情况下,可能需要更复杂的逻辑来对齐数据
# 简单处理:假设第一行是列名,然后进行替换
if not df.empty and len(df.iloc[0]) == len(column_names):
df, df.columns = df.iloc[1:].reset_index(drop=True), column_names
else:
# 否则,假设没有找到明确的列头行,直接使用提供的列名,并尝试清理
df.columns = column_names # 这可能导致列数不匹配或数据错位
df = df.dropna().reset_index(drop=True)
return df
# 示例使用
# # 创建一个示例文件
# with open('students.csv', 'w') as f:
# f.write("""SAMPLE FILE LTD
# STUDENT NUMBERS
# INFO OF ALL STUDENTS No : from 27-Mar-2023 00:00:00 to 04-Apr-2023 00:00:00 and from 05-Oct-2023 00:00:00 to 13-Oct-2023 00:00:00
# Student,id,add,div,rank
# ABC,12,USA,A,1
# DEF,13,IND,C,2
# XYZ,14,UK,E,3
# PQR,15,DE,F,4
# This is System generated report, and needs no signature.
# 14-Oct-2023 18:14:12""")
# # 使用方法二
# df_cleaned_method2 = read_messy_csv('students.csv', header_start_string='Student')
# print("\n--- 方法二清理结果 ---")
# print(df_cleaned_method2)
# # 使用方法一(假设不知道确切的header_start_string,但知道列名)
# # df_cleaned_method1 = read_messy_csv('students.csv', column_names=['Student', 'id', 'add', 'div', 'rank'])
# # print("\n--- 方法一清理结果 ---")
# # print(df_cleaned_method1)总结
处理包含冗余文本的非标准CSV文件是数据预处理中的常见挑战。本教程介绍了两种基于Pandas的有效策略:整体读取后处理和预解析文件流。
- 整体读取与后处理 (df[0].str.split(',', expand=True).dropna()) 提供了高度的灵活性,适用于页眉和页脚结构多变但数据头位置相对固定的情况。
- 预解析文件流 (while not line.startswith('...') 后 pd.read_csv(file, names=...)) 在处理大型文件时表现出更高的效率,因为它允许pd.read_csv从文件流的正确位置开始读取,减少了不必要的内存消耗和字符串操作。
在实际应用中,应根据CSV文件的具体特性(文件大小、页眉/页脚的规律性、数据头行的可识别性)选择最适合的方法,并结合错误处理和通用性封装,以构建健壮、高效的数据加载流程。










