
在asyncio异步编程中,我们经常需要同时运行多个并发任务。asyncio.gather是一个常用的工具,它能够并发地运行多个协程,并等待它们全部完成。然而,当某些任务可能因为等待外部事件(如网络数据、消息队列消息)而长时间阻塞,甚至无限期地不返回时,asyncio.gather的默认行为就显得力不从心了。它会一直等待所有任务完成,导致整个程序无法在预设的时间内退出,即使我们通过某种机制(如设置全局标志)尝试通知任务停止,如果任务内部的await操作本身是阻塞的,任务也无法及时响应停止信号。
为了解决上述问题,asyncio提供了更为灵活的asyncio.wait方法。与asyncio.gather不同,asyncio.wait允许我们为一组并发任务设置一个整体的超时时间。它不会等待所有任务完成,而是在达到指定超时时间后立即返回,并告知哪些任务已完成,哪些仍在等待中。
asyncio.wait函数的基本签名如下: asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)
asyncio.wait返回两个集合:done和pending。
以下是一个示例,展示如何使用asyncio.wait来管理带有超时限制的并发任务:
立即学习“Python免费学习笔记(深入)”;
import asyncio
import time
# 模拟长时间运行或阻塞的网络I/O任务
async def watch_task_data():
print(f"[{time.time():.2f}] watch_task_data: 启动,模拟等待数据...")
try:
# 模拟长时间等待网络数据,可能永不返回
await asyncio.sleep(100) # 模拟阻塞100秒
print(f"[{time.time():.2f}] watch_task_data: 收到数据并完成。")
except asyncio.CancelledError:
print(f"[{time.time():.2f}] watch_task_data: 任务被取消。")
except Exception as e:
print(f"[{time.time():.2f}] watch_task_data: 发生异常: {e}")
finally:
print(f"[{time.time():.2f}] watch_task_data: 结束。")
async def watch_task_news():
print(f"[{time.time():.2f}] watch_task_news: 启动,模拟等待新闻...")
try:
# 模拟另一个长时间等待新闻的任务
await asyncio.sleep(100) # 模拟阻塞100秒
print(f"[{time.time():.2f}] watch_task_news: 收到新闻并完成。")
except asyncio.CancelledError:
print(f"[{time.time():.2f}] watch_task_news: 任务被取消。")
except Exception as e:
print(f"[{time.time():.2f}] watch_task_news: 发生异常: {e}")
finally:
print(f"[{time.time():.2f}] watch_task_news: 结束。")
async def main():
tasks = [
watch_task_data(),
watch_task_news(),
]
print(f"[{time.time():.2f}] 主程序:开始等待任务,最长等待5秒...")
# 设置整体超时为5秒
done, pending = await asyncio.wait(tasks, timeout=5)
print(f"[{time.time():.2f}] 主程序:等待结束。已完成任务数: {len(done)}, 未完成任务数: {len(pending)}")
# 1. 处理已完成的任务 (done 集合)
# 这些任务可能已正常完成,也可能在超时前抛出异常
for task in done:
try:
result = task.result() # 获取任务结果,如果任务抛出异常,这里会重新抛出
print(f"[{time.time():.2f}] 已完成任务结果: {result if result is not None else '无'}")
except asyncio.CancelledError:
# 理论上,done集合中的任务不应该被取消,除非在wait返回前被外部取消
print(f"[{time.time():.2f}] 已完成任务被取消 (异常情况)")
except Exception as e:
print(f"[{time.time():.2f}] 已完成任务发生异常: {e}")
# 2. 处理未完成的任务 (pending 集合)
# 这些任务在超时时仍未完成,通常需要显式取消它们以释放资源
for task in pending:
print(f"[{time.time():.2f}] 主程序:正在取消未完成任务: {task.get_name() if hasattr(task, 'get_name') else task}")
task.cancel() # 发送取消信号
try:
# 推荐等待任务真正结束,以确保任务有机会执行清理逻辑
await task
except asyncio.CancelledError:
print(f"[{time.time():.2f}] 未完成任务 {task.get_name() if hasattr(task, 'get_name') else task} 已确认取消。")
except Exception as e:
print(f"[{time.time():.2f}] 取消未完成任务 {task.get_name() if hasattr(task, 'get_name') else task} 时发生异常: {e}")
print(f"[{time.time():.2f}] 主程序:所有任务处理完毕。")
if __name__ == "__main__":
asyncio.run(main())运行上述代码,你会发现尽管watch_task_data和watch_task_news内部模拟了100秒的阻塞,但整个main函数会在大约5秒后退出,并且会打印出任务被取消的信息。
在使用asyncio.wait后,正确处理done和pending集合至关重要:
处理 done 集合: 对于done集合中的每个任务,你可以调用task.result()来获取任务的返回值。如果任务在执行过程中抛出了异常,task.result()会重新抛出该异常,因此需要使用try...except块来捕获并处理。
处理 pending 集合: pending集合中的任务是在超时时仍未完成的任务。为了避免资源泄露或程序僵死,通常需要显式地取消这些任务。
除了asyncio.wait,asyncio.wait_for也是一个有用的工具,它用于为单个协程或Future设置超时。如果指定的协程在超时时间内没有完成,asyncio.wait_for会取消该协程并抛出asyncio.TimeoutError。
示例:
async def limited_task():
try:
await asyncio.wait_for(some_long_running_coroutine(), timeout=10)
print("limited_task: 任务在10秒内完成。")
except asyncio.TimeoutError:
print("limited_task: 任务超时。")asyncio.wait_for适用于只需要对特定单个任务进行超时控制的场景,而asyncio.wait则更适合对一组任务进行整体超时管理。
在asyncio应用中,有效管理并发任务的生命周期,特别是处理可能无限期阻塞的任务并实施超时控制,是构建稳定、响应迅速系统的关键。asyncio.wait提供了强大的能力,允许开发者精确控制任务组的等待行为,区分已完成和未完成的任务,并通过显式取消机制优雅地终止长时间运行的任务。结合任务内部对CancelledError的妥善处理,我们可以确保即使在复杂的异步场景下,程序也能按预期及时响应并释放资源。
以上就是Python asyncio并发任务的超时控制与优雅关闭的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号