
本文旨在介绍如何使用 Python 并发执行多个子进程并高效捕获它们的输出。通过使用 multiprocessing.pool.ThreadPool,我们可以避免阻塞主线程,从而显著提高程序的整体执行效率,尤其是在需要同时运行大量独立子进程的场景下。
在 Python 中,使用 subprocess 模块可以方便地创建和管理子进程。然而,当需要同时启动并等待多个子进程完成时,传统的串行方式可能会导致效率瓶颈。subprocess.Popen 本身是非阻塞的,这意味着启动子进程后会立即返回,但 proc.communicate() 方法会阻塞,直到子进程执行完毕。如果按照串行方式调用 communicate(),实际上子进程的执行顺序是被强制的,无法充分利用多核 CPU 的并行处理能力。
使用 ThreadPool 并发执行子进程
为了解决这个问题,我们可以使用 multiprocessing.pool.ThreadPool 来并发地执行 communicate() 方法,从而实现真正的并行处理。ThreadPool 允许我们将任务分配给一个线程池,由线程池负责调度和执行这些任务。
以下是一个改进后的代码示例:
立即学习“Python免费学习笔记(深入)”;
import subprocess
import logging
from multiprocessing.pool import ThreadPool
log = logging.getLogger(__name__) # 假设已经配置好 logging
def runShowCommands(cmdTable) -> dict:
"""
并发执行 cmdTable 中定义的命令,并返回一个包含命令输出的字典。
"""
procOutput = {} # 用于存储命令输出的字典
procHandles = {}
# 启动所有子进程
for cmd, command in cmdTable.items():
try:
log.debug(f"running subprocess {cmd} -- {command}")
procHandles[cmd] = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True) # Add shell=True
except Exception as e:
log.error(f"Error starting subprocess {cmd}: {e}")
procOutput[cmd] = f"Error starting subprocess: {e}" # Store error message to procOutput
continue # Skip to the next command
# 定义处理子进程输出的函数
def handle_proc_stdout(handle):
try:
stdout, stderr = procHandles[handle].communicate(timeout=180)
procOutput[handle] = stdout.decode("utf-8") # 将 stdout 部分转换为文本
log.debug(f"subprocess returned {handle}")
if stderr:
log.error(f"subprocess {handle} stderr: {stderr.decode('utf-8')}")
except subprocess.TimeoutExpired:
log.warning(f"subprocess {handle} timed out")
procHandles[handle].kill()
procOutput[handle] = "Timeout"
except Exception as e:
log.error(f"Error communicating with subprocess {handle}: {e}")
procOutput[handle] = f"Error communicating: {e}" # Store error message to procOutput
# 使用线程池并发执行 communicate
threadpool = ThreadPool()
threadpool.map(handle_proc_stdout, procHandles.keys())
threadpool.close()
threadpool.join()
return procOutput代码解释:
- 引入 ThreadPool: 首先,我们从 multiprocessing.pool 引入 ThreadPool 类。
- 启动子进程: 循环遍历 cmdTable,使用 subprocess.Popen 启动所有子进程,并将进程句柄存储在 procHandles 字典中。 shell=True 被添加到 subprocess.Popen 调用中。这允许您直接执行包含 shell 命令的字符串,而无需手动拆分命令。
- 定义 handle_proc_stdout 函数: 该函数负责调用 communicate() 方法,获取子进程的输出,并将结果存储在 procOutput 字典中。 函数还包括一个错误处理块,用于捕获超时和任何其他可能发生的异常。如果在与子进程通信时发生错误,它会将错误消息存储在 procOutput 字典中。 此外,它还会记录子进程的标准错误输出 (stderr)。
- 创建 ThreadPool 并执行任务: 创建一个 ThreadPool 实例,并使用 map() 方法将 handle_proc_stdout 函数应用到 procHandles.keys() 上。map() 方法会将 procHandles.keys() 中的每个键作为参数传递给 handle_proc_stdout 函数,并在线程池中并发执行这些函数调用。
- 关闭并等待线程池: 调用 threadpool.close() 方法,防止线程池接受新的任务。然后,调用 threadpool.join() 方法,等待线程池中的所有任务完成。
- 返回结果: 返回包含所有子进程输出的 procOutput 字典。
注意事项:
- 线程安全: 确保子进程的操作是线程安全的,避免出现竞态条件。
- 资源限制: ThreadPool 的大小应该根据系统的 CPU 核心数和内存资源进行合理配置,避免过度占用资源。
- 错误处理: 在 handle_proc_stdout 函数中添加适当的错误处理机制,例如超时处理和异常捕获,以保证程序的健壮性。
- 日志记录: 添加了日志记录以帮助调试并提供有关子进程执行的洞察力。
总结:
通过使用 ThreadPool,我们可以显著提高并发执行多个子进程的效率。这种方法特别适用于需要同时运行大量独立子进程的场景,例如并行编译、数据处理等。 务必注意线程安全、资源限制和错误处理,以确保程序的稳定性和可靠性。










