
在Python的asyncio框架中,async关键字定义的函数是协程(coroutine),它们并不会立即执行,而是返回一个协程对象。这个协程对象需要被调度到一个事件循环(event loop)中,通过await关键字才能真正运行。当尝试将一个协程函数直接作为threading.Thread的目标(target)函数时,Python解释器会发出RuntimeWarning: coroutine '...' was never awaited的警告,因为Thread仅仅是创建了一个协程对象,但没有机制去执行它。
例如,在WebSocket服务器(如基于socketio和uvicorn的应用)中,我们可能需要一个后台任务持续从外部源(如SQS队列)接收消息并发送给客户端。如果这个后台任务是一个async函数,并且我们希望它在不阻塞主应用事件循环的情况下运行,那么直接将其放入一个新线程是行不通的。
解决此问题的核心在于,每个异步协程都需要一个事件循环来运行。当我们在一个新线程中运行一个异步协程时,这个新线程需要有自己的独立事件循环。asyncio.run()函数正是为此目的而设计的:它负责创建一个新的事件循环,运行指定的协程直到完成,然后关闭该事件循环。
因此,正确的做法是将asyncio.run()作为线程的目标函数,并将我们的异步协程作为asyncio.run()的参数。
立即学习“Python免费学习笔记(深入)”;
关键修改点:
import socketio
import threading
import json
import asyncio # 导入asyncio模块
from sqs_handler import SQSQueue # 假设存在此模块
sio = socketio.AsyncServer(async_mode='asgi')
app = socketio.ASGIApp(sio, static_files={"/": "./"})
@sio.event
async def connect(sid, environ):
print(sid, "connected")
@sio.event
async def disconnect(sid):
print(sid, "disconnected")
@sio.event
async def item_removed(sid, data):
await sio.emit("item_removed", data)
async def background_task():
"""
后台异步任务,持续从SQS获取消息并发送给客户端。
"""
queue = SQSQueue()
while True:
try:
# 模拟从SQS获取消息,实际应用中可能需要更复杂的错误处理和长轮询
message = queue.get_next_message_from_sqs()
if message:
data = json.loads(message.body)
await sio.emit('item_added', data)
else:
# 如果没有消息,短暂等待以避免CPU空转
await asyncio.sleep(1)
except Exception as e:
print(f"后台任务发生错误: {e}")
await asyncio.sleep(5) # 错误后等待一段时间再重试
# 修改线程创建方式:使用asyncio.run来执行异步协程
# 注意 args=(background_task,) 中的逗号,表示这是一个包含单个元素的元组
background_thread = threading.Thread(target=asyncio.run, args=(background_task,))
background_thread.daemon = True # 将线程设置为守护线程,主程序退出时自动终止
background_thread.start()asyncio.run(coroutine)的工作原理:
args=(background_task,)的语法:
守护线程(daemon=True):
优雅地停止后台任务:
在while True循环中运行的后台任务,在实际应用中需要一个机制来优雅地停止。简单的守护线程在主程序退出时会被强制终止,可能导致数据丢失或资源未释放。
更健壮的方法是引入一个事件标志或共享变量,当需要停止时设置该标志,并在background_task中检查此标志,从而跳出循环。例如:
stop_event = asyncio.Event()
async def background_task_with_stop():
queue = SQSQueue()
while not stop_event.is_set(): # 检查停止事件
# ... 任务逻辑 ...
await asyncio.sleep(1) # 短暂等待,避免CPU空转
print("后台任务已停止。")
# 在需要停止时:
# stop_event.set()在主程序退出前,可以调用stop_event.set()来通知后台任务停止,然后等待线程结束(background_thread.join())。
共享资源与线程安全:
通过将asyncio.run()作为threading.Thread的目标函数,我们可以有效地在独立线程中运行异步协程,为后台任务提供一个独立的事件循环,从而避免阻塞主应用的事件循环,并解决“coroutine was never awaited”的警告。这种模式在需要将长时间运行的异步任务从主应用逻辑中分离出来时非常有用,尤其是在Web服务、数据处理管道等场景中。务必注意线程的生命周期管理和共享资源的线程安全。
以上就是Python asyncio 协程在独立线程中运行的最佳实践的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号