
python中的迭代器是单次消费的,一旦被完全遍历(例如通过`list()`转换),它就会耗尽并变为空。在多进程环境中,如果一个迭代器在传递给`multiprocessing.pool.starmap`之前被意外耗尽,`starmap`将接收到一个空的迭代器,导致没有任何任务被提交和执行。这会掩盖潜在的运行时错误,因为工作函数根本没有被调用,从而无法抛出预期的异常。
在Python中,迭代器是一种对象,它允许我们一次访问一个元素。许多内置类型,如列表、元组、字符串和字典,都是可迭代的,但它们本身并不是迭代器。当我们使用for循环、list()、tuple()、sum()等函数或表达式时,Python会在内部从可迭代对象中获取一个迭代器。
迭代器的核心特性是它实现了__iter__()和__next__()方法。__next__()方法在每次调用时返回序列中的下一个项目,并在没有更多项目时引发StopIteration异常。
一个关键点是:迭代器是单次消费的。这意味着一旦一个迭代器被完全遍历,它就变得“耗尽”了,无法再次提供数据。例如,zip函数返回一个迭代器,它也遵循这个原则。
# 示例:zip对象作为迭代器
x = (0, 1, 2)
y = "ABC"
zipper = zip(x, y)
print("第一次遍历:")
for n, s in zipper:
    print(n, s)
print("第二次遍历:")
# 此时zipper已经耗尽,不会打印任何内容
for n, s in zipper:
    print(n, s)运行上述代码,你会发现“第二次遍历”部分不会有任何输出,因为zipper迭代器在第一次for循环中已经被完全消费。
立即学习“Python免费学习笔记(深入)”;
当对一个迭代器执行诸如list(iterator)、tuple(iterator)、set(iterator)或在for循环中完整遍历它时,迭代器中的所有元素都会被取出并用于构建新的数据结构或执行相应操作。完成这些操作后,迭代器内部的状态指针会指向序列的末尾,使其无法再提供任何数据。
考虑以下示例:
x = (0, 1, 2)
y = "ABC"
zipper = zip(x, y)
# 显式地将迭代器转换为列表
my_list = list(zipper)
print(f"转换为列表后:{my_list}")
# 此时zipper迭代器已经耗尽
print("尝试再次遍历耗尽的迭代器:")
for n, s in zipper:
    print(n, s) # 这行代码不会被执行在这个例子中,list(zipper)操作彻底耗尽了zipper迭代器。因此,随后的for循环发现zipper已经为空,便直接跳过循环体,不会引发任何错误,但也不会执行任何操作。
在多进程编程中,特别是使用multiprocessing.Pool.starmap时,迭代器耗尽的特性可能会导致难以察觉的问题。starmap函数接受一个可迭代对象作为其任务参数的来源。它会从这个可迭代对象中逐一取出元素,并将它们作为参数传递给目标函数在不同的进程中执行。
如果传递给starmap的可迭代对象在被starmap使用之前就已经耗尽,那么starmap将接收到一个空的序列。这意味着:
回到原始问题中的代码片段:
from itertools import repeat
import multiprocessing
def starmap_with_kwargs(pool, fn, args_iter, kwargs_iter):
    args_for_starmap = zip(repeat(fn), args_iter, kwargs_iter)
    # print(args_iter) # 这里的args_iter是zip对象,尚未耗尽
    return pool.starmap(apply_args_and_kwargs, args_for_starmap)
def apply_args_and_kwargs(fn, args, kwargs):
    # print('test')
    return fn(*args, **kwargs)
def func(path, dictArg, **kwargs):
    # 这里的dictArg预期是字典,但如果数据源有误,可能是字符串
    for i in dictArg: # 如果dictArg是字符串,此循环会迭代字符串的字符
        print(i['a']) # 如果i是字符,尝试['a']索引会引发TypeError
        print(kwargs['yes'])
def funcWrapper(path, dictList, **kwargs):
    args_iter = zip(repeat(path), dictList)
    kwargs_iter = repeat(kwargs)
    # 关键行:如果取消注释,args_iter会在此处耗尽
    # list(args_iter) 
    pool = multiprocessing.Pool()
    starmap_with_kwargs(pool, func, args_iter, kwargs_iter)
    pool.close()
    pool.join() # 确保所有进程完成
dictList = [{'a: 2'}, {'a': 65}, {'a': 213}, {'a': 3218}] # 注意第一个元素是字符串!
path = 'some/path/to/something'
funcWrapper(path, dictList, yes=1)在dictList中,第一个元素{'a: 2'}是一个字符串,而不是一个字典。当func函数尝试对这个字符串进行i['a']操作时,就会引发TypeError: string indices must be integers。
情况一:list(args_iter)被注释掉args_iter (一个zip迭代器) 被创建后,直接传递给了starmap_with_kwargs,最终进入pool.starmap。starmap会从args_for_starmap(也是一个zip迭代器,内部包含args_iter的引用)中取出任务,并调度apply_args_and_kwargs在子进程中执行。当func接收到dictArg为字符串{'a: 2'}时,会尝试i['a']操作,从而引发TypeError。
情况二:list(args_iter)被取消注释 在funcWrapper中,当执行list(args_iter)时,args_iter这个zip迭代器会被立即完全遍历,并将其所有元素收集到一个临时列表中。完成此操作后,args_iter迭代器自身就耗尽了。 随后,当这个已经耗尽的args_iter被传递给starmap_with_kwargs时,args_for_starmap = zip(repeat(fn), args_iter, kwargs_iter)也会创建一个基于一个空迭代器的新zip迭代器。 最终,pool.starmap接收到一个空的args_for_starmap迭代器。这意味着starmap发现没有任务可供执行,所以它不会调用apply_args_and_kwargs,进而func也永远不会被调用。由于func从未被调用,其中导致TypeError的逻辑也就无从触发,因此看不到任何错误信息。
为了避免这种迭代器耗尽导致的问题,并确保多进程任务能够按预期执行,请遵循以下原则:
一次性转换为具体数据结构: 如果你需要在程序的多个地方使用同一个迭代器的数据,或者需要对其进行预处理或调试,最好的方法是将其一次性转换为一个列表或元组。
# 修正后的funcWrapper示例
def funcWrapper_fixed(path, dictList, **kwargs):
    args_iter_raw = zip(repeat(path), dictList)
    # 将迭代器转换为列表,这样可以多次使用或检查
    args_list = list(args_iter_raw) 
    # 此时args_list可以用于调试或多次传递
    # print(args_list) 
    kwargs_iter = repeat(kwargs) # kwargs_iter可以保持为迭代器,因为它只在zip中被消费一次
    pool = multiprocessing.Pool()
    # 注意:这里需要重新构造args_for_starmap,因为它依赖于args_iter
    # 如果args_list是固定列表,则可以直接使用
    # 但如果starmap_with_kwargs需要迭代器,那么args_list在这里可以作为新的迭代源
    # 实际传递给starmap_with_kwargs的应该是zip(repeat(fn), args_list, kwargs_iter)
    # 更简洁的传递方式,确保args_list被正确处理
    starmap_with_kwargs(pool, func, args_list, kwargs_iter) 
    pool.close()
    pool.join()在starmap_with_kwargs内部,如果args_iter期望的是一个可迭代对象,那么args_list作为列表是完全兼容的。
每次使用时重新创建迭代器: 如果数据源允许,并且你确实需要在不同上下文中使用独立的迭代序列,可以在每次需要时重新创建迭代器。
# 重新创建迭代器的示例
def funcWrapper_recreate(path, dictList, **kwargs):
    # 第一次使用
    args_iter_1 = zip(repeat(path), dictList)
    # do something with args_iter_1, e.g., list(args_iter_1)
    # 第二次使用时,重新创建
    args_iter_2 = zip(repeat(path), dictList) 
    kwargs_iter = repeat(kwargs)
    pool = multiprocessing.Pool()
    starmap_with_kwargs(pool, func, args_iter_2, kwargs_iter)
    pool.close()
    pool.join()注意调试时的副作用: 在调试代码时,避免在不经意间通过list()或for循环耗尽你打算传递给后续函数的迭代器。如果需要查看迭代器的内容,可以先将其转换为列表,然后将该列表传递给后续函数,而不是原始的迭代器。
Python迭代器的单次消费特性是其设计的一部分,旨在提高内存效率。然而,在多进程或其他需要多次访问相同数据序列的场景中,如果不理解这一特性,就可能导致逻辑错误被掩盖。核心原则是:一旦迭代器被完全遍历,它就耗尽了。 在将迭代器传递给multiprocessing.Pool.starmap等函数之前,务必确保它尚未被其他操作耗尽。通过将迭代器转换为列表或在每次需要时重新创建迭代器,可以有效规避这类问题,并确保代码的健壮性。
以上就是Python迭代器耗尽机制在多进程中的影响与规避的详细内容,更多请关注php中文网其它相关文章!
 
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
 
                Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号