
在python中处理大量计算密集型任务时,利用多核cpu进行并行计算是提高效率的常见方法。对于cpu密集型任务,由于python的全局解释器锁(gil)限制,多线程通常无法实现真正的并行计算,而多进程(multiprocessing)则通过创建独立的python解释器进程来绕过gil,从而实现并行执行。
然而,在使用multiprocessing库或其高级封装(如tqdm.contrib.concurrent.process_map)时,开发者有时会发现性能不升反降,尤其是在处理大型数据结构(如NumPy数组)时。这背后的主要原因在于进程间通信(IPC)的开销,特别是数据序列化和反序列化(即所谓的“pickling”和“unpickling”)过程。
当我们将一个Python对象作为参数传递给一个新创建的子进程时,该对象不会直接在进程间共享内存。相反,它会被序列化(pickled),然后复制到子进程的内存空间中,子进程再对其进行反序列化(unpickled)。对于小型数据,这个开销可以忽略不计,但对于像numpy.ndarray这样的大型数据结构,每次任务调用都进行这种复制操作会消耗大量CPU时间和内存带宽,最终成为整个并行计算的瓶颈。
考虑以下一个模拟NumPy密集型计算的例子,它展示了process_map在处理大型数组时的效率问题:
import time
import numpy as np
from tqdm.auto import tqdm
from tqdm.contrib.concurrent import process_map, thread_map
# 模拟生成大型数据集
def mydataset(size, length):
for ii in range(length):
yield np.random.rand(*size)
# 模拟耗时计算函数
def calc(mat):
# 模拟一些耗时的NumPy计算
for ii in range(1000):
avg = np.mean(mat)
std = np.std(mat)
return avg, std
def main_original_test():
ds = list(mydataset((500, 500), 100)) # 100个500x500的NumPy数组
print("--- 原始测试结果 ---")
t0 = time.time()
res1 = []
for mat in tqdm(ds):
res1.append(calc(mat))
print(f'for loop: {time.time() - t0:.2f}s')
t0 = time.time()
res2 = list(map(calc, tqdm(ds)))
print(f'native map: {time.time() - t0:.2f}s')
t0 = time.time()
res3 = process_map(calc, ds) # 使用process_map
print(f'process map: {time.time() - t0:.2f}s')
t0 = time.time()
res4 = thread_map(calc, ds) # 使用thread_map
print(f'thread map: {time.time() - t0:.2f}s')
if __name__ == '__main__':
main_original_test()上述代码在某些环境下可能产生如下结果:
立即学习“Python免费学习笔记(深入)”;
for loop: 51.88s native map: 52.49s process map: 71.06s # 明显慢于for循环 thread map: 42.04s # 略快,但未充分利用多核
可以看到,process_map的执行时间甚至超过了简单的for循环,这正是由于每次调用calc函数时,整个NumPy数组mat都需要被序列化并复制到子进程,导致了巨大的性能开销。thread_map虽然略快,但由于GIL的存在,其加速效果有限。
解决上述问题的关键在于避免在每次任务调用时重复复制大型数据。Python的multiprocessing模块提供了一种解决方案:Manager。Manager对象可以创建一个服务进程,该进程管理共享的Python对象,并允许其他进程通过代理对象来访问这些共享对象。这样,大型数据只需复制一次到Manager的内存中,后续的子进程通过引用来访问,大大减少了进程间通信的开销。
对于我们的NumPy数组列表,我们可以使用Manager().list()来创建一个共享列表。然后,子进程通过列表的索引来访问特定的NumPy数组,而不是直接传递整个数组。
以下是使用multiprocessing.Manager和Pool.starmap进行优化的示例代码:
import time
import numpy as np
from multiprocessing import Pool, Manager
# 模拟生成大型数据集
def mydataset(size, length):
for ii in range(length):
yield np.random.rand(*size)
# 适应共享内存的计算函数
# 现在接收数据索引和共享列表作为参数
def calc_optimized(idx, mat_list):
# 从共享列表中获取NumPy数组
mat = mat_list[idx]
# 模拟一些耗时的NumPy计算
for ii in range(1000):
avg = np.mean(mat)
std = np.std(mat)
return avg, std
def main_optimized_test():
ds = list(mydataset((500, 500), 100)) # 原始数据集
# 1. 创建进程池
# 建议根据CPU核心数设置,例如os.cpu_count()
num_processes = 4
mypool = Pool(num_processes)
# 2. 创建Manager并生成共享列表
manager = Manager()
# 将原始数据集一次性复制到Manager管理的共享列表中
mylist = manager.list(ds)
print(f"\n--- 优化后测试结果 ({num_processes} 进程) ---")
t0 = time.time()
# 使用starmap传递多个参数:数据索引和共享列表
# zip(range(len(ds)), [mylist]*len(ds)) 为每个任务生成 (索引, 共享列表) 对
res_optimized = mypool.starmap(calc_optimized, zip(range(len(ds)), [mylist]*len(ds)))
print(f"map with manager: {time.time() - t0:.2f}s")
# 关闭进程池
mypool.close()
mypool.join()
manager.shutdown() # 关闭Manager进程
if __name__ == '__main__':
main_optimized_test()运行上述优化后的代码,其输出结果可能如下:
map with manager: 1.94s
与原始的for循环和process_map相比,性能提升是巨大的。这验证了通过Manager实现共享内存,避免重复数据复制,是解决此类问题的有效途径。
在Python中对NumPy等库进行计算密集型任务的并行加速时,简单地使用multiprocessing.Pool或process_map可能因数据序列化和反序列化的开销而导致性能下降。通过深入理解其背后的机制,我们发现对于大型数据集,利用multiprocessing.Manager创建共享内存是避免重复数据复制、显著提升并行计算效率的关键。这种方法将数据一次性加载到共享内存,后续子进程通过索引访问,从而消除了主要的性能瓶颈,实现了高效的并行处理。在实际应用中,务必根据任务特性和数据规模选择合适的并行策略。
以上就是加速Python中NumPy密集型计算的多进程优化策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号