
本文探讨python中如何将同步阻塞函数与异步协程任务并行执行。通过分析`asyncio`事件循环的特性,我们揭示了直接调用同步函数会阻塞事件循环的问题。核心解决方案是利用`asyncio.run_in_executor`将同步任务提交到独立的线程池中执行,从而实现与异步任务的并发运行,有效提升应用程序的响应性和吞吐量,尤其适用于处理i/o密集型或cpu密集型同步操作。
在Python的asyncio框架中,协程(coroutine)是实现并发的主要机制。一个asyncio事件循环在单线程中运行,通过协作式多任务处理(cooperative multitasking)来实现并发。这意味着当一个协程遇到await表达式时,它会暂停执行并将控制权交还给事件循环,允许其他协程运行。然而,如果一个协程内部调用了一个耗时且不包含await的同步阻塞函数,那么整个事件循环将会被阻塞,直到该同步函数执行完毕,其他所有协程都无法运行。
考虑以下原始代码示例:
import asyncio
import threading
def do_sync(number: int) -> int:
# 这里的lock操作是为了演示,实际中如果函数内部不共享状态,则无需加锁
lock = threading.Lock()
lock.acquire()
print(f"do_sync: 开始处理 {number}")
result = number + 10
print(f"do_sync: 完成处理 {number}")
lock.release()
return result
async def do_async(number: int) -> int:
print(f"do_async: 开始处理 {number}")
# 模拟异步I/O操作
await asyncio.sleep(0.1)
result = number + 10
print(f"do_async: 完成处理 {number}")
return result
async def main():
# 这里的max函数会先完全执行do_sync(1),然后才await do_async(2)
# 它们是顺序执行的,不是并行
result = max(do_sync(1), await do_async(2))
print(f"最终结果: {result}")
if __name__ == "__main__":
asyncio.run(main())在这段代码中,main函数内的max(do_sync(1), await do_async(2))表达式会首先完全执行do_sync(1)。由于do_sync是一个普通的同步函数,它会立即运行并阻塞当前线程,直到其返回结果。只有当do_sync(1)执行完毕后,await do_async(2)才会被调度执行。因此,这两个函数是顺序执行的,并未实现真正的并发。
为了让同步阻塞函数与异步协程任务并行运行,我们需要将同步函数从asyncio事件循环的主线程中“卸载”到另一个独立的线程或进程中执行。asyncio为此提供了loop.run_in_executor方法。
立即进入“豆包AI人工智官网入口”;
立即学习“豆包AI人工智能在线问答入口”;
run_in_executor允许我们将一个普通的同步函数提交到一个执行器(executor)中运行。默认情况下,asyncio使用ThreadPoolExecutor作为执行器,这意味着同步函数将在一个单独的线程中运行。这样,asyncio事件循环可以继续处理其他协程任务,而无需等待同步函数完成。
run_in_executor的基本用法如下:
await loop.run_in_executor(executor, func, *args)
run_in_executor会返回一个Future对象,我们可以await这个Future来等待同步函数的执行结果。
现在,我们使用run_in_executor来改造main函数,使do_sync与do_async能够并行运行:
import asyncio
import threading
import time # 引入time模块用于模拟耗时操作
def do_sync(number: int) -> int:
# 这里的lock操作是为了演示,实际中如果函数内部不共享状态,则无需加锁
# 在run_in_executor场景下,如果do_sync内部需要线程安全,则仍需加锁
# lock = threading.Lock() # 每次调用都会创建新锁,这里不合适,应在外部管理或无需
print(f"do_sync: 开始处理 {number} 在线程 {threading.current_thread().name}")
time.sleep(1) # 模拟一个耗时1秒的同步操作
result = number + 10
print(f"do_sync: 完成处理 {number} 在线程 {threading.current_thread().name}")
return result
async def do_async(number: int) -> int:
print(f"do_async: 开始处理 {number} 在线程 {threading.current_thread().name}")
await asyncio.sleep(0.5) # 模拟一个耗时0.5秒的异步I/O操作
result = number + 10
print(f"do_async: 完成处理 {number} 在线程 {threading.current_thread().name}")
return result
async def main():
loop = asyncio.get_running_loop()
# 使用run_in_executor将do_sync提交到线程池执行
# 注意:do_sync_future是一个Future对象,它代表do_sync的未来结果
do_sync_future = loop.run_in_executor(None, do_sync, 1)
# do_async是一个awaitable对象
do_async_task = do_async(2)
# 使用asyncio.gather等待这两个任务并行完成
# asyncio.gather会等待所有给定的awaitable对象完成
sync_result, async_result = await asyncio.gather(do_sync_future, do_async_task)
final_max_result = max(sync_result, async_result)
print(f"同步任务结果: {sync_result}")
print(f"异步任务结果: {async_result}")
print(f"最终最大结果: {final_max_result}")
if __name__ == "__main__":
start_time = time.monotonic()
asyncio.run(main())
end_time = time.monotonic()
print(f"总耗时: {end_time - start_time:.2f} 秒")代码解析:
运行上述代码,你会观察到do_sync和do_async的打印输出几乎同时开始,并且总耗时约为1秒(取决于do_sync的time.sleep时间),而不是1.5秒(1秒 + 0.5秒),这证明了它们是并行执行的。
选择合适的执行器:
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
# 自定义线程池
with ThreadPoolExecutor(max_workers=5) as thread_pool:
do_sync_future = loop.run_in_executor(thread_pool, do_sync, 1)
# 自定义进程池
with ProcessPoolExecutor(max_workers=2) as process_pool:
do_sync_future = loop.run_in_executor(process_pool, do_sync_cpu_bound, 1)共享状态与线程安全: 如果提交给run_in_executor的同步函数需要访问或修改共享数据(如全局变量、类属性),则必须使用线程锁(threading.Lock)或其他同步原语来确保线程安全,以避免数据竞争问题。在示例中的do_sync函数,由于其内部操作是独立的,不涉及共享状态,因此无需外部加锁。
错误处理: run_in_executor返回的Future对象在被await时,如果其内部的同步函数抛出异常,该异常会被重新抛出。因此,可以使用try...except块来捕获和处理这些异常。
避免过度使用: run_in_executor引入了线程/进程管理的开销。对于非常轻量级的同步操作,直接在事件循环中执行可能比使用执行器更高效。它主要用于处理那些确实会阻塞事件循环的耗时同步任务。
通过asyncio.run_in_executor,Python的asyncio框架能够优雅地与传统的同步阻塞代码以及多线程/多进程模型结合。它提供了一种强大的机制,允许开发者在保持事件循环响应性的同时,处理耗时的同步I/O或CPU密集型任务。掌握这一技术对于构建高性能、高并发的Python应用程序至关重要,它弥合了协程的非阻塞特性与传统同步操作之间的鸿沟,使得Python开发者能够充分利用现代多核处理器和异步编程范式。
以上就是Python并发编程:asyncio与threading协同实现同步任务并行化的详细内容,更多请关注php中文网其它相关文章!
编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号