
在Asyncio应用中,我们经常需要运行多个并发任务。然而,当某些任务内部包含长时间的I/O等待(如client.ws.get_data()或client.ws.get_news()),并且这些等待可能在很长时间内都没有数据返回时,即使设置了全局的停止标志,这些任务也无法及时响应,导致程序无法在预期时间内退出。传统的asyncio.gather会等待所有任务完成,这在任务可能无限期阻塞的情况下并不适用。为了解决这一问题,Asyncio提供了更精细的任务管理和超时机制。
在上述场景中,尽管我们尝试通过stop_after任务设置stop标志来通知watch_tasks停止,但如果await client.ws.get_data()或await client.ws.get_news()处于阻塞状态,并且在stop标志设置为True之后仍未收到任何数据,那么这些await调用将继续等待,导致任务无法检查stop标志并退出循环。这使得程序无法在设定的超时时间(例如60秒)内停止。
asyncio.wait_for(fut, timeout) 函数可以用于为单个可等待对象(如协程或Future)设置一个超时时间。如果在指定时间内协程没有完成,它将抛出asyncio.TimeoutError异常。这使得我们可以为每个可能长时间阻塞的I/O操作设置独立的超时。
示例:
import asyncio
async def watch_task1():
try:
while True:
# 假设 client.ws.get_data() 是一个可能阻塞的IO操作
# 为此操作设置一个短期的超时,例如5秒
data = await asyncio.wait_for(client.ws.get_data(), timeout=5)
print(f"Received data: {data}")
except asyncio.TimeoutError:
print("watch_task1: get_data timed out, retrying...")
# 可以在这里选择重试、记录日志或退出
except asyncio.CancelledError:
print("watch_task1 cancelled.")
except Exception as e:
print(f"watch_task1 error: {e}")
async def main():
# 启动任务,并在外部处理其可能的超时
task1 = asyncio.create_task(watch_task1())
try:
await asyncio.sleep(60) # 主程序运行60秒
finally:
task1.cancel() # 取消任务
await task1 # 等待任务真正结束注意事项:
asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED) 是一个更强大的工具,它允许我们等待一组可等待对象,并提供一个全局的超时机制。它返回两个集合:done(已完成的任务)和pending(未完成的任务)。
关键参数:
示例:
import asyncio
import time
# 模拟一个可能长时间阻塞的WebSocket客户端
class MockWebSocketClient:
async def get_data(self):
print("get_data: Waiting for data...")
await asyncio.sleep(100) # 模拟长时间阻塞
return "Some data"
async def get_news(self):
print("get_news: Waiting for news...")
await asyncio.sleep(120) # 模拟更长时间阻塞
return "Some news"
# 模拟全局停止标志
stop = False
client = MockWebSocketClient()
async def watch_task1():
global stop
while not stop:
try:
# 这里的阻塞由 asyncio.wait 的外部超时控制
data = await client.ws.get_data()
print(f"Watch Task 1 received: {data}")
except asyncio.CancelledError:
print("Watch Task 1 was cancelled.")
break
except Exception as e:
print(f"Watch Task 1 error: {e}")
break
print("Watch Task 1 exiting.")
async def watch_task2():
global stop
while not stop:
try:
news = await client.ws.get_news()
print(f"Watch Task 2 received: {news}")
except asyncio.CancelledError:
print("Watch Task 2 was cancelled.")
break
except Exception as e:
print(f"Watch Task 2 error: {e}")
break
print("Watch Task 2 exiting.")
async def stop_after_task(delay):
global stop
print(f"Stop After Task: Will stop after {delay} seconds.")
await asyncio.sleep(delay)
stop = True
print("Stop After Task: Global stop flag set to True.")
async def main():
global stop
stop = False # 重置停止标志
client.ws = MockWebSocketClient() # 实例化模拟客户端
tasks_to_run = [
asyncio.create_task(watch_task1(), name="WatchTask1"),
asyncio.create_task(watch_task2(), name="WatchTask2"),
asyncio.create_task(stop_after_task(60), name="StopAfterTask"),
]
print("Main: Starting tasks with a 60-second timeout...")
# 使用 asyncio.wait 设置整体超时
done, waiting = await asyncio.wait(tasks_to_run, timeout=60, return_when=asyncio.ALL_COMPLETED)
# 注意:ALL_COMPLETED在这里的意思是,要么所有任务完成,要么超时时间到。
# 如果超时时间到,则返回当前已完成的任务和未完成的任务。
print(f"\nMain: Operation completed after {60} seconds.")
print(f"Main: Number of done tasks: {len(done)}")
print(f"Main: Number of waiting tasks: {len(waiting)}")
# 处理已完成的任务
for task in done:
try:
# 获取任务结果,如果任务失败会抛出异常
result = task.result()
print(f"Main: Task '{task.get_name()}' completed with result: {result if result is not None else 'None'}")
except asyncio.CancelledError:
print(f"Main: Task '{task.get_name()}' was cancelled (expected if it was the stop_after_task).")
except Exception as e:
print(f"Main: Task '{task.get_name()}' raised an exception: {e}")
# 处理未完成的任务:通常需要取消它们以释放资源
if waiting:
print("\nMain: Cancelling remaining waiting tasks...")
for task in waiting:
task.cancel()
try:
# 等待任务真正被取消并结束,捕获 CancelledError
await task
except asyncio.CancelledError:
print(f"Main: Task '{task.get_name()}' successfully cancelled.")
except Exception as e:
print(f"Main: Task '{task.get_name()}' ended with error after cancellation: {e}")
else:
print("\nMain: All tasks completed within the timeout.")
print("\nMain: All tasks processed. Exiting.")
if __name__ == "__main__":
asyncio.run(main())代码解析与注意事项:
通过合理运用asyncio.wait_for和asyncio.wait,我们可以构建出更加健壮和可控的Asyncio应用程序,确保即使面对长时间阻塞的任务,程序也能在预设的时间内优雅地完成或终止。
以上就是Asyncio任务超时控制与优雅终止策略的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号