
在处理大量CSV文件并将其合并到单个Pandas DataFrame时,直接在循环中使用`pd.concat`会导致显著的性能下降和内存效率问题。本文将深入探讨这种低效模式的根源,并提供两种主要的优化策略:首先是采用“先收集后合并”的方法,通过将数据暂存到Python字典中,最后进行一次性合并;其次是引入多线程技术,利用`concurrent.futures.ThreadPoolExecutor`并行处理文件读取任务,从而大幅提升处理速度和资源利用率。
当需要从成百上千个甚至更多的CSV文件中读取数据,并将其整合到一个大型Pandas DataFrame中时,开发者常常会遇到性能瓶颈。一个常见的、但效率低下的做法是在循环内部反复调用pd.concat函数。这种模式的问题在于,每次pd.concat操作都会创建一个新的DataFrame,并将现有数据和新数据进行复制和拼接。随着DataFrame的不断增大,每次复制操作所需的时间和内存开销呈指数级增长,导致程序运行速度越来越慢,甚至可能耗尽系统内存。
考虑以下一个典型的低效代码示例,它迭代地读取文件、转置数据并追加到主DataFrame中:
import pandas as pd
import os
# 假设 df 包含 'File ID' 和 'File Name' 列
# root_path = 'your_root_path'
merged_data = pd.DataFrame()
count = 0
for index, row in df.iterrows():
folder_name = row['File ID'].strip()
file_name = row['File Name'].strip()
file_path = os.path.join(root_path, folder_name, file_name)
# 读取、转置、插入列
file_data = pd.read_csv(file_path, names=['Case', f'{folder_name}_{file_name}'], sep='\t')
file_data_transposed = file_data.set_index('Case').T.reset_index(drop=True)
file_data_transposed.insert(loc=0, column='folder_file_id', value=str(folder_name+'_'+file_name))
# 循环内反复合并,这是性能瓶颈
merged_data = pd.concat([merged_data, file_data_transposed], axis=0, ignore_index=True)
count += 1
print(count)这段代码在文件数量较少时可能表现尚可,但当文件数量达到数百或数千时,其执行时间会急剧增加,因为每次循环都需要进行昂贵的数据复制和内存重新分配。
解决上述性能问题的核心思想是避免在循环内部频繁地进行数据合并。相反,我们应该在循环中将每个文件处理后的数据结构(例如Pandas Series或DataFrame)收集起来,存储在一个Python列表或字典中,然后在循环结束后执行一次性的大规模合并操作。这种方法显著减少了内存复制的次数,从而大幅提升了效率。
pathlib模块提供了面向对象的路径操作,相比os.path更加直观和现代化。
import pathlib
# root_path = pathlib.Path('your_root_path') # 替换为你的根路径在读取CSV文件时,可以通过调整pd.read_csv的参数来提高效率和内存管理:
将每个文件的数据处理成一个Pandas Series,并以文件标识符作为键存储到字典中。squeeze()方法可以将单列DataFrame转换为Series,这对于后续的合并操作非常方便。
import pathlib
import pandas as pd
root_path = pathlib.Path('root') # 示例根路径
data_chunks = {}
# 使用 enumerate 替代外部计数器,从1开始计数
for count, (_, row) in enumerate(df.iterrows(), 1):
folder_name = row['File ID'].strip()
file_name = row['File Name'].strip()
file_path = root_path / folder_name / file_name # pathlib 的路径拼接
folder_file_id = f'{folder_name}_{file_name}'
file_data = pd.read_csv(file_path, header=None, sep='\t',
names=['Case', folder_file_id],
memory_map=True, low_memory=False)
# 将 'Case' 列设为索引,然后将单列 DataFrame 转换为 Series
data_chunks[folder_file_id] = file_data.set_index('Case').squeeze()
print(count)循环结束后,使用pd.concat将字典中的所有Series合并。通过names=['folder_file_id']为新的索引级别命名,然后使用unstack('Case')将Case索引级别转换为列,最后reset_index()将folder_file_id和新的列索引转换为常规列。
merged_data = (pd.concat(data_chunks, names=['folder_file_id'])
.unstack('Case').reset_index())示例输入数据结构:
df DataFrame:
1、该模板代码干净整洁; 2、效果相当的炫酷,相当简洁大气高端,模板简单,全部已数据调用,只需后台修改栏目名称即可 3、适用于教育培训、教育机构; 4、网站手工DIV+css,代码精简,首页排版整洁大方、布局合理、利于SEO、图文并茂、静态HTML; 5、首页和全局重新做了全面优化,方便大家无缝使用;
425
File ID File Name 0 folderA file001.txt 1 folderB file002.txt
root/folderA/file001.txt:
0 1234 1 5678 2 9012 3 3456 4 7890
root/folderB/file002.txt:
0 4567 1 8901 2 2345 3 6789
优化后的输出结果示例:
>>> merged_data Case folder_file_id 0 1 2 3 4 0 folderA_file001.txt 1234.0 5678.0 9012.0 3456.0 7890.0 1 folderB_file002.txt 4567.0 8901.0 2345.0 6789.0 NaN
这种方法通过一次性合并操作,极大地减少了内存分配和数据复制的开销,从而显著提升了处理大规模文件集合的性能。
对于文件读取这类I/O密集型任务,即使是单线程的优化也可能受限于磁盘I/O速度。在这种情况下,可以引入多线程并发处理,进一步缩短总执行时间。Python的concurrent.futures模块提供了一个高级接口来异步执行可调用对象,其中ThreadPoolExecutor适用于I/O密集型任务。
为了在多线程环境中执行,我们需要将单个文件的处理逻辑封装成一个独立的函数。这个函数将接收处理所需的参数,并返回处理结果。
from concurrent.futures import ThreadPoolExecutor
import pathlib
import pandas as pd
root_path = pathlib.Path('root') # 示例根路径
def process_single_file(args):
"""
读取并处理单个CSV文件。
args: (count, row_dict) - count为文件序号,row_dict为包含文件信息的字典。
返回: (folder_file_id, processed_series) - 文件标识符和处理后的Pandas Series。
"""
count, row_dict = args # 解包参数
folder_name = row_dict['File ID'].strip()
file_name = row_dict['File Name'].strip()
file_path = root_path / folder_name / file_name
folder_file_id = f'{folder_name}_{file_name}'
file_data = pd.read_csv(file_path, header=None, sep='\t',
names=['Case', folder_file_id],
memory_map=True, low_memory=False)
print(f"Processing file {count}: {folder_file_id}")
return folder_file_id, file_data.set_index('Case').squeeze()使用ThreadPoolExecutor创建一个线程池,并通过executor.map()方法将process_single_file函数应用到每个文件的数据上。map()会按照提交的顺序返回结果,这使得后续的pd.concat能够正确地合并数据。
# 将 df 转换为字典列表,以便在多线程函数中方便访问行数据
# enumerate 从1开始计数,为每个任务添加一个序号
batch_data = enumerate(df[['File ID', 'File Name']].to_dict('records'), 1)
# max_workers 参数控制并发线程数,根据系统资源和I/O特性调整
with ThreadPoolExecutor(max_workers=4) as executor: # 示例使用4个线程
# executor.map 会并行执行任务,并按顺序返回结果
processed_results = executor.map(process_single_file, batch_data)
# 将结果转换为字典,然后进行一次性合并
data_chunks_threaded = dict(processed_results)
merged_data_threaded = (pd.concat(data_chunks_threaded, names=['folder_file_id'])
.unstack('Case').reset_index())注意事项:
在处理大规模数据集合时,尤其涉及文件I/O和Pandas DataFrame操作,采用高效的编程模式至关重要。
通过采纳这些优化策略,你可以有效地处理大规模文件合并任务,避免性能瓶颈,并构建出更健壮、更高效的数据处理流程。
以上就是优化Pandas DataFrame合并:高效处理大规模文件集合的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号