
代码解释
-
导入必要的库:
- sys: 用于访问系统相关的参数和函数。
- asyncio: 用于编写并发代码。
- concurrent.futures: 用于异步执行任务。
- ipywidgets: 用于在 Jupyter Notebook 中创建交互式控件。
-
创建 ThreadPoolExecutor:
threadpool = concurrent.futures.ThreadPoolExecutor(4)
这行代码创建了一个包含 4 个线程的线程池。可以根据实际需求调整线程数量。
-
run 函数:
run 函数是核心,它负责将任务提交到线程池并处理输出。
- out = ipywidgets.Output(): 创建一个 Output 控件,用于显示任务的输出。
- print 函数重定向: 重新定义 print 函数,使其将输出发送到 Output 控件。
- done 函数: 定义一个回调函数,在任务完成后执行,用于处理任务的结果或异常。
- go 协程: 创建一个异步协程,使用 loop.run_in_executor 将任务提交到线程池。
- asyncio.create_task: 创建一个 asyncio 任务来运行 go 协程。
- task.add_done_callback: 将 done 函数添加到任务的回调列表中,以便在任务完成后执行。
示例用法
以下代码展示了如何使用 run 函数来执行一个耗时的函数:
GForge是一个基于Web的协同开发平台。它提供一组帮助你的团队进行协同开发的工具,如论坛,邮件列表等。用于创建和控制访问源代码管理库(如CVS,Subversion)的工具。GForge将自动创建一个源代码库并依据项目的角色设置进行访问控制。其它工具还包括:管理文件发布,文档管理,新闻公告,缺陷跟踪,任务管理等。
import time
def cpu_bound(print, dt, fail=False):
for i in range(10):
time.sleep(dt)
print(i, time.time())
if fail:
1 / 0
return "done"
run(cpu_bound, 0.1)示例代码解释
- cpu_bound 函数模拟一个耗时的 CPU 密集型任务。它接受一个 print 函数作为参数,用于将输出发送到 Output 控件。
- run(cpu_bound, 0.1) 将 cpu_bound 函数提交到线程池,并传递 0.1 作为 dt 参数,表示每次循环暂停 0.1 秒。
错误处理
done 函数中包含了错误处理逻辑,可以捕获任务执行过程中发生的异常,并将错误信息输出到 Output 控件。
run(cpu_bound, 0.5, fail=True)
在这个例子中,fail=True 会导致 cpu_bound 函数抛出一个 ZeroDivisionError 异常,done 函数会捕获这个异常并将错误信息输出到 Output 控件。
注意事项
- 避免共享可变状态: 在多线程环境中,共享可变状态可能会导致数据竞争和死锁。尽量避免在任务之间共享可变状态。如果必须共享,请使用适当的同步机制,例如锁。
- 选择合适的线程池大小: 线程池的大小应该根据实际情况进行调整。如果任务是 CPU 密集型的,线程池的大小应该接近 CPU 的核心数量。如果任务是 IO 密集型的,线程池的大小可以大于 CPU 的核心数量。
- 序列化问题: 由于 multiprocessing 模块使用 pickle 进行序列化,因此某些对象可能无法被序列化。concurrent.futures.ThreadPoolExecutor 不存在此问题。
总结
通过使用 concurrent.futures.ThreadPoolExecutor 和 ipywidgets.Output,可以在 Jupyter Notebook 中轻松实现并行任务队列,从而提高 Notebook 的响应性和执行效率。这种方法避免了使用 multiprocessing 模块时可能遇到的序列化问题,并且可以在 Notebook 中实时显示任务的输出。









