
在 asyncio 应用中,asyncio.gather(*coros) 是一个常用的工具,用于并发运行多个协程或任务,并等待它们全部完成。然而,当这些任务中包含长时间阻塞的I/O操作(例如,等待网络消息但消息不频繁)时,gather 会无限期地等待下去,即使您希望程序在一定时间后停止。
考虑以下场景:
import asyncio
stop = False
async def watch_task1(client):
while not stop:
# 假设 client.ws.get_data() 可能长时间没有数据返回
await client.ws.get_data()
print("Task 1 received data")
async def watch_task2(client):
while not stop:
# 假设 client.ws.get_news() 也可能长时间没有数据返回
await client.ws.get_news()
print("Task 2 received news")
async def stop_after(delay_seconds):
global stop
await asyncio.sleep(delay_seconds)
print(f"Stopping after {delay_seconds} seconds...")
stop = True
class MockClient:
async def get_data(self):
await asyncio.sleep(100) # 模拟长时间阻塞
async def get_news(self):
await asyncio.sleep(100) # 模拟长时间阻塞
async def sleep(self, delay):
await asyncio.sleep(delay)
async def main_gather():
client = MockClient()
tasks = [
watch_task1(client),
watch_task2(client),
stop_after(5), # 尝试在5秒后停止
]
try:
# 使用 gather,即使 stop 变为 True,阻塞的 get_data/get_news 仍会阻止 gather 完成
await asyncio.gather(*tasks, return_exceptions=True)
except Exception as e:
print(f"An exception occurred: {e}")
print("Main gather finished.")
# 运行 main_gather() 会发现程序在5秒后并不会立即停止,而是会等待 get_data/get_news 结束在这个例子中,即使 stop_after 在5秒后将 stop 标志设置为 True,watch_task1 和 watch_task2 中的 await client.ws.get_data() 或 await client.ws.get_news() 仍然可能处于阻塞状态,导致 asyncio.gather 无法按时完成。
为了解决上述问题,asyncio 提供了更灵活的等待机制:asyncio.wait。它允许您设置一个总体的超时时间,并在超时后返回已完成和未完成的任务集。
立即学习“Python免费学习笔记(深入)”;
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) 函数用于并发运行 aws(一个可等待对象集合),并等待它们中的一部分或全部完成。
asyncio.wait 返回两个集合:done(已完成的任务)和 pending(未完成的任务)。
import asyncio
stop = False # 这是一个共享状态,用于控制协程的内部循环
async def watch_task1(client):
try:
while not stop:
print("Task 1: Waiting for data...")
await client.ws.get_data() # 可能会阻塞
print("Task 1: Data received.")
except asyncio.CancelledError:
print("Task 1: Cancelled.")
finally:
print("Task 1: Exiting.")
async def watch_task2(client):
try:
while not stop:
print("Task 2: Waiting for news...")
await client.ws.get_news() # 可能会阻塞
print("Task 2: News received.")
except asyncio.CancelledError:
print("Task 2: Cancelled.")
finally:
print("Task 2: Exiting.")
# MockClient 保持不变
class MockClient:
async def get_data(self):
# 模拟长时间阻塞,但为了演示,将其缩短
await asyncio.sleep(5)
async def get_news(self):
# 模拟长时间阻塞
await asyncio.sleep(5)
async def sleep(self, delay):
await asyncio.sleep(delay)
async def main_wait_timeout():
client = MockClient()
tasks_to_run = [
asyncio.create_task(watch_task1(client)), # 显式创建任务
asyncio.create_task(watch_task2(client)),
]
print("Starting tasks with a 3-second timeout...")
# 设置一个全局超时,例如3秒
done, pending = await asyncio.wait(tasks_to_run, timeout=3, return_when=asyncio.ALL_COMPLETED)
print("\n--- After asyncio.wait ---")
print(f"Completed tasks ({len(done)}):")
for task in done:
try:
# 获取任务结果,如果任务抛出异常,这里会重新抛出
result = task.result()
print(f" Task finished successfully: {task.get_name()}, Result: {result}")
except asyncio.CancelledError:
print(f" Task {task.get_name()} was cancelled (expected for pending tasks).")
except Exception as e:
print(f" Task {task.get_name()} raised an exception: {e}")
print(f"\nPending tasks ({len(pending)}):")
for task in pending:
print(f" Task {task.get_name()} is still pending. Cancelling...")
task.cancel() # 取消未完成的任务
try:
await task # 等待任务真正结束,以便其处理 CancelledError
except asyncio.CancelledError:
print(f" Task {task.get_name()} successfully cancelled and cleaned up.")
except Exception as e:
print(f" Task {task.get_name()} raised an exception during cancellation cleanup: {e}")
print("Main wait_timeout finished.")
# 运行主函数
if __name__ == "__main__":
asyncio.run(main_wait_timeout())代码解释:
如果只需要为单个协程设置超时,可以使用 asyncio.wait_for(aw, timeout)。
async def example_wait_for():
client = MockClient()
try:
print("Attempting to get data with a 2-second timeout...")
# watch_task1 内部的循环会因为超时而中断
await asyncio.wait_for(watch_task1(client), timeout=2)
print("watch_task1 finished within timeout.")
except asyncio.TimeoutError:
print("watch_task1 timed out!")
except Exception as e:
print(f"watch_task1 raised an unexpected exception: {e}")
print("Example wait_for finished.")
# 如果在 main_wait_timeout 之后运行
# asyncio.run(example_wait_for())asyncio.wait_for 会在超时时抛出 asyncio.TimeoutError。如果被包装的协程内部没有处理 CancelledError,那么在 TimeoutError 抛出后,被包装的协程实际上可能还在后台运行,直到其下一个 await 点。因此,在使用 wait_for 时,被包装的协程也应该能够响应取消。
在 asyncio 应用程序中,有效管理和终止长时间运行或可能阻塞的任务至关重要。asyncio.wait 提供了一个强大的机制,允许您在指定超时后获取已完成和未完成的任务,并通过 task.cancel() 优雅地终止未完成的任务。结合任务内部对 asyncio.CancelledError 的处理,您可以构建出更加健壮、响应迅速的异步应用程序。记住,正确的取消处理和资源清理是编写高质量 asyncio 代码的关键。
以上就是Python Asyncio:优雅地管理与终止长时间运行的任务的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号