
本文深入探讨了在使用python `multiprocessing.pool`的`starmap`方法结合`syncmanager.dict`共享字典时,因`zip`函数误用导致任务不执行和共享数据为空的常见问题。通过分析`zip`处理空迭代器的行为,提供了正确的参数构造方法和共享字典键值设置,确保多进程任务能够成功执行并正确更新共享状态,从而实现高效的并行计算。
理解Python多进程与共享数据
在Python中,multiprocessing模块允许程序利用多核处理器进行并行计算,显著提升处理速度。multiprocessing.Pool提供了一种便捷的方式来管理一组工作进程,并分发任务。当需要在这些并行进程之间共享数据时,multiprocessing.managers.SyncManager提供了一种机制,例如通过manager.dict()创建一个可以在不同进程间同步访问的字典。
Pool.starmap(func, iterable)方法是map的一个变体,它接受一个可迭代对象,其中每个元素本身也是一个可迭代对象(如元组),starmap会将这些子元素作为单独的参数解包(*操作符)传递给func。
问题分析:starmap参数与zip函数的陷阱
初学者在使用starmap时,常遇到的一个问题是任务未能按预期执行,或者共享数据未被更新。这往往是由于传递给starmap的参数列表构造不当造成的。
考虑以下一个尝试使用starmap和共享字典的示例代码:
立即学习“Python免费学习笔记(深入)”;
import multiprocessing as mp
from multiprocessing.managers import SyncManager
n_cores = mp.cpu_count()
def parallel_fn(job_n, cache):
cache['job_b'] = job_n # 尝试更新共享字典
return job_n
if __name__=="__main__":
with SyncManager() as manager:
shared_cache = manager.dict()
# 错误:使用zip构造参数
args = list(zip(range(n_cores), shared_cache))
with mp.Pool(n_cores) as pool:
result = pool.starmap(parallel_fn, args)
print(f"Pool return: {result}")
print(f"Shared dict after: {shared_cache}")运行上述代码,会发现Pool return和Shared dict after都为空。这是因为starmap根本没有接收到任何任务。问题出在args = list(zip(range(n_cores), shared_cache))这一行。
zip()函数的作用是将多个可迭代对象中对应位置的元素打包成一个个元组,然后返回由这些元组组成的迭代器。它的一个关键特性是:zip函数会以最短的可迭代对象为准停止迭代。
在这个例子中:
- range(n_cores)是一个长度为n_cores的可迭代对象(例如,如果CPU核心数是8,则为range(0, 8))。
- shared_cache是一个ProxyDict对象,它在初始化时是空的,因此其长度为0。
当zip尝试将range(n_cores)和空的shared_cache打包时,由于shared_cache的长度为0,zip会立即停止,生成一个空的迭代器。因此,list(zip(range(n_cores), shared_cache))的结果将是一个空列表。
由于args是一个空列表,pool.starmap没有接收到任何任务来执行,所以result自然也是空的,shared_cache也保持为空,因为parallel_fn从未被调用。
解决方案:正确构造starmap参数与共享字典键
要解决这个问题,我们需要确保传递给starmap的args列表包含正确数量和结构的参数。同时,对于共享字典的更新,也应确保使用有意义的键。
1. 正确构造starmap的参数列表
starmap期望接收一个可迭代对象,其中每个元素都是一个元组或列表,代表传递给目标函数的独立参数。为了让每个进程执行一个任务并访问共享字典,我们可以使用列表推导式来构造args:
# 修正:使用列表推导式构造参数 args = [(n, shared_cache) for n in range(n_cores)]
这样,args将是一个包含n_cores个元组的列表,每个元组形如(job_number, shared_cache_reference)。例如,如果n_cores是8,args将是[(0, shared_cache), (1, shared_cache), ..., (7, shared_cache)]。
2. 修正共享字典的键
在原始的parallel_fn中,cache['job_b'] = job_n会使所有进程都尝试使用相同的键'job_b'来更新字典。虽然SyncManager.dict会处理同步,但如果目标是存储每个任务的结果,通常会希望使用任务的唯一标识符作为键。将键改为job_n会更合理,使得每个任务将其自身的结果存储在以其任务编号为键的位置:
def parallel_fn(job_n, cache):
# 修正:使用job_n作为键
cache[job_n] = job_n
return job_n完整的修正代码示例
结合以上修正,完整的代码如下:
import multiprocessing as mp
from multiprocessing.managers import SyncManager
n_cores = mp.cpu_count()
def parallel_fn(job_n, cache):
"""
并行函数:将任务编号作为键和值存入共享字典。
"""
cache[job_n] = job_n
return job_n
if __name__=="__main__":
with SyncManager() as manager:
shared_cache = manager.dict()
# 正确构造starmap的参数列表
# 每个元组包含一个任务编号和一个共享字典的引用
args = [(n, shared_cache) for n in range(n_cores)]
print(f"准备分发任务,args: {args}")
with mp.Pool(n_cores) as pool:
# 使用starmap分发任务
result = pool.starmap(parallel_fn, args)
print(f"Pool返回结果: {result}")
print(f"共享字典最终内容: {shared_cache}")预期输出
在我的8核机器上运行上述修正后的代码,输出将是:
准备分发任务,args: [(0,), (1, ), ..., (7, )] Pool返回结果: [0, 1, 2, 3, 4, 5, 6, 7] 共享字典最终内容: {0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7}
可以看到,starmap成功返回了所有任务的结果,并且shared_cache也正确地包含了每个任务更新的数据。
总结与最佳实践
- 理解zip函数的行为:zip会以最短的输入迭代器为准停止。在构造starmap或其他需要多个并行列表的参数时,务必注意这一点,避免因空迭代器导致参数列表为空。
- 正确构造starmap参数:starmap期望一个可迭代对象,其每个元素都是一个元组或列表,这些元组/列表中的元素将作为单独的参数传递给目标函数。列表推导式通常是构造这类参数列表的清晰有效方式。
- 共享数据的使用:当使用SyncManager创建共享数据结构(如manager.dict())时,确保将其作为参数正确传递给每个工作函数。在更新共享字典时,选择合适的键名来存储数据,以避免冲突或覆盖,并确保数据的逻辑性。
- 调试中间变量:当并行程序行为异常时,打印出中间变量(如本例中的args)是诊断问题的有效方法,可以帮助你理解数据流和函数调用是否符合预期。
通过遵循这些原则,可以更有效地利用Python的multiprocessing模块,构建健壮且高效的并行应用程序。










