
本文深入探讨了在python多进程编程中使用`multiprocessing.pool.starmap`结合`syncmanager.dict`时可能遇到的空结果问题。核心在于`zip`函数与空可迭代对象的行为,以及如何正确构造传递给`starmap`的参数列表。通过详细的代码示例和解释,文章展示了如何避免`zip`陷阱,并确保共享字典在多进程环境中被正确填充和访问,从而实现预期的并行计算和数据共享。
在Python多进程编程中,multiprocessing模块提供了强大的工具来利用多核CPU进行并行计算。Pool对象及其starmap方法常用于将一个函数应用于一组参数,而SyncManager则允许在不同进程间共享数据结构,如字典。然而,不当的参数构造方式可能导致意想不到的空结果。
Pool.starmap与共享字典的常见问题
考虑以下使用multiprocessing.Pool和SyncManager.dict的场景:
import multiprocessing as mp
from multiprocessing.managers import SyncManager
n_cores = mp.cpu_count()
def parallel_fn(job_n, cache):
# 尝试将结果存入共享缓存
cache['job_b'] = job_n # 这里的键名可能不是我们想要的
return job_n
if __name__=="__main__":
with SyncManager() as manager:
shared_cache = manager.dict()
# 尝试构造starmap的参数
args = list(zip(range(n_cores), shared_cache))
with mp.Pool(n_cores) as pool:
result = pool.starmap(parallel_fn, args)
print(f"Pool return: {result}")
print(f"Shared dict after: {shared_cache}")运行上述代码,我们可能会得到如下输出:
Pool return: []
Shared dict after: {}尽管我们期望Pool返回n_cores个结果,并且shared_cache被填充,但实际结果却是空的。这表明parallel_fn根本没有被执行,或者说starmap接收到了一个空的参数列表。
立即学习“Python免费学习笔记(深入)”;
问题根源分析:zip函数的行为
导致上述问题的主要原因是zip函数的行为。zip函数会聚合每个可迭代对象中对应位置的元素,并生成一个由这些元素组成的元组。它的关键特性是:当最短的可迭代对象被耗尽时,zip就会停止。
在我们的示例中,args = list(zip(range(n_cores), shared_cache))这一行是问题的症结所在。
- range(n_cores)是一个长度为n_cores(例如,如果CPU有8核,则为8)的可迭代对象。
- shared_cache是一个SyncManager.dict()对象,在初始化时是空的,其长度为0。
当zip函数接收到一个长度为8的range对象和一个长度为0的ProxyDict对象时,它会立即停止,因为shared_cache已经耗尽。因此,zip返回一个空的迭代器,list(zip(...))的结果自然就是一个空列表。
我们可以通过一个简单的例子来验证这一点:
print(list(zip([1, 2, 3], dict()))) # 输出: [] print(list(zip(range(5), []))) # 输出: []
由于args列表为空,pool.starmap自然不会执行任何任务,从而返回一个空列表,并且shared_cache也保持为空。
解决方案:正确构造starmap的参数
要解决这个问题,我们需要确保starmap接收到一个包含正确数量和结构参数的列表。最直接且推荐的方法是使用列表推导式来显式地构造参数列表。
同时,我们也注意到原始代码中parallel_fn内部将键固定为'job_b'。通常,我们希望将任务相关的标识符(例如job_n)作为键来存储数据,以避免不同任务覆盖相同键的值。
以下是修正后的代码:
import multiprocessing as mp
from multiprocessing.managers import SyncManager
n_cores = mp.cpu_count()
def parallel_fn(job_n, cache):
"""
并行执行的函数,将任务编号和结果存入共享缓存。
"""
# 将 job_n 作为键存储,而不是固定的 'job_b'
cache[job_n] = job_n
print(f"Process {mp.current_process().name} processing job {job_n}")
return job_n
if __name__=="__main__":
with SyncManager() as manager:
shared_cache = manager.dict()
# 使用列表推导式正确构造starmap的参数
# 每个元组包含一个任务编号和一个共享字典的引用
args = [(n, shared_cache) for n in range(n_cores)]
print(f"Constructed args for starmap: {args}")
with mp.Pool(n_cores) as pool:
print("Starting pool.starmap...")
result = pool.starmap(parallel_fn, args)
print(f"Pool return: {result}")
print(f"Shared dict after: {shared_cache}")运行修正后的代码(假设我的机器是8核),输出如下:
Constructed args for starmap: [(0,), (1, ), (2, ), (3, ), (4, ), (5, ), (6, ), (7, )] Starting pool.starmap... Process ForkPoolWorker-1 processing job 0 Process ForkPoolWorker-2 processing job 1 Process ForkPoolWorker-3 processing job 2 Process ForkPoolWorker-4 processing job 3 Process ForkPoolWorker-5 processing job 4 Process ForkPoolWorker-6 processing job 5 Process ForkPoolWorker-7 processing job 6 Process ForkPoolWorker-8 processing job 7 Pool return: [0, 1, 2, 3, 4, 5, 6, 7] Shared dict after: {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7}
从输出可以看出,args列表现在包含了n_cores个元组,每个元组都携带了任务编号和shared_cache的引用。parallel_fn被成功执行了n_cores次,pool.starmap返回了预期的结果列表,并且shared_cache也被正确地填充了数据。
总结与注意事项
- 理解zip函数行为:在使用zip函数组合多个可迭代对象时,务必注意其“最短可迭代对象决定长度”的特性。当其中一个输入是空的或很快耗尽时,zip的结果也会是空的。
- 正确构造starmap参数:当需要向starmap传递包含相同共享对象(如SyncManager.dict)的多个参数时,使用列表推导式是构造参数列表的清晰且可靠的方法:[(arg1, shared_obj), (arg2, shared_obj), ...]。
- 共享数据结构的使用:multiprocessing.managers.SyncManager提供的共享数据结构(如manager.dict()、manager.list())是实现进程间数据共享的关键。确保这些共享对象在所有进程中都能被正确引用和访问。
- 避免键冲突:在向共享字典写入数据时,如果每个进程都有自己的结果需要存储,应使用唯一的键来避免数据覆盖。将任务编号或其他唯一标识符作为键是常见的做法。
通过理解这些核心概念和避免常见陷阱,开发者可以更有效地利用Python的multiprocessing模块进行并行计算,并正确管理进程间的共享数据。










