分块读取是处理大型CSV文件的核心策略,通过pandas的chunksize参数将文件分割为小块迭代加载,避免内存溢出;结合dtype优化、usecols筛选列、增量聚合及分块写入文件或数据库,可显著降低内存占用并提升处理效率。

处理大型CSV文件,尤其是在内存有限的环境下,Python的pandas库提供了一个非常有效的策略:分块读取。核心思想是,不是一次性将整个文件加载到内存中,而是将其拆分成若干小块(chunks),逐块处理,这样可以显著降低内存占用,避免程序崩溃。
当面对一个GB级别甚至更大的CSV文件时,直接使用
pd.read_csv()
read_csv
chunksize
chunksize
read_csv
import pandas as pd
file_path = 'your_large_file.csv'
chunk_size = 100000 # 例如,每次读取10万行
# 创建一个空的列表来存储处理后的数据块,如果需要最终合并的话
processed_chunks = []
try:
# read_csv 返回一个TextFileReader对象,可以像迭代器一样使用
for i, chunk in enumerate(pd.read_csv(file_path, chunksize=chunk_size)):
print(f"正在处理第 {i+1} 个数据块,行数: {len(chunk)}")
# 在这里对每个chunk进行你的数据处理、清洗、分析等操作
# 例如,筛选特定列、计算均值、聚合数据等
# processed_chunk = chunk[chunk['some_column'] > 0]
# 如果需要将处理后的数据块合并,可以添加到列表中
# processed_chunks.append(processed_chunk)
# 如果只是做一些统计或聚合,可能不需要存储整个chunk
# 例如:total_sum += chunk['value_column'].sum()
except MemoryError:
print("内存溢出!请尝试减小 chunk_size。")
except FileNotFoundError:
print(f"文件未找到: {file_path}")
except Exception as e:
print(f"读取或处理文件时发生错误: {e}")
# 如果之前存储了处理后的chunks,现在可以合并它们
# final_df = pd.concat(processed_chunks, ignore_index=True)
# print("所有数据块处理完毕并合并。")这个策略的核心在于“化整为零”。每次只在内存中保留一小部分数据,处理完就释放掉,或者只保留处理结果,极大地缓解了内存压力。
立即学习“Python免费学习笔记(深入)”;
你肯定遇到过那种情况:一个看似普通的CSV文件,在本地编辑器里打开没啥问题,但一用Python跑
pd.read_csv()
MemoryError
pandas.read_csv()
如果你的CSV文件有几个GB,而你的机器只有8GB或16GB内存,那么很容易就会超出可用内存上限。要知道,DataFrame在内存中的占用通常会比原始CSV文件大,因为数据类型转换、索引创建以及Python对象本身的开销都会增加内存消耗。举个例子,一个存储整数的列,在CSV里可能只是几个字符,但在DataFrame里可能会被存储为64位的整型对象,占用8字节,加上Python对象的额外开销,内存占用会迅速膨胀。当操作系统发现程序请求的内存超过了物理内存加上交换空间(swap space)的总和时,就会抛出
MemoryError
分块读取只是第一步,更关键的是如何有效地处理这些数据块。这不仅仅是把
chunksize
如果你的最终目标是得到一个完整的、经过处理的DataFrame,并且你认为即使处理后的DataFrame仍然可以放入内存,那么你可以将每个处理后的
chunk
pd.concat()
更多时候,我们处理大型CSV是为了进行一些统计分析或聚合操作,比如计算总和、平均值、计数、最大最小值,或者进行一些数据清洗和过滤,然后将结果保存到另一个文件。在这种情况下,我们根本不需要将所有数据块合并成一个巨大的DataFrame。
我的做法通常是这样的:
增量聚合: 如果你需要计算总和、平均值等,可以在循环中维护一个累加器。
total_value = 0
total_rows = 0
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
total_value += chunk['value_column'].sum()
total_rows += len(chunk)
average_value = total_value / total_rows if total_rows else 0
print(f"总平均值: {average_value}")对于更复杂的聚合,比如按某个列分组求和,你可以对每个
chunk
groupby().sum()
chunk
add
筛选与过滤: 如果你只需要CSV中的一部分行或列,可以在每个
chunk
filtered_data_chunks = []
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# 假设我们只关心 'status' 列为 'active' 的行
filtered_chunk = chunk[chunk['status'] == 'active']
if not filtered_chunk.empty:
filtered_data_chunks.append(filtered_chunk)
# 如果 filtered_data_chunks 不会太大,可以合并
# final_filtered_df = pd.concat(filtered_data_chunks, ignore_index=True)
# 或者直接将过滤后的数据写入新的CSV文件
# if not filtered_data_chunks:
# pd.concat(filtered_data_chunks).to_csv('filtered_output.csv', index=False)
# else:
# for i, fc in enumerate(filtered_data_chunks):
# if i == 0:
# fc.to_csv('filtered_output.csv', mode='w', header=True, index=False)
# else:
# fc.to_csv('filtered_output.csv', mode='a', header=False, index=False)直接输出到数据库或新文件: 处理完每个
chunk
to_sql
# 假设你已经有了一个数据库连接 engine
# from sqlalchemy import create_engine
# engine = create_engine('sqlite:///my_database.db')
# 第一次写入时创建表头,后续追加
first_chunk = True
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# 对 chunk 进行处理...
processed_chunk = chunk.dropna() # 举例:删除空值
# 写入新的CSV文件
if first_chunk:
processed_chunk.to_csv('processed_output.csv', mode='w', header=True, index=False)
first_chunk = False
else:
processed_chunk.to_csv('processed_output.csv', mode='a', header=False, index=False)
# 写入数据库 (如果需要)
# processed_chunk.to_sql('my_table', con=engine, if_exists='append', index=False)这种方式的效率很高,因为它将内存消耗保持在最低水平,并将I/O操作分散开来。
分块读取是解决内存问题的基石,但仅仅依靠它还不够。在实际工作中,我发现结合一些其他优化技巧,能让整个处理流程更加顺畅和高效。
精确指定数据类型(dtype
pandas
int64
object
read_csv
dtype
# 示例:预先知道列的数据类型
optimized_dtypes = {
'id': 'int32',
'category': 'category', # 对于重复值较少的字符串列,使用category类型可以节省大量内存
'value': 'float32',
'timestamp': 'datetime64[ns]'
}
for chunk in pd.read_csv(file_path, chunksize=chunk_size, dtype=optimized_dtypes):
# ...处理
pass这需要你对数据有一定的了解,或者可以先读取少量数据来分析其类型分布。
只读取所需列(usecols
usecols
# 假设我们只需要 'id', 'name', 'score' 这几列
required_columns = ['id', 'name', 'score']
for chunk in pd.read_csv(file_path, chunksize=chunk_size, usecols=required_columns):
# ...处理
pass选择合适的解析引擎(engine
pandas.read_csv
engine='c'
engine='python'
跳过不必要的行(skiprows
nrows
skiprows
nrows
考虑更高效的数据存储格式:对于真正意义上的“大数据”,CSV文件其实并不是最优选择。一旦数据经过清洗和预处理,我通常会将其转换为更高效的二进制格式,比如 Parquet 或 Feather。这些格式是列式存储的,支持数据压缩,并且读取速度比CSV快几个数量级。下次需要分析时,直接读取Parquet文件会快很多,内存占用也更低。
# 假设你已经处理完数据并得到了一个DataFrame
# final_df.to_parquet('processed_data.parquet', index=False)
# 以后读取:pd.read_parquet('processed_data.parquet')Dask DataFrames:如果你的数据集真的大到即使分块处理也感觉力不从心,或者需要进行复杂的分布式计算,那么 Dask 是一个值得考虑的工具。Dask DataFrames 模仿了pandas API,但它能够在比内存更大的数据集上运行,并且可以轻松地扩展到多核处理器或集群上。它通过将大型DataFrame分解成多个小的pandas DataFrames,并延迟计算,来实现“out-of-core”处理。
# import dask.dataframe as dd
# ddf = dd.read_csv(file_path, chunksize=chunk_size) # 或者直接 dd.read_csv(file_path)
# result = ddf.groupby('category')['value'].mean().compute() # .compute() 触发实际计算Dask的学习曲线比纯pandas略高,但对于处理TB级别的数据集,它提供了强大的解决方案。
这些策略并非相互独立,而是可以组合使用的。在处理大型数据集时,往往需要根据具体情况灵活选择和搭配,才能找到最适合的解决方案。
以上就是Python怎么读取一个大的CSV文件_pandas分块读取大型CSV文件策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号