
本文深入探讨了在Python asyncio中调度嵌套异步函数时遇到的并发挑战。通过分析传统`await`操作的阻塞特性,揭示了其在复杂流处理场景中的局限性。文章提出并详细阐述了基于`asyncio.Queue`和`asyncio.Event`的生产者-消费者模式,作为实现任务间解耦和真正并发执行的有效策略,从而显著提升异步应用的响应性和效率。
在异步编程中,我们经常需要处理数据流,其中一个任务负责生成数据,另一个任务负责处理数据。Python的asyncio库提供了强大的工具来构建并发应用程序,但在调度嵌套异步函数时,如果不正确地理解await关键字的行为,可能会导致程序并非按预期并发执行,而是串行阻塞。
考虑一个常见的场景:我们有一个字符流生成器,它逐步产生字符;一个句子生成器,它从字符流中收集字符并形成完整的句子;以及一个句子处理器,它对每个句子执行耗时操作。
以下是初始实现的代码结构:
import asyncio
async def stream():
char_string = "Hi. Hello. Hello."
for char in char_string:
await asyncio.sleep(0.1) # 模拟耗时操作
print("got char:", char)
yield char
async def sentences_generator():
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
yield sentence
sentence = ""
async def process_sentence(sentence: str):
print("waiting for processing sentence: ", sentence)
await asyncio.sleep(len(sentence)*0.1) # 模拟耗时操作
print("sentence processed!")
async def main():
i = 0
async for sentence in sentences_generator():
print("processing sentence: ", i)
await process_sentence(sentence) # 这里的await是关键
i += 1
asyncio.run(main())运行上述代码,其输出大致如下:
got char: H got char: i got char: . got sentence: Hi. processing sentence: 0 waiting for processing sentence: Hi. sentence processed! got char: got char: H got char: e got char: l got char: l got char: o got char: . got sentence: Hello. processing sentence: 1 waiting for processing sentence: Hello. sentence processed!
从输出可以看出,当process_sentence函数被await时,main协程会暂停,直到process_sentence完全执行完毕。这意味着在process_sentence处理第一个句子期间,stream和sentences_generator无法继续生成新的字符和句子。这并非我们期望的并发行为,我们希望在处理一个句子的同时,上游的字符流能够继续生成,从而提高整体吞吐量。
造成这种现象的根本原因在于await关键字的语义。当一个协程await另一个协程时,它会暂停自身的执行,并将控制权交给被await的协程。只有当被await的协程完成或自身也await了其他操作时,控制权才可能回到调用者。因此,在上述例子中,main函数中的await process_sentence(sentence)会完全阻塞main函数,直到当前句子处理完毕,才能继续从sentences_generator中获取下一个句子。
为了实现真正的并发,即在process_sentence处理句子的同时,sentences_generator和stream能够继续生成数据,我们可以采用经典的生产者-消费者模式。
在这种模式中:
asyncio提供了asyncio.Queue来实现这种异步安全的共享队列。此外,为了优雅地处理生产者完成后的消费者关闭问题,我们还可以引入asyncio.Event来发出生产者完成的信号。
以下是使用生产者-消费者模式重构后的代码:
import asyncio
async def stream():
char_string = "Hi. Hello. Thank you." # 更改了字符串以展示更长的流
for char in char_string:
await asyncio.sleep(0.1)
print("got char:", char)
yield char
async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
"""
生产者:从字符流生成句子,并放入队列。
当字符流结束时,设置Event标志通知消费者。
"""
sentence = ""
async for char in stream():
sentence += char
if char in [".", "!", "?"]:
print("got sentence: ", sentence)
await q.put(sentence) # 将生成的句子放入队列
sentence = ""
flag.set() # 生产者完成所有句子的生成,设置Event
async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
"""
消费者:从队列中取出句子进行处理。
当队列为空且生产者已完成时,消费者停止。
"""
global i # 用于计数处理的句子
while True:
# 检查是否应该停止:队列为空且生产者已完成
if q.empty() and flag.is_set():
break
try:
# 尝试从队列获取项,如果队列为空,会等待
item = await asyncio.wait_for(q.get(), timeout=1.0) # 增加超时,避免无限等待
except asyncio.TimeoutError:
# 如果超时且生产者已完成,则退出
if flag.is_set():
break
continue # 否则继续等待
print("processing sentence: ", i)
print("waiting for processing sentence: ", item)
await asyncio.sleep(len(item) * 0.1)
print("sentence processed!")
i += 1
async def main():
global i
i = 1 # 初始化句子计数器
event = asyncio.Event() # 用于生产者通知消费者完成
queue = asyncio.Queue[str]() # 共享队列
# 创建生产者和消费者任务
producer_task = sentences_generator(queue, event)
consumer_task = process_sentence(queue, event)
# 并发运行生产者和消费者任务
await asyncio.gather(producer_task, consumer_task)
asyncio.run(main())代码解析:
sentences_generator (生产者):
process_sentence (消费者):
main 函数:
预期输出(部分):
got char: H got char: i got char: . got sentence: Hi. got char: got char: H got char: e got char: l got char: l got char: o got char: . got sentence: Hello. processing sentence: 1 waiting for processing sentence: Hi. got char: got char: T got char: h got char: a got char: n got char: k got char: got char: y got char: o got char: u got char: . got sentence: Thank you. sentence processed! processing sentence: 2 waiting for processing sentence: Hello. sentence processed! processing sentence: 3 waiting for processing sentence: Thank you. sentence processed!
从新的输出中可以看到,当process_sentence正在处理"Hi."时,stream和sentences_generator已经继续生成了"Hello."甚至"Thank you."。这种交错的输出表明生产者和消费者正在并发地工作,显著提高了程序的效率和响应性。
通过采用生产者-消费者模式并结合asyncio.Queue和asyncio.Event,我们可以有效地管理异步任务间的依赖关系,实现更高效、更具响应性的并发数据流处理。这对于构建复杂的异步系统,如网络服务、数据管道等,是至关重要的技术。
以上就是优化Asyncio嵌套函数调度:使用生产者-消费者模式实现并发流处理的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号