
在处理包含数十万甚至数百万行数据的大型pandas dataframe时,直接执行全局操作(如df.merge()、df.apply())或对每一行进行外部api请求,常常会导致以下问题:
分批处理(Batch Processing)是解决这些问题的有效策略,它将大型任务分解为更小、更易管理的子任务。
分批处理的核心思想是将一个庞大的DataFrame逻辑上或物理上拆分成多个较小的子DataFrame(即“批次”)。然后,对每个批次独立执行所需的操作(如合并、应用函数、API请求),并将每个批次的结果进行收集或即时保存。
这种方法的好处在于:
下面将通过一个具体的Python Pandas示例,演示如何对大型DataFrame进行分批处理,并模拟merge、apply操作以及外部API请求。
import pandas as pd
from sklearn.datasets import load_diabetes # 用于生成示例数据
import time # 用于模拟API请求延迟
import os # 用于文件路径操作
# --- 1. 数据准备与模拟 ---
# 假设我们有一个大型DataFrame
# 这里使用sklearn的diabetes数据集模拟,实际中替换为你的数据
df_large = pd.DataFrame(load_diabetes().data, columns=load_diabetes().feature_names)
# 为了模拟合并操作,添加一个唯一ID列
df_large['record_id'] = range(len(df_large))
# 模拟另一个需要合并的DataFrame
df_other = pd.DataFrame({
    'record_id': range(len(df_large)),
    'additional_info': [f'info_for_record_{i}' for i in range(len(df_large))]
})
# --- 2. 定义分批大小 ---
batch_size = 100 # 每批处理100行数据
# --- 3. 为DataFrame添加批次号列 ---
# 使用整数除法 // 来为每行分配一个批次号
df_large['batch_num'] = df_large.index // batch_size
# --- 4. 存储结果的准备 ---
# 可以选择将每个批次的结果追加到CSV文件,或先收集到列表中再合并
output_csv_path = 'processed_data_batched.csv'
# 如果文件已存在,先删除,确保从新开始
if os.path.exists(output_csv_path):
    os.remove(output_csv_path)
print(f"开始处理大型DataFrame,总行数: {len(df_large)},批次大小: {batch_size}")
print(f"预计总批次数: {df_large['batch_num'].nunique()}")
# --- 5. 遍历批次并执行操作 ---
# 使用groupby('batch_num')可以方便地迭代每个批次
for i, batch_df in df_large.groupby('batch_num'):
    current_batch_number = i + 1
    total_batches = df_large['batch_num'].nunique()
    print(f"\n--- 正在处理批次 {current_batch_number}/{total_batches} (行索引 {batch_df.index.min()} 到 {batch_df.index.max()}) ---")
    # --- 5.1 模拟 df.merge 操作 ---
    # 假设我们需要将 df_other 中的信息合并到当前批次
    # 注意:如果 df_other 也很大,可能需要对其进行预处理或优化查询
    batch_df = pd.merge(batch_df, df_other[['record_id', 'additional_info']], on='record_id', how='left')
    print(f"批次 {current_batch_number} 完成合并操作。")
    # --- 5.2 模拟 df.apply 操作 ---
    # 例如,对某一列进行复杂的数值转换或字符串处理
    def complex_calculation(row_data):
        # 实际中这里会是更复杂的业务逻辑
        return row_data['bmi'] * row_data['s1'] / 100 + 5
    batch_df['calculated_feature'] = batch_df.apply(complex_calculation, axis=1)
    print(f"批次 {current_batch_number} 完成 apply 操作。")
    # --- 5.3 模拟对外部API的请求 ---
    # 假设你需要根据批次中的每一行数据调用一个外部API(如Google Maps)
    def call_external_api(row_data):
        # 实际中这里会是 requests.get('your_api_endpoint', params={'param': row_data['some_column']})
        # 为了避免短时间内发送过多请求,这里引入延迟
        time.sleep(0.05) # 模拟API请求延迟,并控制速率
        return f"API_result_for_record_{row_data['record_id']}"
    # 对批次中的每一行调用API
    batch_df['api_response'] = batch_df.apply(call_external_api, axis=1)
    print(f"批次 {current_batch_number} 完成 {len(batch_df)} 个API请求。")
    # --- 5.4 保存当前批次结果 ---
    # 将当前批次的处理结果追加到CSV文件
    # 对于第一个批次,写入标题行;后续批次只追加数据
    if i == 0:
        batch_df.to_csv(output_csv_path, mode='w', index=False, header=True)
    else:
        batch_df.to_csv(output_csv_path, mode='a', index=False, header=False)
    print(f"批次 {current_batch_number} 结果已保存到 {output_csv_path}")
print("\n所有批次处理完成。")
# --- 6. 最终验证(可选) ---
# 如果需要,可以重新加载整个处理后的文件进行最终检查
final_processed_df = pd.read_csv(output_csv_path)
print("\n最终处理后的数据预览:")
print(final_processed_df.head())
print(f"最终文件包含 {len(final_processed_df)} 行数据。")代码详解:
通过将大型Pandas DataFrame操作和外部API请求分解为可管理的小批次,我们可以有效规避内存限制、API速率限制,并显著提高数据处理的鲁棒性和效率。本教程提供的分批处理策略和代码示例,为处理大数据量和集成外部服务提供了实用的解决方案。在实际应用中,根据具体场景调整批次大小、API请求策略和错误处理机制,将能够构建出更加稳定和高效的数据处理流程。
以上就是大型Pandas DataFrame分批处理策略与API请求优化的详细内容,更多请关注php中文网其它相关文章!
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
                Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号