# 并行执行 Jupyter Notebook 中的任务队列

花韻仙語
发布: 2025-08-17 22:44:28
原创
456人浏览过

# 并行执行 Jupyter Notebook 中的任务队列

在 Jupyter Notebook 中实现并行任务队列,以在不阻塞 Notebook 界面的情况下执行耗时较长的函数。通过使用 `concurrent.futures.ThreadPoolExecutor` 和 `ipywidgets.Output`,可以实现任务的异步执行和结果的实时显示,同时避免了常见的序列化和作用域问题。 ## 使用 ThreadPoolExecutor 实现并行任务 在 Jupyter Notebook 中并行执行任务,同时保持 Notebook 的响应性,是一个常见的需求。`concurrent.futures.ThreadPoolExecutor` 提供了一种简单有效的方法来实现这一点。结合 `ipywidgets.Output`,我们可以将任务的输出实时显示在 Notebook 中。 ### 核心代码 以下代码展示了如何使用 `ThreadPoolExecutor` 和 `ipywidgets.Output` 来实现并行任务队列: ```python import sys import asyncio import concurrent.futures import ipywidgets threadpool = concurrent.futures.ThreadPoolExecutor(4) def run(fn, *args, **kwds): "run fn in threadpool" out = ipywidgets.Output() def print(*args, file=sys.stdout): line = ' '.join(map(str, args)) + '\n' if file is sys.stderr: out.append_stderr(line) else: out.append_stdout(line) def done(fut: asyncio.Future): try: result = fut.result() except asyncio.CancelledError: print("cancelled", fut, file=sys.stderr) except Exception: print("failed", fut, file=sys.stderr) else: print("completed", fut) async def go(): loop = asyncio.get_running_loop() return await loop.run_in_executor( threadpool, lambda: fn(print, *args, **kwds), ) task = asyncio.create_task(go()) task.add_done_callback(done) return out

代码解释

  1. 导入必要的库:

    • sys: 用于访问系统相关的参数和函数。
    • asyncio: 用于编写并发代码。
    • concurrent.futures: 用于异步执行任务。
    • ipywidgets: 用于在 Jupyter Notebook 中创建交互式控件。
  2. 创建 ThreadPoolExecutor:

    threadpool = concurrent.futures.ThreadPoolExecutor(4)
    登录后复制

    这行代码创建了一个包含 4 个线程的线程池。可以根据实际需求调整线程数量。

  3. run 函数:

    run 函数是核心,它负责将任务提交到线程池并处理输出。

    • out = ipywidgets.Output(): 创建一个 Output 控件,用于显示任务的输出。
    • print 函数重定向: 重新定义 print 函数,使其将输出发送到 Output 控件。
    • done 函数: 定义一个回调函数,在任务完成后执行,用于处理任务的结果或异常。
    • go 协程: 创建一个异步协程,使用 loop.run_in_executor 将任务提交到线程池。
    • asyncio.create_task: 创建一个 asyncio 任务来运行 go 协程。
    • task.add_done_callback: 将 done 函数添加到任务的回调列表中,以便在任务完成后执行。

示例用法

以下代码展示了如何使用 run 函数来执行一个耗时的函数:

行者AI
行者AI

行者AI绘图创作,唤醒新的灵感,创造更多可能

行者AI 100
查看详情 行者AI
import time

def cpu_bound(print, dt, fail=False):
    for i in range(10):
        time.sleep(dt)
        print(i, time.time())
    if fail:
        1 / 0
    return "done"

run(cpu_bound, 0.1)
登录后复制

示例代码解释

  • cpu_bound 函数模拟一个耗时的 CPU 密集型任务。它接受一个 print 函数作为参数,用于将输出发送到 Output 控件。
  • run(cpu_bound, 0.1) 将 cpu_bound 函数提交到线程池,并传递 0.1 作为 dt 参数,表示每次循环暂停 0.1 秒。

错误处理

done 函数中包含了错误处理逻辑,可以捕获任务执行过程中发生的异常,并将错误信息输出到 Output 控件。

run(cpu_bound, 0.5, fail=True)
登录后复制

在这个例子中,fail=True 会导致 cpu_bound 函数抛出一个 ZeroDivisionError 异常,done 函数会捕获这个异常并将错误信息输出到 Output 控件。

注意事项

  • 避免共享可变状态: 在多线程环境中,共享可变状态可能会导致数据竞争和死锁。尽量避免在任务之间共享可变状态。如果必须共享,请使用适当的同步机制,例如锁。
  • 选择合适的线程池大小: 线程池的大小应该根据实际情况进行调整。如果任务是 CPU 密集型的,线程池的大小应该接近 CPU 的核心数量。如果任务是 IO 密集型的,线程池的大小可以大于 CPU 的核心数量。
  • 序列化问题: 由于 multiprocessing 模块使用 pickle 进行序列化,因此某些对象可能无法被序列化。concurrent.futures.ThreadPoolExecutor 不存在此问题。

总结

通过使用 concurrent.futures.ThreadPoolExecutor 和 ipywidgets.Output,可以在 Jupyter Notebook 中轻松实现并行任务队列,从而提高 Notebook 的响应性和执行效率。这种方法避免了使用 multiprocessing 模块时可能遇到的序列化问题,并且可以在 Notebook 中实时显示任务的输出。

登录后复制

以上就是# 并行执行 Jupyter Notebook 中的任务队列的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号