
在实际数据处理中,我们经常会遇到csv文件并非纯粹的逗号分隔值数据,而是在文件开头或结尾包含一些非结构化的元数据、报告标题或脚注。对于少量文件,手动清理尚可接受,但当需要处理成千上万个此类文件时,自动化清理和读取就变得至关重要。本文将介绍几种利用python和pandas库来应对这一挑战的有效方法。
1. 准备示例数据
为了演示这些方法,我们首先创建一个模拟包含非结构化文本的CSV文件 students.csv:
# 创建一个示例文件
sample_data = """SAMPLE FILE LTD
STUDENT NUMBERS
INFO OF ALL STUDENTS No : from 27-Mar-2023 00:00:00 to 04-Apr-2023 00:00:00 and from 05-Oct-2023 00:00:00 to 13-Oct-2023 00:00:00
Student,id,add,div,rank
ABC,12,USA,A,1
DEF,13,IND,C,2
XYZ,14,UK,E,3
PQR,15,DE,F,4
This is System generated report, and needs no signature.
14-Oct-2023 18:14:12
"""
with open('students.csv', 'w') as f:
f.write(sample_data)
print("示例文件 students.csv 已创建。")我们的目标是从这个文件中准确地读取以下数据:
Student,id,add,div,rank ABC,12,USA,A,1 DEF,13,IND,C,2 XYZ,14,UK,E,3 PQR,15,DE,F,4
2. 方法一:基于关键词定位并跳过指定行
这种方法的核心思想是,首先找到包含实际数据头部的行(例如,通过匹配列名),然后计算出在该行之前有多少行需要跳过,最后使用 pd.read_csv 的 skiprows 参数进行读取。
2.1 实现步骤
- 定位头部行: 编写一个函数,逐行读取文件,直到找到包含所有列名(或其中一个关键列名)的行。
- 计算跳过行数: 记录从文件开头到头部行之间的行数。
- 使用 pd.read_csv 读取: 将计算出的行数传递给 skiprows 参数。
- 后处理: 由于文件末尾可能仍有冗余文本或空行,使用 dropna() 清理。
2.2 示例代码
import pandas as pd
def get_rows_to_skip(file_name, header_keyword):
"""
计算从文件开头到包含指定关键词的行之间的行数。
"""
rows = 0
with open(file_name, 'r') as file:
while True:
line = file.readline()
if not line: # 文件结束
return -1 # 表示未找到关键词
if header_keyword in line:
return rows
rows += 1
def read_cleaned_csv_by_skiprows(file_name, header_keyword, expected_columns):
"""
通过跳过指定行数来读取清理后的CSV文件。
"""
skip_rows = get_rows_to_skip(file_name, header_keyword)
if skip_rows == -1:
print(f"错误:未在文件 '{file_name}' 中找到关键词 '{header_keyword}'。")
return pd.DataFrame()
# 读取CSV,跳过前导行
df = pd.read_csv(file_name, skiprows=skip_rows)
# 清理数据:移除全为空的行,这通常发生在文件末尾的冗余文本被读入后
# 确保列名是预期的,并且数据行不包含NaN
# 检查第一行是否是预期的列名,如果不是,则可能需要进一步处理
if not all(col in df.columns for col in expected_columns):
print(f"警告:读取的列名与预期不符。实际列名:{df.columns.tolist()}")
# 尝试将第一行作为列名并重新读取,或者进行更复杂的清洗
# 考虑到我们的header_keyword是'rank',它在列名中,所以pd.read_csv会正确识别头部。
pass
# 移除所有列都为NaN的行,这有助于清理文件末尾的空行或无关文本
df = df.dropna(how='all')
# 进一步清理:如果某些列被读取为NaN,但它们应该是数据,这通常意味着文件末尾有额外文本
# 我们可以根据关键列(如'rank')来过滤掉无效数据行
if 'rank' in df.columns:
df = df[df['rank'].notna()]
return df
# 预期列名,用于验证和清理
expected_columns = ['Student', 'id', 'add', 'div', 'rank']
df_skiprows = read_cleaned_csv_by_skiprows('students.csv', 'rank', expected_columns)
print("方法一:基于关键词跳过行读取结果:")
print(df_skiprows)
print("-" * 30)2.3 注意事项
- 此方法依赖于 header_keyword 的唯一性和稳定性。如果关键词在非数据行中也出现,或者数据头部行结构不稳定,可能会导致错误。
- skiprows 参数只能跳过文件开头的行,对于文件末尾的冗余文本,需要额外的 dropna() 等后处理步骤。
- pd.read_csv 会尝试自动识别列名。如果 header_keyword 是列名的一部分,通常能正确识别。
3. 方法二:逐行读取定位头部并传递文件句柄(推荐)
这种方法更加灵活和健壮,尤其适用于头部行位置不固定,但其内容结构相对稳定的情况。它通过Python的文件操作逐行读取,直到找到真正的CSV头部,然后将文件句柄的剩余部分直接传递给 pd.read_csv。
3.1 实现步骤
- 打开文件: 使用 with open(...) 打开文件。
- 逐行扫描: 循环读取文件的每一行,直到找到符合头部特征的行(例如,以特定字符串开头,或包含所有预期的列名)。
- 解析头部: 从找到的头部行中提取列名。
- 传递文件句柄: 将当前文件句柄(它已定位在头部行之后)传递给 pd.read_csv。
- 指定列名和清理: 使用解析出的列名作为 names 参数,并使用 dropna() 清理可能存在的空行或文件末尾的冗余数据。
3.2 示例代码
import pandas as pd
def read_cleaned_csv_by_line_scan(file_name, header_start_string, expected_columns):
"""
通过逐行扫描定位头部,然后读取清理后的CSV文件。
"""
with open(file_name, 'r') as file:
header_line = None
# 逐行读取直到找到头部行
for line in file:
if line.strip().startswith(header_start_string):
header_line = line.strip()
break
if header_line is None:
print(f"错误:未在文件 '{file_name}' 中找到以 '{header_start_string}' 开头的头部行。")
return pd.DataFrame()
# 从找到的头部行解析列名
column_names = header_line.split(',')
# 将文件句柄的剩余部分传递给pd.read_csv
# 使用 names 参数指定列名,因为我们已经读取了头部行
df = pd.read_csv(file, names=column_names)
# 清理数据:移除所有列都为NaN的行,这有助于清理文件末尾的空行或无关文本
df = df.dropna(how='all')
# 进一步清理:如果某些列被读取为NaN,但它们应该是数据,这通常意味着文件末尾有额外文本
if 'rank' in df.columns:
df = df[df['rank'].notna()]
return df
# 使用 'Student' 作为头部行的起始字符串
df_line_scan = read_cleaned_csv_by_line_scan('students.csv', 'Student', expected_columns)
print("方法二:逐行扫描定位头部读取结果:")
print(df_line_scan)
print("-" * 30)3.3 优点与注意事项
- 优点: 这种方法对文件头部冗余行的数量不敏感,只要能准确识别头部行即可。对于非常大的文件,它避免了一次性加载整个文件到内存,效率更高。
- 灵活性: header_start_string 可以是更复杂的条件,例如检查是否包含所有 expected_columns 中的列名。
- names 参数: 由于我们手动读取了头部行,pd.read_csv 应该使用 names 参数来明确指定列名,而不是让它尝试从文件中读取。
- dropna(how='all'): 这是清理文件末尾空行或无关文本的有效方式。如果文件末尾的文本不是完全空行,可能需要更精确的过滤。
4. 方法三:全文件读取与后处理(通用但可能效率低)
这种方法将整个文件内容作为字符串读取,然后进行分割、清洗和转换为DataFrame。它更通用,可以处理更复杂的非标准格式,但对于超大文件可能效率较低,因为它需要将整个文件内容加载到内存中。
4.1 实现步骤
- 读取整个文件: 将整个文件内容读取为一个字符串。
- 按行分割: 将字符串按换行符分割成行的列表。
- 转换为DataFrame: 将行的列表转换为单列的DataFrame。
- 分割列并清洗: 使用逗号分割列,并删除包含NaN的行。
- 设置列名: 手动将第一行数据设置为列名。
4.2 示例代码
import pandas as pd
def read_cleaned_csv_by_full_read(file_name):
"""
将整个文件读取为字符串,然后进行分割和清洗。
"""
with open(file_name, 'r') as file:
# 将整个文件读取为字符串,然后按换行符分割成行
df_raw = pd.DataFrame(file.read().split('\n'))
# 将单列DataFrame的每一行按逗号分割成多列
# dropna() 用于移除文件开头和结尾的非CSV行,以及可能存在的空行
df = df_raw[0].str.split(',', expand=True).dropna()
# 将第一行(此时应该是实际的CSV头部)设置为列名
# 并移除原始的第一行(因为现在它是列名)
df.columns = df.iloc[0].values
df = df[1:].reset_index(drop=True)
# 再次清理,确保数据行中没有NaN
df = df.dropna(how='any')
return df
df_full_read = read_cleaned_csv_by_full_read('students.csv')
print("方法三:全文件读取与后处理读取结果:")
print(df_full_read)
print("-" * 30)4.3 优点与注意事项
- 优点: 这种方法非常灵活,可以处理各种非标准格式,只要能通过字符串操作进行解析。
- 缺点: 对于非常大的文件,一次性将所有内容加载到内存中可能导致性能问题或内存溢出。
- 复杂性: 需要更多手动的数据清洗和列名设置步骤,可能不如 pd.read_csv 直接处理文件句柄那么简洁。
5. 总结与最佳实践
在处理包含非结构化文本的CSV文件时,选择合适的方法取决于文件的具体结构、大小和性能要求:
-
对于文件头部冗余行数不固定,但头部行内容相对稳定的情况(推荐):
- 方法二(逐行读取定位头部并传递文件句柄) 是最推荐的方案。它结合了Python文件操作的灵活性和Pandas的强大读取能力,既高效又健壮。
-
对于文件头部冗余行数相对固定,或可以通过一个关键词准确识别头部行的情况:
- 方法一(基于关键词定位并跳过指定行) 是一个简单有效的选择。
-
对于文件结构极其复杂,或者需要对文件内容进行更细粒度的字符串操作的情况:
- 方法三(全文件读取与后处理) 提供了最大的灵活性,但应注意其潜在的性能开销。
通用注意事项:
- 错误处理: 在实际应用中,务必添加文件不存在、关键词未找到等异常处理逻辑。
- 模式匹配: 如果头部识别逻辑非常复杂,可以考虑使用正则表达式来匹配头部行。
- 数据类型: 在读取后,通常需要检查并转换DataFrame中列的数据类型,因为 pd.read_csv 可能会将数字列识别为字符串。
- dropna() 的使用: 根据实际情况选择 how='any'(删除包含任何NaN的行)或 how='all'(删除所有值为NaN的行)。
通过这些方法,您可以有效地自动化处理大量包含非结构化文本的CSV文件,从而节省时间和精力,确保数据处理流程的准确性和效率。










