Python Asyncio:优雅地管理与终止长时间运行的任务

霞舞
发布: 2025-07-22 14:40:14
原创
236人浏览过

python asyncio:优雅地管理与终止长时间运行的任务

本文旨在探讨在Python asyncio异步编程中,如何有效管理和终止可能长时间阻塞的任务,以避免程序无限期等待。我们将重点介绍 asyncio.wait 和 asyncio.wait_for 这两个关键工具,它们提供了设置任务超时机制的能力。通过详细的代码示例和最佳实践,您将学会如何确保异步应用程序在预设时间内响应并关闭,即使某些I/O操作不活跃。

1. 问题背景:asyncio.gather 的局限性

在 asyncio 应用中,asyncio.gather(*coros) 是一个常用的工具,用于并发运行多个协程或任务,并等待它们全部完成。然而,当这些任务中包含长时间阻塞的I/O操作(例如,等待网络消息但消息不频繁)时,gather 会无限期地等待下去,即使您希望程序在一定时间后停止。

考虑以下场景:

import asyncio

stop = False

async def watch_task1(client):
    while not stop:
        # 假设 client.ws.get_data() 可能长时间没有数据返回
        await client.ws.get_data()
        print("Task 1 received data")

async def watch_task2(client):
    while not stop:
        # 假设 client.ws.get_news() 也可能长时间没有数据返回
        await client.ws.get_news()
        print("Task 2 received news")

async def stop_after(delay_seconds):
    global stop
    await asyncio.sleep(delay_seconds)
    print(f"Stopping after {delay_seconds} seconds...")
    stop = True

class MockClient:
    async def get_data(self):
        await asyncio.sleep(100) # 模拟长时间阻塞
    async def get_news(self):
        await asyncio.sleep(100) # 模拟长时间阻塞
    async def sleep(self, delay):
        await asyncio.sleep(delay)

async def main_gather():
    client = MockClient()
    tasks = [
        watch_task1(client),
        watch_task2(client),
        stop_after(5), # 尝试在5秒后停止
    ]

    try:
        # 使用 gather,即使 stop 变为 True,阻塞的 get_data/get_news 仍会阻止 gather 完成
        await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as e:
        print(f"An exception occurred: {e}")
    print("Main gather finished.")

# 运行 main_gather() 会发现程序在5秒后并不会立即停止,而是会等待 get_data/get_news 结束
登录后复制

在这个例子中,即使 stop_after 在5秒后将 stop 标志设置为 True,watch_task1 和 watch_task2 中的 await client.ws.get_data() 或 await client.ws.get_news() 仍然可能处于阻塞状态,导致 asyncio.gather 无法按时完成。

2. 解决方案:使用 asyncio.wait 进行超时控制

为了解决上述问题,asyncio 提供了更灵活的等待机制:asyncio.wait。它允许您设置一个总体的超时时间,并在超时后返回已完成和未完成的任务集。

立即学习Python免费学习笔记(深入)”;

2.1 asyncio.wait 概述

asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) 函数用于并发运行 aws(一个可等待对象集合),并等待它们中的一部分或全部完成。

一览运营宝
一览运营宝

一览“运营宝”是一款搭载AIGC的视频创作赋能及变现工具,由深耕视频行业18年的一览科技研发推出。

一览运营宝 41
查看详情 一览运营宝
  • aws: 一个由协程、任务或 Future 组成的集合。
  • timeout: 可选参数,指定等待的最大秒数。如果超时,函数会立即返回。
  • return_when: 可选参数,定义何时返回。常用值包括:
    • asyncio.ALL_COMPLETED (默认): 等待所有任务完成。
    • asyncio.FIRST_COMPLETED: 只要有一个任务完成就返回。
    • asyncio.FIRST_EXCEPTION: 只要有一个任务抛出异常就返回。

asyncio.wait 返回两个集合:done(已完成的任务)和 pending(未完成的任务)。

2.2 实现超时停止逻辑

import asyncio

stop = False # 这是一个共享状态,用于控制协程的内部循环

async def watch_task1(client):
    try:
        while not stop:
            print("Task 1: Waiting for data...")
            await client.ws.get_data() # 可能会阻塞
            print("Task 1: Data received.")
    except asyncio.CancelledError:
        print("Task 1: Cancelled.")
    finally:
        print("Task 1: Exiting.")

async def watch_task2(client):
    try:
        while not stop:
            print("Task 2: Waiting for news...")
            await client.ws.get_news() # 可能会阻塞
            print("Task 2: News received.")
    except asyncio.CancelledError:
        print("Task 2: Cancelled.")
    finally:
        print("Task 2: Exiting.")

# MockClient 保持不变
class MockClient:
    async def get_data(self):
        # 模拟长时间阻塞,但为了演示,将其缩短
        await asyncio.sleep(5)
    async def get_news(self):
        # 模拟长时间阻塞
        await asyncio.sleep(5)
    async def sleep(self, delay):
        await asyncio.sleep(delay)

async def main_wait_timeout():
    client = MockClient()
    tasks_to_run = [
        asyncio.create_task(watch_task1(client)), # 显式创建任务
        asyncio.create_task(watch_task2(client)),
    ]

    print("Starting tasks with a 3-second timeout...")
    # 设置一个全局超时,例如3秒
    done, pending = await asyncio.wait(tasks_to_run, timeout=3, return_when=asyncio.ALL_COMPLETED)

    print("\n--- After asyncio.wait ---")
    print(f"Completed tasks ({len(done)}):")
    for task in done:
        try:
            # 获取任务结果,如果任务抛出异常,这里会重新抛出
            result = task.result()
            print(f"  Task finished successfully: {task.get_name()}, Result: {result}")
        except asyncio.CancelledError:
            print(f"  Task {task.get_name()} was cancelled (expected for pending tasks).")
        except Exception as e:
            print(f"  Task {task.get_name()} raised an exception: {e}")

    print(f"\nPending tasks ({len(pending)}):")
    for task in pending:
        print(f"  Task {task.get_name()} is still pending. Cancelling...")
        task.cancel() # 取消未完成的任务
        try:
            await task # 等待任务真正结束,以便其处理 CancelledError
        except asyncio.CancelledError:
            print(f"  Task {task.get_name()} successfully cancelled and cleaned up.")
        except Exception as e:
            print(f"  Task {task.get_name()} raised an exception during cancellation cleanup: {e}")

    print("Main wait_timeout finished.")

# 运行主函数
if __name__ == "__main__":
    asyncio.run(main_wait_timeout())
登录后复制

代码解释:

  1. 显式创建任务: 在 main_wait_timeout 中,我们使用 asyncio.create_task() 将协程包装成任务。这是推荐的做法,因为 asyncio.wait 接受任务或 Future 对象。
  2. 设置超时: await asyncio.wait(tasks_to_run, timeout=3) 会在3秒后返回,无论 watch_task 是否完成其内部的 await client.ws.get_data()。
  3. 处理 done 集合: 遍历 done 集合中的任务,通过 task.result() 获取其结果或捕获可能抛出的异常。
  4. 处理 pending 集合: 遍历 pending 集合中的任务。这些任务在超时时仍未完成。为了确保资源被释放,必须对它们调用 task.cancel()。
  5. 协程中的 CancelledError: 当一个任务被 cancel() 时,它会向任务内部抛出一个 asyncio.CancelledError。任务内部的协程应该捕获这个异常,并执行必要的清理工作(例如关闭文件句柄、网络连接等)。在上述 watch_task 示例中,我们添加了 try...except asyncio.CancelledError...finally 块来演示这一点。
  6. 等待任务真正结束: 在 task.cancel() 之后,最好 await task 一下,确保任务有时间处理 CancelledError 并完成其清理逻辑。

3. 替代方案:asyncio.wait_for

如果只需要为单个协程设置超时,可以使用 asyncio.wait_for(aw, timeout)。

async def example_wait_for():
    client = MockClient()
    try:
        print("Attempting to get data with a 2-second timeout...")
        # watch_task1 内部的循环会因为超时而中断
        await asyncio.wait_for(watch_task1(client), timeout=2)
        print("watch_task1 finished within timeout.")
    except asyncio.TimeoutError:
        print("watch_task1 timed out!")
    except Exception as e:
        print(f"watch_task1 raised an unexpected exception: {e}")
    print("Example wait_for finished.")

# 如果在 main_wait_timeout 之后运行
# asyncio.run(example_wait_for())
登录后复制

asyncio.wait_for 会在超时时抛出 asyncio.TimeoutError。如果被包装的协程内部没有处理 CancelledError,那么在 TimeoutError 抛出后,被包装的协程实际上可能还在后台运行,直到其下一个 await 点。因此,在使用 wait_for 时,被包装的协程也应该能够响应取消。

4. 注意事项与最佳实践

  • 任务取消的响应: 这是 asyncio 中非常重要的一点。当一个任务被 cancel() 时,它不会立即停止,而是在其下一个 await 点抛出 asyncio.CancelledError。任务的编写者必须在协程内部捕获 CancelledError,并执行必要的清理工作。如果协程不处理 CancelledError,则在取消后可能会继续运行,直到自然结束或遇到下一个 await 点。
  • 资源清理: 在 finally 块中进行资源清理是良好的实践,无论任务是正常完成还是被取消。
  • 选择 wait 还是 wait_for:
    • asyncio.wait_for 适用于为单个协程设置超时,并在超时时抛出 TimeoutError。
    • asyncio.wait 适用于管理一组协程/任务,并提供更细粒度的控制,如 return_when 参数,以及返回已完成和未完成任务的集合,方便后续处理(如取消未完成的任务)。
  • return_exceptions=True 与 task.result():
    • asyncio.gather 的 return_exceptions=True 会将协程中抛出的异常作为结果返回,而不是直接抛出。
    • 对于 asyncio.wait 返回的 done 任务,调用 task.result() 会重新抛出任务内部发生的任何异常,或者返回其结果。这是检查任务是否成功完成的推荐方式。

5. 总结

在 asyncio 应用程序中,有效管理和终止长时间运行或可能阻塞的任务至关重要。asyncio.wait 提供了一个强大的机制,允许您在指定超时后获取已完成和未完成的任务,并通过 task.cancel() 优雅地终止未完成的任务。结合任务内部对 asyncio.CancelledError 的处理,您可以构建出更加健壮、响应迅速的异步应用程序。记住,正确的取消处理和资源清理是编写高质量 asyncio 代码的关键。

以上就是Python Asyncio:优雅地管理与终止长时间运行的任务的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号