
在构建现代异步网络服务,特别是基于asyncio和ASGI框架(如FastAPI、Uvicorn、Socket.IO)的应用时,我们经常需要执行一些独立的、长时间运行的后台任务,例如从消息队列(如SQS)持续接收数据并推送给客户端。一个常见的需求是,这些后台任务不应阻塞主事件循环,以保证Web服务的高响应性。
当我们将一个异步函数(使用async def定义的函数,即协程)直接作为threading.Thread的target参数时,Python解释器会发出RuntimeWarning: coroutine '...' was never awaited.警告。这通常发生在Uvicorn等ASGI服务器启动时。
产生此警告的根本原因在于:异步协程函数并非普通函数,它们在被调用时并不会立即执行其内部逻辑。相反,它们返回一个“协程对象”(coroutine object)。要真正执行协程内部的代码,必须在一个asyncio事件循环中对其进行“调度”和“等待”(await)。
当我们写下threading.Thread(target=background_task)时,background_task被当作一个普通的可调用对象传递给了线程。线程启动时,它会尝试直接调用background_task()。然而,background_task()仅仅返回了一个协程对象,而没有在一个事件循环中被await。因此,协程从未真正开始执行,导致了“从未被等待”的警告。
立即学习“Python免费学习笔记(深入)”;
要在一个独立的线程中正确运行异步协程,我们需要确保该线程拥有自己的asyncio事件循环,并在该循环中await我们的协程。asyncio.run()函数正是为此目的而设计的便捷工具。
asyncio.run(coro)函数负责以下操作:
因此,我们可以将asyncio.run作为threading.Thread的target,并将我们的异步协程作为asyncio.run的参数传递。
修正后的代码示例:
以下是一个整合了socketio和asyncio的完整示例,展示了如何在一个独立的后台线程中,利用asyncio.run正确地运行一个持续从SQS接收消息并发送给客户端的协程:
import socketio
import threading
import json
import asyncio # 引入asyncio模块
# 假设 sqs_handler.py 中有一个 SQSQueue 类
# 实际应用中需要根据你的SQS客户端库进行实现
class SQSQueue:
def get_next_message_from_sqs(self):
"""模拟从SQS接收消息"""
# 在实际应用中,这里会调用AWS SDK等获取消息
print("从SQS获取消息...")
# 模拟消息内容
import time
time.sleep(1) # 模拟网络延迟
return type('obj', (object,), {'body': json.dumps({"id": time.time(), "status": "added", "message": "New item from SQS"})})()
sio = socketio.AsyncServer(async_mode='asgi')
app = socketio.ASGIApp(sio, static_files={"/": "./"})
@sio.event
async def connect(sid, environ):
"""客户端连接事件"""
print(f"客户端 {sid} 已连接")
@sio.event
async def disconnect(sid):
"""客户端断开连接事件"""
print(f"客户端 {sid} 已断开")
@sio.event
async def item_removed(sid, data):
"""处理客户端发送的item_removed事件"""
print(f"客户端 {sid} 请求移除项: {data}")
await sio.emit("item_removed", data)
async def background_task():
"""后台协程任务:持续从SQS接收消息并发送给客户端"""
queue = SQSQueue()
print("后台任务协程已启动,开始监听SQS...")
while True:
try:
message = queue.get_next_message_from_sqs()
data = json.loads(message.body)
print(f"从SQS接收到消息: {data}")
# 使用sio.emit向所有连接的客户端广播消息
await sio.emit('item_added', data)
# 避免CPU空转,即使get_next_message_from_sqs是同步的,
# 协程内部也需要适当的暂停点以让出控制权
await asyncio.sleep(0.1) # 模拟异步IO等待,确保事件循环有机会切换
except Exception as e:
print(f"后台任务发生错误: {e}")
await asyncio.sleep(5) # 错误后等待一段时间重试
# 关键修正:将 asyncio.run 作为线程的目标,并传入 background_task 协程
background_thread = threading.Thread(target=asyncio.run, args=(background_task,))
background_thread.daemon = True # 设置为守护线程,主程序退出时自动终止
background_thread.start() # 启动后台线程
# 如果使用uvicorn运行,通常会通过命令行启动:
# uvicorn your_module_name:app --port 8000 --reload
# 确保你的文件名为 your_module_name.py代码解释:
通过将asyncio.run作为threading.Thread的target,我们能够成功地在独立的后台线程中启动并运行asyncio协程,解决了RuntimeWarning: coroutine '...' was never awaited.的问题。这种方法使得异步后台任务可以在不阻塞主应用事件循环的情况下,高效地执行长时间运行的操作,从而提升了整个应用的响应性和并发能力。理解协程的执行机制以及asyncio.run的作用是构建健壮异步应用的关键。
以上就是在Python asyncio应用中优雅地运行后台协程任务的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号