
在处理包含数十万甚至数百万行数据的大型pandas dataframe时,直接对整个数据集执行复杂操作(如df.merge、df.apply)或频繁调用外部api(如google maps api),极易引发内存溢出、程序崩溃或因api请求频率过高而被限流等问题。尤其当每次api调用耗时且有严格的速率限制时,一次性处理所有数据几乎不可行。解决这些问题的关键在于采用分批处理(batch processing)策略。
分批处理的核心思想是将一个庞大的数据集分解成若干个大小可控的小数据集(批次),然后逐一处理这些批次。这种方法不仅能有效降低单次操作的内存消耗,还能更好地管理外部API的调用频率。
我们可以利用DataFrame的索引(df.index)结合整数除法(//)来为每一行分配一个批次编号。例如,如果希望每100行作为一个批次,那么df.index // 100就能生成相应的批次号。
以下是一个演示如何创建批次并迭代处理的示例代码:
import pandas as pd
import numpy as np
import time
import os
# 模拟一个大型DataFrame
# 实际应用中,这里会是您加载的50万行数据
data_size = 500000
df = pd.DataFrame({
'id': range(data_size),
'col_a': np.random.rand(data_size) * 100,
'address': [f'Address {i}, City {i % 100}' for i in range(data_size)],
'value_b': np.random.randint(0, 1000, data_size)
})
print(f"原始DataFrame大小: {len(df)} 行")
# 定义批次大小
batch_size = 100
# 为DataFrame中的每一行生成批次号
df['batch_num'] = df.index // batch_size
# 模拟一个外部API调用函数
def call_google_maps_api(address):
"""
模拟调用Google Maps API,获取经纬度
实际应用中,这里会是您的requests.get()调用
"""
# 模拟网络延迟和API处理时间
time.sleep(0.05) # 每次调用暂停50毫秒,以避免过快请求
if "City 0" in address: # 模拟某些地址可能失败
# raise ValueError(f"API Error for address: {address}")
return f"ERROR: {address}"
return f"Lat: {hash(address) % 90}, Lng: {hash(address) % 180}"
# 存储最终结果的列表
# 也可以直接写入CSV,下面会介绍两种方式
processed_batches = []
output_csv_path = 'processed_data_batched.csv'
# 如果输出文件已存在,先删除,确保从头开始
if os.path.exists(output_csv_path):
os.remove(output_csv_path)
print(f"已删除旧的输出文件: {output_csv_path}")
# 遍历所有唯一的批次号
unique_batches = df['batch_num'].unique()
for i, batch_id in enumerate(unique_batches):
print(f"正在处理批次 {i+1}/{len(unique_batches)} (批次号: {batch_id})...")
# 提取当前批次的DataFrame
# 使用 .copy() 避免SettingWithCopyWarning
current_batch_df = df[df['batch_num'] == batch_id].copy()
# --- 在此处对 current_batch_df 执行您的操作 ---
# 1. 模拟 df.merge 操作 (例如,与另一个小表合并)
# 假设有一个小的查找表
lookup_data = pd.DataFrame({
'id': range(data_size),
'category': [f'Cat_{i % 5}' for i in range(data_size)]
})
# 只合并当前批次所需的查找数据
current_batch_df = pd.merge(current_batch_df, lookup_data[['id', 'category']], on='id', how='left')
# 2. 模拟 df.apply 操作,其中包含外部API调用
# 针对 'address' 列调用模拟的Google Maps API
try:
current_batch_df['coordinates'] = current_batch_df['address'].apply(call_google_maps_api)
except Exception as e:
print(f"批次 {batch_id} API调用失败: {e}")
# 可以在这里实现重试逻辑或记录错误
current_batch_df['coordinates'] = "API_CALL_FAILED" # 标记失败
# 3. 其他数据转换或计算
current_batch_df['calculated_col'] = current_batch_df['col_a'] * 2
# --- 批次处理结束 ---
# 将处理后的批次数据添加到列表中
# processed_batches.append(current_batch_df)
# 替代方案:直接将批次结果写入CSV文件
# 对于第一个批次,写入头部;对于后续批次,不写入头部并以追加模式写入
if i == 0:
current_batch_df.to_csv(output_csv_path, mode='w', index=False, header=True, encoding='utf-8')
else:
current_batch_df.to_csv(output_csv_path, mode='a', index=False, header=False, encoding='utf-8')
# 释放内存 (可选,对于极大的DataFrame可能有用)
del current_batch_df
import gc
gc.collect()
print("\n所有批次处理完成!")
# 如果您选择将所有批次收集到列表中,最后再合并
# final_df = pd.concat(processed_batches, ignore_index=True)
# print(f"最终合并的DataFrame大小: {len(final_df)} 行")
# final_df.to_csv(output_csv_path, index=False, encoding='utf-8')
print(f"处理后的数据已保存到: {output_csv_path}")
final_df_check = pd.read_csv(output_csv_path)
print(f"从CSV读取的数据行数: {len(final_df_check)}")在上述示例中,我们展示了两种处理批次结果的方式:
当分批处理涉及到外部API调用时,必须特别注意API的速率限制(Rate Limiting)和错误处理。
通过采用分批处理策略,我们能够有效地管理大型DataFrame的数据处理任务,避免因内存或API限制导致的程序崩溃。核心在于将数据分解为可管理的批次,并在每个批次内部执行所需的合并、计算和API调用。结合适当的API请求管理和错误处理机制,以及将结果增量写入文件,可以显著提升数据处理的稳定性和效率,确保即使面对海量数据和外部服务依赖,也能顺利完成任务。
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号