
在处理包含数十万甚至数百万行数据的大型pandas dataframe时,直接对整个数据集执行复杂操作,如df.merge、df.apply,尤其是涉及外部api调用(例如google maps api)时,常常会导致程序崩溃、内存溢出或因api限流而耗时过长。为了解决这些问题,采用分批处理(batch processing)是一种高效且稳健的策略。本文将详细介绍如何将大型dataframe分批处理,并优化外部api调用,实现数据的高效与稳定处理。
大型DataFrame在内存中占用大量资源,一次性加载和处理可能超出系统内存限制。同时,对每一行或每一组数据发起独立的外部API请求,会面临以下问题:
分批处理的核心思想是将大型DataFrame分解成若干个较小的、可管理的子集(批次),然后对每个批次独立进行处理。这种方法带来了多重优势:
我们将通过一个模拟场景来演示如何分批处理大型DataFrame,其中包含模拟的apply操作和外部API调用,并将结果增量写入CSV文件。
首先,我们创建一个大型的模拟DataFrame,包含一个需要通过API获取信息的“地址”列。
import pandas as pd
import numpy as np
import time
import os
# 创建一个大型模拟DataFrame
data_size = 500000 # 50万行数据
df = pd.DataFrame({
'id': range(data_size),
'value1': np.random.rand(data_size) * 100,
'value2': np.random.randint(1, 1000, data_size),
'address': [f"模拟地址 {i}, 城市A, 国家B" for i in range(data_size)] # 模拟地址信息
})
print(f"原始DataFrame大小: {len(df)} 行")确定一个合适的批次大小(例如100行或1000行),然后为DataFrame中的每一行分配一个批次编号。
batch_size = 1000 # 每批处理1000行
df['batch_num'] = df.index // batch_size
# 打印批次信息
print(f"数据将被分割成 {df['batch_num'].nunique()} 个批次,每批 {batch_size} 行。")定义一个函数来模拟外部API调用,并引入延迟以模拟网络请求和API限流。同时,定义一个函数来处理每个批次的数据,包括apply操作和API调用。
# 模拟外部API调用函数
def get_coordinates_from_address(address):
"""
模拟一个外部API调用,根据地址获取经纬度。
在实际应用中,这里会是调用Google Maps API等。
"""
time.sleep(0.01) # 模拟API请求的延迟,例如10毫秒
# 模拟返回经纬度数据
lat = np.random.uniform(30, 40)
lon = np.random.uniform(-100, -90)
return f"Lat:{lat:.4f}, Lon:{lon:.4f}"
# 定义批次数据处理函数
def process_data_chunk(chunk_df):
"""
对单个数据批次执行复杂的apply操作和API调用。
"""
# 示例1: 执行一个复杂的apply操作
chunk_df['processed_value'] = chunk_df['value1'] * 0.5 + chunk_df['value2'] / 10
# 示例2: 对地址列进行API调用
# 注意:如果API支持批量查询,应优先使用批量查询以减少网络开销
# 这里为了演示,我们假设API是按行调用的
chunk_df['coordinates'] = chunk_df['address'].apply(get_coordinates_from_address)
# 示例3: 模拟一个merge操作 (如果需要与其他DataFrame合并)
# 假设有一个小型配置DataFrame需要合并
# config_df = pd.DataFrame({'id': [0, 1, 2], 'config_info': ['A', 'B', 'C']})
# chunk_df = pd.merge(chunk_df, config_df, on='id', how='left')
return chunk_df现在,我们可以遍历所有批次,对每个批次进行处理,并将结果增量写入同一个CSV文件。
output_csv_path = 'processed_large_dataframe.csv'
# 如果输出文件已存在,先删除,确保从头开始写入
if os.path.exists(output_csv_path):
os.remove(output_csv_path)
print(f"已删除旧的输出文件: {output_csv_path}")
header_written = False # 标记是否已写入CSV头部
print(f"\n开始分批处理 {len(df)} 行数据并写入 {output_csv_path}...")
unique_batches = df['batch_num'].unique()
total_batches = len(unique_batches)
for i, batch_id in enumerate(unique_batches):
# 提取当前批次的数据
current_batch_df = df[df['batch_num'] == batch_id].copy() # 使用 .copy() 避免 SettingWithCopyWarning
print(f"正在处理批次 {i+1}/{total_batches} (行范围: {current_batch_df.index.min()} - {current_batch_df.index.max()})")
# 处理当前批次的数据
processed_batch = process_data_chunk(current_batch_df)
# 将处理后的批次数据写入CSV文件
if not header_written:
# 首次写入,包含头部
processed_batch.to_csv(output_csv_path, mode='w', index=False, encoding='utf-8')
header_written = True
else:
# 后续写入,不包含头部,以追加模式写入
processed_batch.to_csv(output_csv_path, mode='a', header=False, index=False, encoding='utf-8')
# 可选:在批次之间引入额外的延迟,以更严格地遵守API速率限制
# time.sleep(0.5) # 例如,每处理完一个批次暂停0.5秒
print(f"\n所有批次处理完成,结果已写入 {output_csv_path}")
# 验证写入结果 (可选)
# processed_df = pd.read_csv(output_csv_path)
# print(f"\n从CSV读取的数据总行数: {len(processed_df)}")
# print("前5行数据示例:")
# print(processed_df.head())通过将大型Pandas DataFrame分解为可管理的小批次进行处理,我们能够有效地规避内存限制、遵守API速率,并提高数据处理的整体稳定性和效率。这种分批处理结合增量写入的策略,是处理海量数据和外部服务交互时的最佳实践之一,尤其适用于那些需要长时间运行且对资源消耗敏感的数据管道。遵循本文提供的指南和代码示例,您可以构建出更加健壮和高效的数据处理解决方案。
以上就是高效处理大型DataFrame:Pandas分批操作与外部API请求管理的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号