
在数据分析和处理的实践中,我们经常会遇到需要处理包含数十万甚至数百万行数据的大型Pandas DataFrame。当这些处理过程涉及复杂的DataFrame操作(如df.merge、df.apply)以及频繁的外部API调用(例如Google Maps API),往往会导致程序崩溃、内存溢出或执行时间过长。特别是对于有速率限制的API,短时间内发出大量请求会触发限制,导致请求失败。本文将介绍一种有效的分批处理策略,帮助开发者优化这类场景下的数据处理流程。
处理大型DataFrame并结合外部API调用时,主要挑战包括:
分批处理(Batch Processing)的核心思想是将一个庞大的任务分解成一系列较小的、独立的子任务。每次只处理数据的一个子集,这样可以有效控制内存使用、遵守API速率限制,并提高程序的健壮性。
分批处理通常涉及以下几个关键步骤:
首先,我们需要为DataFrame中的每一行分配一个批次编号,以便后续按批次进行迭代。这可以通过整数除法 (//) 实现。
import pandas as pd
from sklearn.datasets import load_diabetes # 用于生成示例数据
import time
import os
# 模拟一个大型DataFrame
# 在实际应用中,这里会加载您真实的50万行数据
data = load_diabetes().data
columns = load_diabetes().feature_names
df = pd.DataFrame(data, columns=columns)
# 模拟一些需要处理的额外列
df['dummy_col_1'] = df['age'] * 10
df['dummy_col_2'] = df['bmi'] / 2
# 定义批次大小,例如每批处理100行
batch_size = 100
# 为DataFrame添加一个批次编号列
# df.index // batch_size 会根据索引值自动生成批次号
df['batch_num'] = df.index // batch_size
print(f"原始DataFrame总行数: {len(df)}")
print(f"总批次数量: {df['batch_num'].nunique()}")
print(f"示例批次分配:\n{df[['age', 'batch_num']].head(batch_size + 5)}")创建批次编号后,我们可以通过遍历这些唯一的批次号来逐个处理每个数据块。在循环内部,我们获取当前批次的数据子集,并对其执行所需的操作。
output_csv_path = 'processed_data_batched.csv'
# 确保输出文件是干净的,以便重新运行示例
if os.path.exists(output_csv_path):
os.remove(output_csv_path)
print(f"已删除现有文件: {output_csv_path}")
# 存储处理结果的列表(如果选择先收集再合并)
# processed_batches = []
# 遍历所有唯一的批次编号
for i, batch_id in enumerate(df['batch_num'].unique()):
# 获取当前批次的数据子集
# 使用 .copy() 避免 SettingWithCopyWarning
current_batch_df = df[df['batch_num'] == batch_id].copy()
print(f"\n正在处理第 {i+1}/{df['batch_num'].nunique()} 批次 (批次ID: {batch_id}),包含 {len(current_batch_df)} 行数据...")
# --- 在此模拟批次内的操作 ---
# 1. 模拟 df.merge 操作:
# 例如,根据现有列创建新列,模拟合并外部数据
current_batch_df['merged_data_sim'] = current_batch_df['s1'] + current_batch_df['s2']
# 2. 模拟 df.apply 操作,特别是涉及外部API调用的场景:
def custom_api_call_sim(row):
# 模拟一个耗时的API调用,例如Google Maps API请求
# 在实际应用中,这里会是您真实的API调用逻辑
# time.sleep(0.01) # 模拟每行数据的网络延迟,或在批次结束后统一延迟
return f"Processed_{row['age']}_{row['bmi']}_via_API"
# 对当前批次的数据应用模拟的API调用函数
current_batch_df['api_result'] = current_batch_df.apply(custom_api_call_sim, axis=1)
# 3. 模拟其他 df.apply 或数据转换
current_batch_df['transformed_data'] = current_batch_df['bmi'] * 100
# --- 结果持久化:写入CSV文件 ---
# 选择需要输出的列
output_columns = ['age', 'sex', 'bmi', 'bp', 'merged_data_sim', 'api_result', 'transformed_data']
if i == 0: # 对于第一个批次,写入时包含CSV头
current_batch_df[output_columns].to_csv(output_csv_path, mode='w', header=True, index=False)
print(f"已创建文件 {output_csv_path} 并写入首批数据。")
else: # 对于后续批次,以追加模式写入,不包含CSV头
current_batch_df[output_columns].to_csv(output_csv_path, mode='a', header=False, index=False)
print(f"已将批次 {batch_id} 数据追加到 {output_csv_path}。")
# 可选:在批次之间引入延迟,以遵守API速率限制
# time.sleep(1) # 每处理完一个批次暂停1秒
print(f"\n所有批次处理完毕。结果已保存到 {output_csv_path}")
# 验证最终输出文件(可选)
final_df_check = pd.read_csv(output_csv_path)
print(f"\n最终CSV文件 '{output_csv_path}' 总行数: {len(final_df_check)}")
print("最终CSV文件前5行数据:\n", final_df_check.head())在实施分批处理时,需要考虑以下几点以确保效率和稳定性:
批次大小的选择:
API速率限制与错误处理:
结果持久化策略:
内存管理:
进度跟踪:
分批处理是处理大型Pandas DataFrame并结合外部API调用的强大策略。它通过将复杂任务分解为可管理的小块,有效解决了内存、API速率限制和执行时间等问题。通过合理规划批次大小、实现健壮的API调用机制以及选择合适的持久化策略,开发者可以构建出更高效、更稳定、更具扩展性的数据处理管道。
以上就是Pandas DataFrame 大数据分批处理与外部API调用优化指南的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号