0

0

高效处理大型DataFrame:Pandas分批操作与外部API请求管理

心靈之曲

心靈之曲

发布时间:2025-09-17 11:47:23

|

503人浏览过

|

来源于php中文网

原创

高效处理大型DataFrame:Pandas分批操作与外部API请求管理

针对大型Pandas DataFrame在执行merge、apply操作及调用外部API时遇到的性能和稳定性问题,本文提供了一种分批处理策略。通过将DataFrame分割成小块,逐批处理数据并管理API请求速率,有效避免内存溢出和API限流,确保数据处理流程的顺畅与高效,并支持结果的增量写入。

在处理包含数十万甚至数百万行数据的大型pandas dataframe时,直接对整个数据集执行复杂操作,如df.merge、df.apply,尤其是涉及外部api调用(例如google maps api)时,常常会导致程序崩溃、内存溢出或因api限流而耗时过长。为了解决这些问题,采用分批处理(batch processing)是一种高效且稳健的策略。本文将详细介绍如何将大型dataframe分批处理,并优化外部api调用,实现数据的高效与稳定处理。

一、挑战:大型DataFrame与外部API调用

大型DataFrame在内存中占用大量资源,一次性加载和处理可能超出系统内存限制。同时,对每一行或每一组数据发起独立的外部API请求,会面临以下问题:

  • API速率限制(Rate Limiting):大多数公共API都有每秒、每分钟或每天的请求次数限制。短时间内大量请求会导致API拒绝服务。
  • 网络延迟与开销:每次API调用都涉及网络通信,累积的延迟会显著增加总处理时间。
  • 程序稳定性:长时间运行的复杂操作更容易因内存不足、网络瞬断或其他未知错误而崩溃,且难以恢复。

二、核心策略:数据分批处理

分批处理的核心思想是将大型DataFrame分解成若干个较小的、可管理的子集(批次),然后对每个批次独立进行处理。这种方法带来了多重优势:

  1. 内存优化:每次只加载和处理一个批次的数据,显著降低内存占用
  2. API限流管理:可以在处理每个批次之间引入延迟,以遵守API的速率限制。
  3. 提高容错性:如果某个批次处理失败,可以更容易地识别问题并重新处理该批次,而不是从头开始。
  4. 增量写入:处理完一个批次后,可以立即将结果写入文件(如CSV),即使程序中断,已处理的数据也不会丢失。

三、实现分批处理的步骤与示例

我们将通过一个模拟场景来演示如何分批处理大型DataFrame,其中包含模拟的apply操作和外部API调用,并将结果增量写入CSV文件。

1. 创建模拟数据

首先,我们创建一个大型的模拟DataFrame,包含一个需要通过API获取信息的“地址”列。

import pandas as pd
import numpy as np
import time
import os

# 创建一个大型模拟DataFrame
data_size = 500000 # 50万行数据
df = pd.DataFrame({
    'id': range(data_size),
    'value1': np.random.rand(data_size) * 100,
    'value2': np.random.randint(1, 1000, data_size),
    'address': [f"模拟地址 {i}, 城市A, 国家B" for i in range(data_size)] # 模拟地址信息
})

print(f"原始DataFrame大小: {len(df)} 行")

2. 定义批次大小并标记批次

确定一个合适的批次大小(例如100行或1000行),然后为DataFrame中的每一行分配一个批次编号。

腾讯AI 开放平台
腾讯AI 开放平台

腾讯AI开放平台

下载
batch_size = 1000 # 每批处理1000行
df['batch_num'] = df.index // batch_size

# 打印批次信息
print(f"数据将被分割成 {df['batch_num'].nunique()} 个批次,每批 {batch_size} 行。")

3. 模拟外部API调用与数据处理函数

定义一个函数来模拟外部API调用,并引入延迟以模拟网络请求和API限流。同时,定义一个函数来处理每个批次的数据,包括apply操作和API调用。

# 模拟外部API调用函数
def get_coordinates_from_address(address):
    """
    模拟一个外部API调用,根据地址获取经纬度。
    在实际应用中,这里会是调用Google Maps API等。
    """
    time.sleep(0.01) # 模拟API请求的延迟,例如10毫秒
    # 模拟返回经纬度数据
    lat = np.random.uniform(30, 40)
    lon = np.random.uniform(-100, -90)
    return f"Lat:{lat:.4f}, Lon:{lon:.4f}"

# 定义批次数据处理函数
def process_data_chunk(chunk_df):
    """
    对单个数据批次执行复杂的apply操作和API调用。
    """
    # 示例1: 执行一个复杂的apply操作
    chunk_df['processed_value'] = chunk_df['value1'] * 0.5 + chunk_df['value2'] / 10

    # 示例2: 对地址列进行API调用
    # 注意:如果API支持批量查询,应优先使用批量查询以减少网络开销
    # 这里为了演示,我们假设API是按行调用的
    chunk_df['coordinates'] = chunk_df['address'].apply(get_coordinates_from_address)

    # 示例3: 模拟一个merge操作 (如果需要与其他DataFrame合并)
    # 假设有一个小型配置DataFrame需要合并
    # config_df = pd.DataFrame({'id': [0, 1, 2], 'config_info': ['A', 'B', 'C']})
    # chunk_df = pd.merge(chunk_df, config_df, on='id', how='left')

    return chunk_df

4. 迭代批次并增量写入

现在,我们可以遍历所有批次,对每个批次进行处理,并将结果增量写入同一个CSV文件。

output_csv_path = 'processed_large_dataframe.csv'

# 如果输出文件已存在,先删除,确保从头开始写入
if os.path.exists(output_csv_path):
    os.remove(output_csv_path)
    print(f"已删除旧的输出文件: {output_csv_path}")

header_written = False # 标记是否已写入CSV头部

print(f"\n开始分批处理 {len(df)} 行数据并写入 {output_csv_path}...")

unique_batches = df['batch_num'].unique()
total_batches = len(unique_batches)

for i, batch_id in enumerate(unique_batches):
    # 提取当前批次的数据
    current_batch_df = df[df['batch_num'] == batch_id].copy() # 使用 .copy() 避免 SettingWithCopyWarning

    print(f"正在处理批次 {i+1}/{total_batches} (行范围: {current_batch_df.index.min()} - {current_batch_df.index.max()})")

    # 处理当前批次的数据
    processed_batch = process_data_chunk(current_batch_df)

    # 将处理后的批次数据写入CSV文件
    if not header_written:
        # 首次写入,包含头部
        processed_batch.to_csv(output_csv_path, mode='w', index=False, encoding='utf-8')
        header_written = True
    else:
        # 后续写入,不包含头部,以追加模式写入
        processed_batch.to_csv(output_csv_path, mode='a', header=False, index=False, encoding='utf-8')

    # 可选:在批次之间引入额外的延迟,以更严格地遵守API速率限制
    # time.sleep(0.5) # 例如,每处理完一个批次暂停0.5秒

print(f"\n所有批次处理完成,结果已写入 {output_csv_path}")

# 验证写入结果 (可选)
# processed_df = pd.read_csv(output_csv_path)
# print(f"\n从CSV读取的数据总行数: {len(processed_df)}")
# print("前5行数据示例:")
# print(processed_df.head())

四、注意事项与优化建议

  1. 选择合适的批次大小:批次大小的选择取决于您的系统内存、API限流策略以及操作的复杂性。过小的批次会增加迭代和文件I/O的开销;过大的批次则可能再次引入内存或API问题。建议通过实验找到最佳平衡点。
  2. API限流与错误处理
    • 延迟:在批次处理之间添加 time.sleep() 是最直接的限流方式。
    • 重试机制:对于外部API调用,应实现健壮的重试逻辑,例如使用 tenacity 库,在API返回429(Too Many Requests)或5xx错误时自动重试。
    • 批量API请求:如果API支持,优先使用批量请求接口,一次性发送多个数据点的请求,这能显著减少网络往返次数和总延迟。
  3. 增量写入的考量
    • 文件模式:mode='w' 用于首次写入(创建文件并写入头部),mode='a' 用于后续追加(不写入头部)。
    • 索引:index=False 避免将DataFrame的索引作为一列写入CSV。
    • 编码:指定 encoding='utf-8' 以避免字符编码问题。
  4. 并行处理:对于CPU密集型或大量API请求的场景,可以考虑使用 multiprocessing 模块将批次处理任务分配给多个CPU核心或进程并行执行。但这会增加代码复杂性,且需要更精细的API限流管理。
  5. 内存管理:在处理完一个批次后,如果不再需要原始批次数据,可以考虑使用 del 语句释放内存,或者确保变量超出作用域后被垃圾回收。
  6. 中间结果保存:如果处理流程非常漫长,可以考虑在每个批次处理后,不仅写入最终结果,还保存一些关键的中间状态,以便在程序崩溃后能从最近的检查点恢复。

五、总结

通过将大型Pandas DataFrame分解为可管理的小批次进行处理,我们能够有效地规避内存限制、遵守API速率,并提高数据处理的整体稳定性和效率。这种分批处理结合增量写入的策略,是处理海量数据和外部服务交互时的最佳实践之一,尤其适用于那些需要长时间运行且对资源消耗敏感的数据管道。遵循本文提供的指南和代码示例,您可以构建出更加健壮和高效的数据处理解决方案。

相关专题

更多
Python 时间序列分析与预测
Python 时间序列分析与预测

本专题专注讲解 Python 在时间序列数据处理与预测建模中的实战技巧,涵盖时间索引处理、周期性与趋势分解、平稳性检测、ARIMA/SARIMA 模型构建、预测误差评估,以及基于实际业务场景的时间序列项目实操,帮助学习者掌握从数据预处理到模型预测的完整时序分析能力。

51

2025.12.04

硬盘接口类型介绍
硬盘接口类型介绍

硬盘接口类型有IDE、SATA、SCSI、Fibre Channel、USB、eSATA、mSATA、PCIe等等。详细介绍:1、IDE接口是一种并行接口,主要用于连接硬盘和光驱等设备,它主要有两种类型:ATA和ATAPI,IDE接口已经逐渐被SATA接口;2、SATA接口是一种串行接口,相较于IDE接口,它具有更高的传输速度、更低的功耗和更小的体积;3、SCSI接口等等。

1014

2023.10.19

PHP接口编写教程
PHP接口编写教程

本专题整合了PHP接口编写教程,阅读专题下面的文章了解更多详细内容。

60

2025.10.17

php8.4实现接口限流的教程
php8.4实现接口限流的教程

PHP8.4本身不内置限流功能,需借助Redis(令牌桶)或Swoole(漏桶)实现;文件锁因I/O瓶颈、无跨机共享、秒级精度等缺陷不适用高并发场景。本专题为大家提供相关的文章、下载、课程内容,供大家免费下载体验。

375

2025.12.29

Java 项目构建与依赖管理(Maven / Gradle)
Java 项目构建与依赖管理(Maven / Gradle)

本专题系统讲解 Java 项目构建与依赖管理的完整体系,重点覆盖 Maven 与 Gradle 的核心概念、项目生命周期、依赖冲突解决、多模块项目管理、构建加速与版本发布规范。通过真实项目结构示例,帮助学习者掌握 从零搭建、维护到发布 Java 工程的标准化流程,提升在实际团队开发中的工程能力与协作效率。

10

2026.01.12

c++主流开发框架汇总
c++主流开发框架汇总

本专题整合了c++开发框架推荐,阅读专题下面的文章了解更多详细内容。

106

2026.01.09

c++框架学习教程汇总
c++框架学习教程汇总

本专题整合了c++框架学习教程汇总,阅读专题下面的文章了解更多详细内容。

64

2026.01.09

学python好用的网站推荐
学python好用的网站推荐

本专题整合了python学习教程汇总,阅读专题下面的文章了解更多详细内容。

139

2026.01.09

学python网站汇总
学python网站汇总

本专题整合了学python网站汇总,阅读专题下面的文章了解更多详细内容。

13

2026.01.09

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 3.6万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号