
本文探讨了在现有同步python应用中运行异步后台任务的策略。我们首先分析了使用`asyncio.create_task`但未正确`await`导致后台协程无法完成的问题,阐明了`asyncio`的协作式多任务机制。随后,提供了两种解决方案:一是通过在异步上下文中显式`await`后台任务来确保其顺序完成;二是通过结合`threading`模块,在单独的线程中运行`asyncio`事件循环,实现异步任务与主同步应用的并行执行。
理解Asyncio的异步机制
Python的asyncio库提供了一种编写并发代码的方式,它基于协程和事件循环实现协作式多任务。与传统的多线程或多进程不同,asyncio的并发并非真正的并行执行(即在同一时间点执行多个操作),而是通过在I/O操作(如网络请求、文件读写或asyncio.sleep)等待时,将控制权交还给事件循环,从而允许其他协程运行。这种机制被称为“协作式多任务”,意味着协程必须显式地通过await来放弃控制权。
当一个协程通过asyncio.create_task()被调度时,它被添加到事件循环中,但并不会立即执行。事件循环会在适当的时机开始执行它。关键在于,如果一个通过create_task创建的任务没有被await,并且事件循环在其完成之前就退出了,那么该任务可能无法完全执行。
初始问题的分析
考虑以下示例代码,它尝试在一个同步的main函数中,通过asyncio.run启动一个异步任务:
import asyncio
from time import sleep
import sys
async def task():
for i in range(5):
print(f"Background task iteration {i}")
await asyncio.sleep(1)
print('finished')
async def background_task():
print("a")
asyncio.create_task(task()) # 任务被创建但未被await
print("b")
def main():
print("Main program started python", sys.version)
asyncio.run(background_task()) # 运行异步入口
for i in range(3):
sleep(3) # 主程序中的同步阻塞
print(f"Main program iteration {i}")
if __name__ == "__main__":
main()运行上述代码,会得到类似以下的输出:
立即学习“Python免费学习笔记(深入)”;
Main program started python 3.11.6 (main, Oct 23 2023, 22:48:54) [GCC 11.4.0] a b Background task iteration 0 Main program iteration 0 Main program iteration 1 Main program iteration 2
从输出可以看出,task()协程只执行了一次迭代就停止了。这是因为:
- 在background_task中,asyncio.create_task(task())只是将task协程调度到事件循环中,但并没有await它。
- asyncio.run(background_task())执行完毕后,事件循环会退出。
- 在task协程第一次遇到await asyncio.sleep(1)时,它会将控制权交还给事件循环。此时,由于background_task已经执行到末尾,asyncio.run所在的事件循环认为没有更多需要await的任务,便会退出。
- 主程序main中的sleep(3)是同步阻塞的,它会暂停整个线程,这与asyncio的事件循环是独立的,无法让后台的asyncio任务继续执行。因此,task协程在第一次暂停后便没有机会恢复执行。
要确保asyncio.run中的所有任务完成,需要显式地await它们,或者确保事件循环持续运行直到所有任务完成。
解决方案一:在异步上下文中顺序完成任务
如果你的目标是在asyncio.run的整个生命周期内,确保所有被调度的异步任务都完成,那么最直接的方法就是在异步入口函数中await这些任务。
import asyncio
from time import sleep
import sys
async def task():
for i in range(5):
print(f"Background task iteration {i}")
await asyncio.sleep(0.1) # 缩短睡眠时间以便快速观察
print('finished')
async def background_task_sequential():
print("a")
# 创建任务并立即await它,确保其完成
scheduled_task = asyncio.create_task(task())
print("task scheduled")
# 可以在此处执行其他异步操作
# ...
await scheduled_task # 等待后台任务完成
print("b")
def main_sequential():
print("Main program started python", sys.version)
asyncio.run(background_task_sequential()) # 运行异步入口
for i in range(3):
sleep(0.5)
print(f"Main program iteration {i}")
if __name__ == "__main__":
main_sequential()输出:
a task scheduled Background task iteration 0 Background task iteration 1 Background task iteration 2 Background task iteration 3 Background task iteration 4 finished b Main program iteration 0 Main program iteration 1 Main program iteration 2
解释: 通过在background_task_sequential中添加await scheduled_task,我们确保了asyncio.run会一直等待task()协程执行完毕,然后才继续执行background_task_sequential中的print("b"),并最终退出事件循环。这种方式使得task协程能够完整运行,但它仍然是在asyncio.run的阻塞时间内完成的,对于外部的同步main_sequential函数来说,asyncio.run仍然是一个阻塞操作。
解决方案二:结合线程实现并行后台执行
如果你的同步主程序需要继续执行,而异步任务需要在“后台”真正并行地运行,那么你需要将asyncio事件循环放到一个单独的线程中。这样,主线程可以继续执行同步代码,而另一个线程则负责运行异步事件循环。
import asyncio
from time import sleep
import sys
import threading
async def task():
for i in range(5):
print(f"Background task iteration {i}")
await asyncio.sleep(1) # 恢复较长睡眠时间以观察并行效果
print('finished')
async def background_task_parallel():
print("a")
await task() # 直接await task(),因为整个asyncio.run都在一个线程中运行
print("b")
def main_parallel():
print("Main program started python", sys.version)
# 在一个新线程中运行asyncio事件循环
# lambda函数用于包装asyncio.run,使其成为threading.Thread的target
t = threading.Thread(target=lambda: asyncio.run(background_task_parallel()))
t.start() # 启动新线程
for i in range(3):
sleep(3) # 主程序继续执行同步阻塞
print(f"Main program iteration {i}")
# 可选:等待后台线程完成
# t.join()
print("Main program finished")
if __name__ == "__main__":
main_parallel()输出:
a Background task iteration 0 Main program started python 3.11.6 (main, Oct 23 2023, 22:48:54) [GCC 11.4.0] Background task iteration 1 Background task iteration 2 Main program iteration 0 Background task iteration 3 Background task iteration 4 finished b Main program iteration 1 Main program iteration 2 Main program finished
解释: 在这个方案中,我们创建了一个新的threading.Thread。这个新线程的目标函数是一个lambda表达式,它调用asyncio.run(background_task_parallel())。这意味着整个asyncio事件循环及其中的background_task_parallel(以及它所await的task协程)都在这个新线程中独立运行。
主线程在启动新线程后,立即继续执行其自身的同步循环(包含sleep(3))。由于asyncio事件循环运行在不同的线程中,它不会被主线程的sleep阻塞,可以独立地推进其内部的异步任务。从输出可以看出,主程序的迭代和后台任务的迭代是交错进行的,这表明它们在并行执行。
注意事项:
- 线程安全: 当在不同线程中运行代码时,如果它们需要共享数据,必须考虑线程安全问题,例如使用锁(threading.Lock)来保护共享资源。
- 资源管理: 确保后台线程能够正常退出。如果后台任务是长期运行的,可能需要机制来优雅地停止它。
- asyncio.run()的阻塞性: 无论在哪种方案中,asyncio.run()本身都是一个阻塞调用,它会阻塞调用它的那个线程,直到其内部的事件循环完成。这也是为什么在方案二中需要将其放入单独的线程中。
总结
在同步Python应用中集成异步后台任务时,理解asyncio的协作式多任务原理至关重要。
- 如果你希望在asyncio.run的生命周期内确保所有异步任务顺序完成,应在异步入口函数中显式地await通过create_task创建的任务。
- 如果你需要异步任务在不阻塞主同步应用的情况下真正并行运行,最有效的方法是将其asyncio事件循环封装在一个单独的threading.Thread中。
选择哪种方案取决于你的具体需求:是希望在一个集中的异步块中完成所有工作,还是需要将异步操作完全解耦到后台并行执行。










