0

0

Pandas大数据框分批处理与外部API调用优化实践

心靈之曲

心靈之曲

发布时间:2025-09-17 11:54:01

|

832人浏览过

|

来源于php中文网

原创

Pandas大数据框分批处理与外部API调用优化实践

本教程旨在解决Pandas处理大型DataFrame时,因内存限制或外部API请求频率过高导致的程序崩溃及性能瓶颈问题。核心策略是通过将大数据集逻辑地划分为小批次进行独立处理,并演示如何高效地执行数据合并、应用自定义函数以及管理外部API调用,最终将分批处理结果统一写入目标文件,从而提升数据处理的稳定性和效率。

1. 理解大数据处理挑战

在处理包含数十万甚至数百万行数据的大型pandas dataframe时,直接对整个数据集执行复杂操作(如df.merge、df.apply)或频繁调用外部api(如google maps api),极易引发内存溢出、程序崩溃或因api请求频率过高而被限流等问题。尤其当每次api调用耗时且有严格的速率限制时,一次性处理所有数据几乎不可行。解决这些问题的关键在于采用分批处理(batch processing)策略。

2. 核心策略:数据分批处理

分批处理的核心思想是将一个庞大的数据集分解成若干个大小可控的小数据集(批次),然后逐一处理这些批次。这种方法不仅能有效降低单次操作的内存消耗,还能更好地管理外部API的调用频率。

2.1 分批逻辑实现

我们可以利用DataFrame的索引(df.index)结合整数除法(//)来为每一行分配一个批次编号。例如,如果希望每100行作为一个批次,那么df.index // 100就能生成相应的批次号。

以下是一个演示如何创建批次并迭代处理的示例代码:

import pandas as pd
import numpy as np
import time
import os

# 模拟一个大型DataFrame
# 实际应用中,这里会是您加载的50万行数据
data_size = 500000
df = pd.DataFrame({
    'id': range(data_size),
    'col_a': np.random.rand(data_size) * 100,
    'address': [f'Address {i}, City {i % 100}' for i in range(data_size)],
    'value_b': np.random.randint(0, 1000, data_size)
})

print(f"原始DataFrame大小: {len(df)} 行")

# 定义批次大小
batch_size = 100

# 为DataFrame中的每一行生成批次号
df['batch_num'] = df.index // batch_size

# 模拟一个外部API调用函数
def call_google_maps_api(address):
    """
    模拟调用Google Maps API,获取经纬度
    实际应用中,这里会是您的requests.get()调用
    """
    # 模拟网络延迟和API处理时间
    time.sleep(0.05) # 每次调用暂停50毫秒,以避免过快请求
    if "City 0" in address: # 模拟某些地址可能失败
        # raise ValueError(f"API Error for address: {address}")
        return f"ERROR: {address}"
    return f"Lat: {hash(address) % 90}, Lng: {hash(address) % 180}"

# 存储最终结果的列表
# 也可以直接写入CSV,下面会介绍两种方式
processed_batches = []
output_csv_path = 'processed_data_batched.csv'

# 如果输出文件已存在,先删除,确保从头开始
if os.path.exists(output_csv_path):
    os.remove(output_csv_path)
    print(f"已删除旧的输出文件: {output_csv_path}")

# 遍历所有唯一的批次号
unique_batches = df['batch_num'].unique()
for i, batch_id in enumerate(unique_batches):
    print(f"正在处理批次 {i+1}/{len(unique_batches)} (批次号: {batch_id})...")

    # 提取当前批次的DataFrame
    # 使用 .copy() 避免SettingWithCopyWarning
    current_batch_df = df[df['batch_num'] == batch_id].copy()

    # --- 在此处对 current_batch_df 执行您的操作 ---

    # 1. 模拟 df.merge 操作 (例如,与另一个小表合并)
    # 假设有一个小的查找表
    lookup_data = pd.DataFrame({
        'id': range(data_size),
        'category': [f'Cat_{i % 5}' for i in range(data_size)]
    })
    # 只合并当前批次所需的查找数据
    current_batch_df = pd.merge(current_batch_df, lookup_data[['id', 'category']], on='id', how='left')

    # 2. 模拟 df.apply 操作,其中包含外部API调用
    # 针对 'address' 列调用模拟的Google Maps API
    try:
        current_batch_df['coordinates'] = current_batch_df['address'].apply(call_google_maps_api)
    except Exception as e:
        print(f"批次 {batch_id} API调用失败: {e}")
        # 可以在这里实现重试逻辑或记录错误
        current_batch_df['coordinates'] = "API_CALL_FAILED" # 标记失败

    # 3. 其他数据转换或计算
    current_batch_df['calculated_col'] = current_batch_df['col_a'] * 2

    # --- 批次处理结束 ---

    # 将处理后的批次数据添加到列表中
    # processed_batches.append(current_batch_df)

    # 替代方案:直接将批次结果写入CSV文件
    # 对于第一个批次,写入头部;对于后续批次,不写入头部并以追加模式写入
    if i == 0:
        current_batch_df.to_csv(output_csv_path, mode='w', index=False, header=True, encoding='utf-8')
    else:
        current_batch_df.to_csv(output_csv_path, mode='a', index=False, header=False, encoding='utf-8')

    # 释放内存 (可选,对于极大的DataFrame可能有用)
    del current_batch_df
    import gc
    gc.collect()

print("\n所有批次处理完成!")

# 如果您选择将所有批次收集到列表中,最后再合并
# final_df = pd.concat(processed_batches, ignore_index=True)
# print(f"最终合并的DataFrame大小: {len(final_df)} 行")
# final_df.to_csv(output_csv_path, index=False, encoding='utf-8')

print(f"处理后的数据已保存到: {output_csv_path}")
final_df_check = pd.read_csv(output_csv_path)
print(f"从CSV读取的数据行数: {len(final_df_check)}")

2.2 处理流程与结果合并

在上述示例中,我们展示了两种处理批次结果的方式:

火山方舟
火山方舟

火山引擎一站式大模型服务平台,已接入满血版DeepSeek

下载
  1. 收集到列表再合并(注释掉的部分):将每个处理后的current_batch_df添加到processed_batches列表中。在所有批次处理完毕后,使用pd.concat(processed_batches, ignore_index=True)将所有批次合并成一个完整的DataFrame。这种方式适用于最终需要一个完整DataFrame进行后续操作的场景,但会占用更多内存。
  2. 直接追加写入CSV:这是处理大数据集更推荐的方法,尤其是在内存受限或最终目标是生成一个CSV文件时。
    • 对于第一个批次(i == 0),使用mode='w'(写入模式)和header=True来创建文件并写入列头。
    • 对于后续批次(i > 0),使用mode='a'(追加模式)和header=False来将数据追加到现有文件末尾,且不再写入列头。 这种方式能有效避免将所有处理结果同时加载到内存中,从而节省大量内存资源。

3. 外部API请求管理

当分批处理涉及到外部API调用时,必须特别注意API的速率限制(Rate Limiting)和错误处理。

  • 速率限制:大多数API都有每秒、每分钟或每天的请求次数限制。在上述示例中,我们通过time.sleep(0.05)模拟了每次API调用后的延迟,以控制请求频率。实际应用中,您可能需要根据API提供商的具体要求,设置更长的延迟或实现指数退避(Exponential Backoff)策略,即在API返回错误(如429 Too Many Requests)时,等待更长时间再重试。
  • 错误处理:将API调用放在try-except块中,可以捕获网络错误、API响应错误等,并进行相应的处理,例如记录错误、跳过当前条目、使用默认值或实现重试机制。
  • 缓存中间结果:对于耗时或高频调用的API,可以考虑在本地缓存API响应。如果同一地址被多次查询,可以直接从缓存中获取结果,减少实际的API请求。

4. 注意事项与优化建议

  • 批次大小的选择:没有一劳永逸的批次大小。它取决于您的系统内存、数据复杂性、API限制和处理逻辑的计算成本。通常建议从一个较小的批次(如1000或5000行)开始测试,逐步调整以找到最佳平衡点。
  • 内存管理:在每次循环结束时,如果current_batch_df不再需要,可以显式使用del current_batch_df并调用gc.collect()来帮助Python的垃圾回收器释放内存。
  • 进度保存与恢复:对于耗时数小时甚至数天的任务,考虑在每次批次处理完成后,记录已处理的批次号或将中间结果保存到临时文件。这样,如果程序意外中断,您可以从中断点恢复,而不是从头开始。
  • 并行处理(高级):如果API调用是I/O密集型且可以并行执行,可以考虑使用Python的multiprocessing或concurrent.futures模块来并行处理多个批次。但这会增加代码复杂性,并需要更精细的API速率控制。
  • 异常处理和日志记录:在实际生产环境中,为API调用和数据处理逻辑添加详细的异常处理和日志记录,有助于调试和监控程序的运行状态。

5. 总结

通过采用分批处理策略,我们能够有效地管理大型DataFrame的数据处理任务,避免因内存或API限制导致的程序崩溃。核心在于将数据分解为可管理的批次,并在每个批次内部执行所需的合并、计算和API调用。结合适当的API请求管理和错误处理机制,以及将结果增量写入文件,可以显著提升数据处理的稳定性和效率,确保即使面对海量数据和外部服务依赖,也能顺利完成任务。

相关专题

更多
python开发工具
python开发工具

php中文网为大家提供各种python开发工具,好的开发工具,可帮助开发者攻克编程学习中的基础障碍,理解每一行源代码在程序执行时在计算机中的过程。php中文网还为大家带来python相关课程以及相关文章等内容,供大家免费下载使用。

746

2023.06.15

python打包成可执行文件
python打包成可执行文件

本专题为大家带来python打包成可执行文件相关的文章,大家可以免费的下载体验。

634

2023.07.20

python能做什么
python能做什么

python能做的有:可用于开发基于控制台的应用程序、多媒体部分开发、用于开发基于Web的应用程序、使用python处理数据、系统编程等等。本专题为大家提供python相关的各种文章、以及下载和课程。

758

2023.07.25

format在python中的用法
format在python中的用法

Python中的format是一种字符串格式化方法,用于将变量或值插入到字符串中的占位符位置。通过format方法,我们可以动态地构建字符串,使其包含不同值。php中文网给大家带来了相关的教程以及文章,欢迎大家前来阅读学习。

617

2023.07.31

python教程
python教程

Python已成为一门网红语言,即使是在非编程开发者当中,也掀起了一股学习的热潮。本专题为大家带来python教程的相关文章,大家可以免费体验学习。

1261

2023.08.03

python环境变量的配置
python环境变量的配置

Python是一种流行的编程语言,被广泛用于软件开发、数据分析和科学计算等领域。在安装Python之后,我们需要配置环境变量,以便在任何位置都能够访问Python的可执行文件。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

547

2023.08.04

python eval
python eval

eval函数是Python中一个非常强大的函数,它可以将字符串作为Python代码进行执行,实现动态编程的效果。然而,由于其潜在的安全风险和性能问题,需要谨慎使用。php中文网给大家带来了相关的教程以及文章,欢迎大家前来学习阅读。

577

2023.08.04

scratch和python区别
scratch和python区别

scratch和python的区别:1、scratch是一种专为初学者设计的图形化编程语言,python是一种文本编程语言;2、scratch使用的是基于积木的编程语法,python采用更加传统的文本编程语法等等。本专题为大家提供scratch和python相关的文章、下载、课程内容,供大家免费下载体验。

705

2023.08.11

Java 项目构建与依赖管理(Maven / Gradle)
Java 项目构建与依赖管理(Maven / Gradle)

本专题系统讲解 Java 项目构建与依赖管理的完整体系,重点覆盖 Maven 与 Gradle 的核心概念、项目生命周期、依赖冲突解决、多模块项目管理、构建加速与版本发布规范。通过真实项目结构示例,帮助学习者掌握 从零搭建、维护到发布 Java 工程的标准化流程,提升在实际团队开发中的工程能力与协作效率。

10

2026.01.12

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
最新Python教程 从入门到精通
最新Python教程 从入门到精通

共4课时 | 0.6万人学习

Django 教程
Django 教程

共28课时 | 3万人学习

SciPy 教程
SciPy 教程

共10课时 | 1.1万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号