0

0

优化asyncio中嵌套异步任务的并发调度

聖光之護

聖光之護

发布时间:2025-11-27 11:37:19

|

491人浏览过

|

来源于php中文网

原创

优化asyncio中嵌套异步任务的并发调度

本文探讨了在`asyncio`中处理嵌套异步生成器时,如何通过传统`await`模式导致的串行执行问题。针对`await`的阻塞特性,文章提出并详细阐述了利用`asyncio.queue`和`asyncio.event`构建生产者-消费者模式的解决方案,从而实现任务间的解耦和真正的并发执行,显著提升异步应用的效率和响应性。

理解asyncio中的await与并发限制

在asyncio编程中,await关键字是调度协程的核心机制。当一个协程遇到await表达式时,它会暂停自身的执行,将控制权交还给事件循环,并等待被await的协程完成。一旦被await的协程完成并返回结果,原协程才会从暂停点继续执行。这种机制虽然实现了协作式多任务,但如果设计不当,也可能导致非预期的串行执行。

考虑以下场景:一个异步任务(main)需要从一个异步生成器(sentences_generator)获取数据,然后将数据传递给另一个异步任务(process_sentence)进行处理。如果main函数在每次获取到数据后,都直接await process_sentence的完成,那么在process_sentence执行期间,sentences_generator将无法继续生成新的数据。这违背了我们期望的并发处理,即当process_sentence在处理当前数据时,sentences_generator应该能够同时准备下一批数据。

以下是原始代码示例及其输出,展示了这种串行阻塞行为:

import asyncio

async def stream():
    char_string = "Hi. Hello. Hello."
    for char in char_string:
        await asyncio.sleep(0.1)  # 模拟耗时操作
        print("got char:", char)
        yield char

async def sentences_generator():
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            yield sentence
            sentence = ""

async def process_sentence(sentence: str):
    print("waiting for processing sentence: ", sentence)
    await asyncio.sleep(len(sentence)*0.1) # 模拟耗时处理
    print("sentence processed!")

async def main():
    i=0
    async for sentence in sentences_generator():
        print("processing sentence: ", i)
        await process_sentence(sentence) # 这里的await导致阻塞
        i += 1

# asyncio.run(main())

原始输出示例:

got char: H
got char: i
got char: .
got sentence:  Hi.
processing sentence:  0
waiting for processing sentence:  Hi.
sentence processed!
got char:  
got char: H
got char: e
got char: y
got char: .
got sentence:   Hey.
processing sentence:  1
waiting for processing sentence:   Hey.
sentence processed!
...

从输出可以看出,只有当process_sentence完全处理完一个句子后,stream和sentences_generator才能继续生成下一个字符和句子。这并不是我们期望的并发效果。

解决方案:使用asyncio.Queue实现生产者-消费者模式

为了实现真正的并发,我们需要解耦数据的生产和消费过程,使它们能够独立运行。asyncio.Queue是实现这种生产者-消费者模式的理想工具

核心思想:

魔珐星云
魔珐星云

无需昂贵GPU,一键解锁超写实/二次元等多风格3D数字人,跨端适配千万级并发的具身智能平台。

下载
  1. 生产者(Producer):一个或多个异步任务负责生成数据,并将数据放入asyncio.Queue中。
  2. 消费者(Consumer):一个或多个异步任务从asyncio.Queue中取出数据进行处理。
  3. 独立运行:生产者和消费者作为独立的协程,由asyncio事件循环调度,它们之间通过队列进行通信,互不阻塞。

此外,为了实现优雅的关闭和通知消费者数据已全部生产完毕,我们可以引入asyncio.Event。生产者在完成所有数据生产后设置Event,消费者则可以结合队列是否为空和Event状态来判断何时停止。

优化后的代码实现

我们将修改sentences_generator作为生产者,将生成的句子放入队列;process_sentence作为消费者,从队列中取出句子进行处理。main函数将负责启动这两个独立的协程。

import asyncio

# 定义全局变量用于计数,方便观察
i = 1

async def stream():
    char_string = "Hi. Hello. Thank you." # 增加一些内容以更好地展示并发
    for char in char_string:
        await asyncio.sleep(0.1) # 模拟耗时操作
        print("got char:", char)
        yield char

async def sentences_generator(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    生产者协程:从字符流生成句子,并放入队列。
    当所有句子生成完毕后,设置flag通知消费者。
    """
    sentence = ""
    async for char in stream():
        sentence += char
        if char in [".", "!", "?"]:
            print("got sentence: ", sentence)
            await q.put(sentence) # 将生成的句子放入队列
            sentence = ""
    # 确保最后一个不以标点符号结尾的句子也被处理(如果需要)
    if sentence:
        print("got sentence: ", sentence)
        await q.put(sentence)
    flag.set() # 生产完毕,设置事件标志

async def process_sentence(q: asyncio.Queue[str], flag: asyncio.Event):
    """
    消费者协程:从队列中获取句子并进行处理。
    当队列为空且生产者已设置flag时,停止消费。
    """
    global i
    while True:
        # 检查是否应该停止:队列为空且生产者已完成
        if q.empty() and flag.is_set():
            break

        # 尝试从队列获取项目,如果队列为空则等待
        item = await q.get()

        print("processing sentence: ", i)
        print("waiting for processing sentence: ", item)
        await asyncio.sleep(len(item) * 0.1) # 模拟耗时处理
        print("sentence processed!")

        q.task_done() # 通知队列此任务已完成
        i += 1

async def main():
    global i
    i = 1 # 重置计数器
    event = asyncio.Event() # 用于生产者通知消费者结束
    queue = asyncio.Queue[str]() # 生产者和消费者之间的通信队列

    # 启动生产者和消费者作为独立的协程任务
    producer_task = asyncio.create_task(sentences_generator(queue, event))
    consumer_task = asyncio.create_task(process_sentence(queue, event))

    # 等待所有任务完成
    await asyncio.gather(producer_task, consumer_task)

    # 可选:等待队列中所有任务被标记为完成,确保所有数据都被处理
    await queue.join()

asyncio.run(main())

预期输出示例:

got char: H
got char: i
got char: .
got sentence:  Hi.
got char:  
got char: H
processing sentence:  1
waiting for processing sentence:  Hi.
got char: e
got char: l
got char: l
got char: o
got char: .
got sentence:   Hello.
sentence processed!
got char:  
got char: T
processing sentence:  2
waiting for processing sentence:   Hello.
got char: h
got char: a
got char: n
got char: k
got char:  
got char: y
got char: o
got char: u
got char: .
got sentence:  Thank you.
sentence processed!
processing sentence:  3
waiting for processing sentence:  Thank you.
sentence processed!

从这个输出可以看出,当process_sentence正在处理第一个句子时,stream和sentences_generator已经继续生成了后续的字符和句子,并将其放入队列。这正是我们期望的并发行为。

关键点和注意事项

  1. asyncio.Queue的作用:它提供了一个线程安全的(在asyncio中是协程安全的)FIFO队列。put()操作在队列满时会暂停,get()操作在队列空时会暂停,直到有新的数据可用。
  2. asyncio.Event的作用:它是一个简单的同步原语,用于一个协程通知另一个协程某个事件已经发生。生产者在完成所有数据生产后调用flag.set(),消费者则通过flag.is_set()来检查生产者的状态。
  3. asyncio.gather():用于并发运行多个协程或任务,并等待它们全部完成。
  4. q.task_done() 和 q.join()
    • q.task_done():消费者在完成对从队列中获取的项目的处理后调用,通知队列该项目已处理完毕。
    • q.join():main函数可以调用await queue.join()来等待队列中所有项目都被get并task_done。这确保了在程序退出前所有数据都已得到处理。在我们的示例中,虽然gather已经等待了所有协程,但queue.join()提供了一个更明确的机制来等待所有队列中的工作完成。
  5. 消费者退出条件:消费者协程的退出逻辑至关重要。一个常见的模式是while True循环,内部判断q.empty() and flag.is_set()来决定是否退出。这确保了在生产者完成且队列中所有待处理项都已消费后,消费者才能安全退出。
  6. 错误处理:在实际应用中,生产者和消费者内部应添加适当的错误处理机制,例如try-except块。
  7. 背压(Backpressure):asyncio.Queue可以有容量限制。如果生产者生产速度远快于消费者,队列会逐渐填满,最终q.put()会暂停,从而对生产者施加背压,防止内存无限增长。

总结

通过将异步任务分解为独立的生产者和消费者,并利用asyncio.Queue进行通信,我们成功地将原本串行执行的逻辑转换为了并发执行。这种模式不仅提高了资源利用率,也使得代码结构更加清晰,易于维护和扩展。在设计复杂的asyncio应用时,当存在数据流动的依赖但又希望实现任务并行时,生产者-消费者模式与asyncio.Queue是解决这类问题的强大工具。

相关专题

更多
while的用法
while的用法

while的用法是“while 条件: 代码块”,条件是一个表达式,当条件为真时,执行代码块,然后再次判断条件是否为真,如果为真则继续执行代码块,直到条件为假为止。本专题为大家提供while相关的文章、下载、课程内容,供大家免费下载体验。

85

2023.09.25

线程和进程的区别
线程和进程的区别

线程和进程的区别:线程是进程的一部分,用于实现并发和并行操作,而线程共享进程的资源,通信更方便快捷,切换开销较小。本专题为大家提供线程和进程区别相关的各种文章、以及下载和课程。

480

2023.08.10

Java 桌面应用开发(JavaFX 实战)
Java 桌面应用开发(JavaFX 实战)

本专题系统讲解 Java 在桌面应用开发领域的实战应用,重点围绕 JavaFX 框架,涵盖界面布局、控件使用、事件处理、FXML、样式美化(CSS)、多线程与UI响应优化,以及桌面应用的打包与发布。通过完整示例项目,帮助学习者掌握 使用 Java 构建现代化、跨平台桌面应用程序的核心能力。

61

2026.01.14

php与html混编教程大全
php与html混编教程大全

本专题整合了php和html混编相关教程,阅读专题下面的文章了解更多详细内容。

31

2026.01.13

PHP 高性能
PHP 高性能

本专题整合了PHP高性能相关教程大全,阅读专题下面的文章了解更多详细内容。

73

2026.01.13

MySQL数据库报错常见问题及解决方法大全
MySQL数据库报错常见问题及解决方法大全

本专题整合了MySQL数据库报错常见问题及解决方法,阅读专题下面的文章了解更多详细内容。

20

2026.01.13

PHP 文件上传
PHP 文件上传

本专题整合了PHP实现文件上传相关教程,阅读专题下面的文章了解更多详细内容。

24

2026.01.13

PHP缓存策略教程大全
PHP缓存策略教程大全

本专题整合了PHP缓存相关教程,阅读专题下面的文章了解更多详细内容。

7

2026.01.13

jQuery 正则表达式相关教程
jQuery 正则表达式相关教程

本专题整合了jQuery正则表达式相关教程大全,阅读专题下面的文章了解更多详细内容。

4

2026.01.13

热门下载

更多
网站特效
/
网站源码
/
网站素材
/
前端模板

精品课程

更多
相关推荐
/
热门推荐
/
最新课程
Go 教程
Go 教程

共32课时 | 3.7万人学习

Go语言实战之 GraphQL
Go语言实战之 GraphQL

共10课时 | 0.8万人学习

关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送

Copyright 2014-2026 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号