
在python中,当我们需要对大量数据执行计算密集型任务时,多进程(multiprocessing)通常是实现并行化的首选方案。然而,对于涉及大型numpy数组的计算,直接使用tqdm.contrib.concurrent.process_map等高级接口进行多进程处理,可能会发现性能不升反降,甚至比单线程循环还要慢。
让我们通过一个具体的例子来观察这个问题。假设我们有一个calc函数,它对一个500x500的NumPy矩阵执行1000次均值和标准差计算,模拟一个耗时的操作。我们需要对100个这样的矩阵进行处理。
import time
import numpy as np
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import process_map, thread_map
from multiprocessing import Pool, Manager
def mydataset(size, length):
"""生成指定大小和数量的随机NumPy矩阵数据集"""
for _ in range(length):
yield np.random.rand(*size)
def calc(mat):
"""模拟对NumPy矩阵的重度计算"""
for _ in range(1000):
_ = np.mean(mat)
_ = np.std(mat)
return True # 简化返回值,原问题返回avg, std
def main_initial_test():
ds = list(mydataset((500, 500), 100)) # 生成100个500x500的矩阵
print("--- 原始方法性能测试 ---")
t0 = time.time()
for mat in tqdm(ds, desc="For Loop"):
calc(mat)
print(f'For Loop: {time.time() - t0:.2f}s')
t0 = time.time()
list(map(calc, tqdm(ds, desc="Native Map")))
print(f'Native Map: {time.time() - t0:.2f}s')
t0 = time.time()
process_map(calc, ds, desc="Process Map")
print(f'Process Map: {time.time() - t0:.2f}s')
t0 = time.time()
thread_map(calc, ds, desc="Thread Map")
print(f'Thread Map: {time.time() - t0:.2f}s')
if __name__ == '__main__':
# main_initial_test()
pass # 暂时注释,后面会展示优化后的代码运行上述代码,在某些系统上可能会得到类似以下的结果:
For Loop: 51.88s Native Map: 52.49s Process Map: 71.06s Thread Map: 42.04s
可以看到,process_map(多进程)竟然比for循环和map(单进程)还要慢,而thread_map(多线程)虽然有所提升,但提升幅度可能不如预期,且CPU利用率并未达到饱和。这与我们对多核并行计算的期望大相径庭。
问题根源分析:
立即学习“Python免费学习笔记(深入)”;
这个问题的核心在于Python多进程的工作机制。当使用multiprocessing模块(包括process_map等基于它的工具)创建新进程时,父进程中的对象(例如我们数据集ds中的NumPy矩阵)需要被序列化(pickling)并拷贝到每个子进程独立的内存空间中。对于大型NumPy数组,每次将一个矩阵传递给子进程进行计算时,都会发生一次昂贵的数据序列化和拷贝操作。
这个拷贝操作的开销,尤其是在数据量大、任务数量多的情况下,会迅速累积并成为整个计算过程的瓶颈,甚至超过了并行计算所带来的收益。这意味着,尽管CPU核心可能空闲,但进程间的数据传输却在拖慢整体进度。
为了解决多进程中数据拷贝带来的性能问题,我们需要一种机制,让所有子进程能够访问同一份数据,而不是各自拥有独立的副本。multiprocessing模块提供了Manager类,它能够创建一个服务器进程,并管理一些共享的Python对象,如列表、字典等。其他进程可以通过代理对象来访问这些共享对象,从而避免了不必要的数据拷贝。
核心思想:
以下是使用multiprocessing.Manager进行优化的代码示例:
import time
import numpy as np
from multiprocessing import Pool, Manager
def mydataset(size, length):
"""生成指定大小和数量的随机NumPy矩阵数据集"""
for _ in range(length):
yield np.random.rand(*size)
def calc_with_shared_data(idx, mat_list_proxy):
"""
模拟对NumPy矩阵的重度计算,通过索引访问共享数据。
mat_list_proxy 是 Manager.list 的代理对象。
"""
mat = mat_list_proxy[idx] # 通过索引获取共享列表中的矩阵
# 模拟一些重度计算
for _ in range(1000):
_ = np.mean(mat)
_ = np.std(mat)
return True # 简化返回值
# return avg, std # 如果需要返回计算结果
def main_optimized():
ds = list(mydataset((500, 500), 100)) # 生成100个500x500的矩阵
# 1. 创建Manager实例
manager = Manager()
# 2. 将原始数据集转换为Manager管理的共享列表
# 数据在此处被一次性拷贝到Manager的服务器进程内存中
shared_mat_list = manager.list(ds)
# 3. 创建进程池,通常设置为CPU核心数
# 这里使用4个进程进行演示,可根据实际CPU核心数调整
with Pool(processes=4) as mypool:
t0 = time.time()
# 4. 使用starmap传递多个参数:任务索引和共享列表的代理对象
# zip(range(len(ds)), [shared_mat_list] * len(ds)) 为每个任务生成 (索引, 共享列表代理) 对
results = mypool.starmap(calc_with_shared_data,
zip(range(len(ds)), [shared_mat_list] * len(ds)))
print(f"Manager Pool Starmap: {time.time() - t0:.2f}s")
# 注意:Manager在with Pool块结束后会自动清理,
# 如果不使用with语句,需要手动调用manager.shutdown()
if __name__ == '__main__':
print("--- 优化后方法性能测试 ---")
main_optimized()性能验证与分析:
运行优化后的代码,您会看到显著的性能提升。例如,在原问题提供的测试环境中,优化后的代码可能输出:
Manager Pool Starmap: 1.94s
与原始的50-70秒相比,性能提升了数十倍!
性能提升的原因:
在使用multiprocessing.Manager或其他共享内存机制时,需要考虑以下几点:
选择合适的共享机制:
数据可变性与同步:
进程池管理:
序列化限制:
内存占用:
调试复杂性:
在Python中进行高性能NumPy计算时,盲目应用多进程并行化可能适得其反。理解多进程中数据序列化和拷贝的开销是解决性能瓶颈的关键。通过巧妙地利用multiprocessing.Manager等共享内存机制,我们可以将大型数据集一次性加载到共享内存中,并让所有子进程通过引用访问,从而避免昂贵的数据传输,显著提升计算效率。选择正确的并行策略和数据共享机制是实现高效并行计算、充分利用现代多核处理器性能的关键。
以上就是Python中NumPy计算加速:如何利用多进程避免数据拷贝瓶颈的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号