
在python中使用pandas库处理数据时,许多初学者会习惯性地使用for循环结合iterrows()或apply()方法对dataframe的每一行进行操作。这种做法对于小型数据集可能尚可接受,但一旦面对百万级甚至千万级行数的大型csv文件,其性能问题便会凸显。
考虑以下伪代码示例,它展示了一个常见的低效处理模式:
import os
import pandas as pd
# 假设文件路径和目标列表已定义
# desktop = os.path.join(os.path.join(os.environ["USERPROFILE"]), "Desktop")
# dados = pd.read_csv(desktop + '\test-1000-rows.csv')
# 模拟一个DataFrame和目标项列表
data = {'column_a': ['apple_pie', 'banana_split', 'cherry_tart', 'grape_juice', 'apple_cider'],
'column_b': ['orange_soda', 'lemonade', 'pineapple_juice', 'mango_shake', 'berry_smoothie'],
'column_c': [10, 20, 30, 40, 50]}
dados = pd.DataFrame(data)
target_items = ['apple', 'juice', 'berry'] # 假设这是要检查的列表
result_list = []
# 低效的行迭代操作
for i, row in dados.iterrows():
# 遍历目标项列表
for item in target_items:
# 检查 column_a 或 column_b 是否包含该项
if item in str(row['column_a']) or item in str(row['column_b']):
result_list.append(row['column_c'])
break # 找到一个匹配项后跳出内层循环
print(f"通过iterrows获取的结果: {result_list}")这段代码的问题在于,iterrows()会返回一个生成器,每次迭代都会将一行数据转换为Series对象,这涉及到Python层面的循环和对象创建,而非底层的C语言或NumPy优化操作。对于大数据集,这种开销会迅速累积,导致处理时间呈线性甚至超线性增长。apply()方法虽然在某些场景下比iterrows()略好,但如果其内部逻辑仍然是行级别的Python操作,同样会面临性能瓶颈。
Pandas的强大之处在于其底层基于NumPy,能够进行高度优化的向量化操作。这意味着我们可以对整个Series或DataFrame的列进行操作,而不是逐行处理。向量化操作通常在C语言层面实现,效率远高于Python循环。
要优化上述示例中的逻辑,我们需要将“检查column_a或column_b是否包含某个项,并根据条件收集column_c的值”这一过程向量化。
立即学习“Python免费学习笔记(深入)”;
以下是使用向量化操作重写上述逻辑的示例:
import pandas as pd
import re
# 模拟数据 (与上面相同)
data = {'column_a': ['apple_pie', 'banana_split', 'cherry_tart', 'grape_juice', 'apple_cider'],
'column_b': ['orange_soda', 'lemonade', 'pineapple_juice', 'mango_shake', 'berry_smoothie'],
'column_c': [10, 20, 30, 40, 50]}
dados = pd.DataFrame(data)
target_items = ['apple', 'juice', 'berry']
# 1. 构建正则表达式模式
# 使用 re.escape 确保目标项中的特殊字符被正确转义
pattern = '|'.join([re.escape(item) for item in target_items])
# 2. 应用向量化操作进行条件检查
# 检查 'column_a' 是否包含任一目标项
condition_a = dados['column_a'].str.contains(pattern, na=False, regex=True)
# 检查 'column_b' 是否包含任一目标项
condition_b = dados['column_b'].str.contains(pattern, na=False, regex=True)
# 组合两个条件 (逻辑或操作)
final_condition = condition_a | condition_b
# 3. 使用布尔索引筛选并获取 'column_c' 的值
result_list_vectorized = dados.loc[final_condition, 'column_c'].tolist()
print(f"通过向量化操作获取的结果: {result_list_vectorized}")代码解析与优势:
通过这种方式,原本需要400秒处理10,000行数据的操作,在百万行级别的数据集上也能在几秒甚至更短的时间内完成,极大地提升了处理效率。
对于一些极其庞大的CSV文件(例如,GB级别甚至TB级别),即使是向量化操作,也可能因为文件过大无法一次性加载到内存中而导致内存错误。在这种情况下,pd.read_csv()的chunksize参数就显得尤为重要。
chunksize允许我们分批次读取CSV文件,每次只加载文件的一部分到内存中,形成一个DataFrame迭代器。我们可以对每个数据块(chunk)应用向量化操作,然后将结果汇总。
import pandas as pd
import re
file_path = 'your_large_file.csv' # 替换为你的大型CSV文件路径
chunk_size = 100000 # 每次读取10万行数据,可根据内存情况调整
target_items = ['apple', 'juice', 'berry']
pattern = '|'.join([re.escape(item) for item in target_items])
all_results = [] # 用于存储所有数据块处理后的结果
# 使用 chunksize 分块读取CSV文件
for chunk in pd.read_csv(file_path, chunksize=chunk_size):
# 对每个数据块应用之前定义的向量化操作
condition_a = chunk['column_a'].str.contains(pattern, na=False, regex=True)
condition_b = chunk['column_b'].str.contains(pattern, na=False, regex=True)
final_condition = condition_a | condition_b
chunk_results = chunk.loc[final_condition, 'column_c'].tolist()
all_results.extend(chunk_results) # 将当前数据块的结果添加到总结果列表中
print(f"通过分块处理获取的总结果数量: {len(all_results)}")
# print(all_results) # 如果结果列表不大,可以打印查看注意事项:
处理大型CSV文件并提升Pandas性能的核心原则是:尽可能避免Python层面的显式循环,拥抱Pandas的向量化操作。
通过遵循这些最佳实践,你将能够高效、稳定地处理任何规模的CSV数据,充分发挥Pandas在数据科学领域的强大能力。
以上就是优化Python中Pandas处理大型CSV文件的性能的详细内容,更多请关注php中文网其它相关文章!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号