
在开发高性能、高并发的应用程序时,python的 asyncio 库提供了一种强大的异步编程范式。特别是在处理需要持续生成和调度任务的场景,例如长轮询服务器、事件驱动系统或数据流处理,如何有效地将这些任务添加到事件循环并确保它们能够并发执行,是一个常见的挑战。本文将深入探讨如何从一个任务生成器中,以异步、非阻塞的方式创建并执行任务,避免因等待单个任务完成而阻塞整个事件循环。
考虑以下场景:我们有一个任务生成器,它会不断地产生新的任务参数。我们希望为每个参数创建一个异步任务,并将其提交给事件循环,但又不希望主逻辑(即生成任务的部分)停下来等待这些任务完成。
最初的尝试可能如下:
import asyncio, random
async def wrapper(word: str):
print(f"Executing task for: {word}")
await asyncio.sleep(1) # 模拟耗时操作
print(f"Finished task for: {word}")
def generator():
abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
while True:
yield random.choice(abc)
async def manager():
loop = asyncio.get_event_loop()
for letter in generator():
loop.create_task(wrapper(letter)) # 创建任务,但不等待
# 问题在于:这里没有让出控制权,事件循环无法调度其他任务
async def main():
await manager() # manager是一个无限循环,此处会阻塞
if __name__ == '__main__':
# asyncio.run(manager()) # 这样调用会因为manager的无限循环而阻塞
# 需要一种方式让manager能够持续创建任务,同时让其他任务运行
pass # 暂时不运行,因为会阻塞上述代码的问题在于,manager 协程内部的 for 循环会无限快速地运行,不断地调用 loop.create_task()。虽然 create_task 将 wrapper 协程包装成一个任务并提交给事件循环,但 manager 协身本身并没有任何 await 语句,这意味着它从不主动让出控制权给事件循环。结果是,事件循环没有机会去执行那些被创建的 wrapper 任务,因为 manager 始终占用着CPU。
要解决这个问题,我们需要理解 asyncio 的核心工作原理:
立即学习“Python免费学习笔记(深入)”;
最直接的解决方案是在 manager 协程的循环内部,显式地让出控制权。await asyncio.sleep(0) 是一个常用的技巧,它会立即暂停当前协程,并将控制权交还给事件循环。由于 sleep 的时间是0,事件循环会立即检查是否有其他任务准备就绪,并在下一个循环迭代中重新调度 manager 协程。
import asyncio, random
async def wrapper(word: str):
"""模拟一个耗时操作的异步任务"""
print(f"Executing task for: {word}")
await asyncio.sleep(1) # 任务模拟
print(f"Finished task for: {word}")
def generator():
"""一个无限生成随机字母的生成器"""
abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
while True:
yield random.choice(abc)
async def manager_with_yield():
"""
负责从生成器获取任务并调度,通过await asyncio.sleep(0)显式让出控制权。
"""
loop = asyncio.get_event_loop()
print("Manager started, generating tasks...")
for i, letter in enumerate(generator()):
loop.create_task(wrapper(letter))
print(f"Task {i+1} created for '{letter}'")
await asyncio.sleep(0) # 关键:让出控制权,允许其他任务运行
# 实际应用中,可以根据需要增加一个短暂的等待,例如 await asyncio.sleep(0.01)
# 或者在处理一定数量任务后才让出控制权,以平衡调度开销和响应性。
if i >= 10: # 示例:限制生成任务的数量,否则会无限运行
print("Generated 10 tasks, stopping manager.")
break
async def main_with_yield():
"""主入口点,运行带有显式让出控制权的manager"""
await manager_with_yield()
# 等待所有已创建的wrapper任务完成
print("Manager finished, waiting for remaining tasks...")
await asyncio.sleep(2) # 给剩余任务一些时间完成
if __name__ == '__main__':
print("--- Running Solution 1: Explicit Yielding ---")
asyncio.run(main_with_yield())
print("--- Solution 1 Finished ---")注意事项:
Python 3.11 引入了 asyncio.TaskGroup,这是一种更现代、更结构化的并发管理方式。TaskGroup 提供了一个上下文管理器,可以在其中创建任务。它会自动管理这些任务的生命周期,并在退出上下文时等待所有在其内部创建的任务完成(或处理异常)。更重要的是,TaskGroup 在内部会自动处理任务的调度和让出控制权,使得代码更加简洁和健壮。
import asyncio, random
async def wrapper(word: str):
"""模拟一个耗时操作的异步任务"""
print(f"Executing task for: {word}")
await asyncio.sleep(1) # 任务模拟
print(f"Finished task for: {word}")
def generator():
"""一个无限生成随机字母的生成器"""
abc = 'abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'
while True:
yield random.choice(abc)
async def manager_with_taskgroup():
"""
负责从生成器获取任务并调度,使用asyncio.TaskGroup进行结构化并发管理。
"""
print("Manager started with TaskGroup, generating tasks...")
async with asyncio.TaskGroup() as tg: # 使用TaskGroup上下文管理器
for i, letter in enumerate(generator()):
tg.create_task(wrapper(letter)) # 在TaskGroup中创建任务
print(f"Task {i+1} created for '{letter}'")
# TaskGroup在内部会处理调度和让出控制权,通常无需额外的await asyncio.sleep(0)
# 但如果生成任务的速度极快,且任务本身耗时很短,
# 偶尔添加 await asyncio.sleep(0) 仍可能优化响应性。
if i >= 10: # 示例:限制生成任务的数量
print("Generated 10 tasks, stopping manager.")
break
print("TaskGroup exited. All tasks created within it should have completed or been cancelled.")
async def main_with_taskgroup():
"""主入口点,运行带有TaskGroup的manager"""
await manager_with_taskgroup()
if __name__ == '__main__':
print("\n--- Running Solution 2: Using asyncio.TaskGroup (Python 3.11+) ---")
# 确保Python版本 >= 3.11
if hasattr(asyncio, 'TaskGroup'):
asyncio.run(main_with_taskgroup())
else:
print("Warning: asyncio.TaskGroup requires Python 3.11 or later. Skipping this example.")
print("--- Solution 2 Finished ---")TaskGroup 的优势:
版本要求: asyncio.TaskGroup 需要 Python 3.11 或更高版本。对于旧版本Python,解决方案一仍然是可行的。
结合上述两种方法,以下是一个更完整的示例,展示了如何从生成器高效地调度异步任务,并包含一些最佳实践的思考。我们优先推荐使用 TaskGroup。
import asyncio
import random
import time
async def process_item(item_id: int, data: str):
"""模拟一个异步处理任务,打印处理信息并模拟耗时"""
start_time = time.time()
print(f"[{item_id}] Processing item: '{data}'...")
await asyncio.sleep(random.uniform(0.5, 2.0)) # 模拟随机耗时
end_time = time.time()
print(f"[{item_id}] Finished item: '{data}' in {end_time - start_time:.2f}s")
def item_generator(max_items: int = 20):
"""一个生成器,生成带ID的随机数据"""
abc = 'abcdefghijklmnopqrstuvwxyz'
for i in range(1, max_items + 1):
yield i, random.choice(abc) * random.randint(3, 8) # 生成随机长度的字符串
async def task_dispatcher():
"""
任务调度器,从生成器获取数据并创建异步任务。
优先使用TaskGroup,如果不可用则回退到显式让出控制权。
"""
print("--- Task Dispatcher Started ---")
item_count = 0
if hasattr(asyncio, 'TaskGroup'):
print("Using asyncio.TaskGroup for task management.")
async with asyncio.TaskGroup() as tg:
for item_id, data in item_generator():
tg.create_task(process_item(item_id, data))
print(f"Dispatched task {item_id} for data '{data}'")
item_count += 1
# 即使使用TaskGroup,如果生成任务的速度远超任务执行速度,
# 也可以考虑在此处加入一个短暂的await,以避免内存中积压过多未开始的任务。
# 例如: if item_count % 5 == 0: await asyncio.sleep(0.01)
print(f"--- TaskGroup Finished. All {item_count} tasks completed or cancelled. ---")
else:
print("asyncio.TaskGroup not available (Python < 3.11). Falling back to explicit yield.")
loop = asyncio.get_event_loop()
for item_id, data in item_generator():
loop.create_task(process_item(item_id, data))
print(f"Dispatched task {item_id} for data '{data}'")
item_count += 1
await asyncio.sleep(0) # 显式让出控制权
print(f"--- Dispatcher Finished creating {item_count} tasks. Waiting for them to complete. ---")
# 由于是手动创建任务且没有TaskGroup等待,需要额外等待所有任务完成
await asyncio.sleep(3) # 粗略等待,实际应用中可能需要更精细的等待机制
async def main():
"""主程序入口"""
await task_dispatcher()
print("All dispatching and processing should be complete.")
if __name__ == '__main__':
asyncio.run(main())最佳实践:
在 asyncio 中从任务生成器实现高效异步并发执行的核心在于理解事件循环的协作式调度机制。仅仅通过 create_task() 创建任务不足以实现并发,关键在于主调度逻辑必须周期性地让出控制权给事件循环。
无论采用哪种方法,理解 await 的作用以及事件循环的工作原理,是构建高效、响应式 asyncio 应用程序的基础。在实际应用中,还需要结合流量控制、错误处理等机制,确保系统的稳定性和可扩展性。
以上就是Python asyncio:从任务生成器实现高效异步并发执行的原理与实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号