Python并发编程:asyncio与threading协同实现同步任务并行化

碧海醫心
发布: 2025-10-30 11:38:35
原创
127人浏览过

Python并发编程:asyncio与threading协同实现同步任务并行化

本文探讨python中如何将同步阻塞函数与异步协程任务并行执行。通过分析`asyncio`事件循环的特性,我们揭示了直接调用同步函数会阻塞事件循环的问题。核心解决方案是利用`asyncio.run_in_executor`将同步任务提交到独立的线程池中执行,从而实现与异步任务的并发运行,有效提升应用程序的响应性和吞吐量,尤其适用于处理i/o密集型或cpu密集型同步操作。

理解asyncio中的并发与阻塞

在Python的asyncio框架中,协程(coroutine)是实现并发的主要机制。一个asyncio事件循环在单线程中运行,通过协作式多任务处理(cooperative multitasking)来实现并发。这意味着当一个协程遇到await表达式时,它会暂停执行并将控制权交还给事件循环,允许其他协程运行。然而,如果一个协程内部调用了一个耗时且不包含await的同步阻塞函数,那么整个事件循环将会被阻塞,直到该同步函数执行完毕,其他所有协程都无法运行。

考虑以下原始代码示例:

import asyncio
import threading

def do_sync(number: int) -> int:
    # 这里的lock操作是为了演示,实际中如果函数内部不共享状态,则无需加锁
    lock = threading.Lock()
    lock.acquire()
    print(f"do_sync: 开始处理 {number}")
    result = number + 10
    print(f"do_sync: 完成处理 {number}")
    lock.release()
    return result

async def do_async(number: int) -> int:
    print(f"do_async: 开始处理 {number}")
    # 模拟异步I/O操作
    await asyncio.sleep(0.1) 
    result = number + 10
    print(f"do_async: 完成处理 {number}")
    return result

async def main():
    # 这里的max函数会先完全执行do_sync(1),然后才await do_async(2)
    # 它们是顺序执行的,不是并行
    result = max(do_sync(1), await do_async(2))
    print(f"最终结果: {result}")

if __name__ == "__main__":
    asyncio.run(main())
登录后复制

在这段代码中,main函数内的max(do_sync(1), await do_async(2))表达式会首先完全执行do_sync(1)。由于do_sync是一个普通的同步函数,它会立即运行并阻塞当前线程,直到其返回结果。只有当do_sync(1)执行完毕后,await do_async(2)才会被调度执行。因此,这两个函数是顺序执行的,并未实现真正的并发。

利用 asyncio.run_in_executor 实现同步任务并行化

为了让同步阻塞函数与异步协程任务并行运行,我们需要将同步函数从asyncio事件循环的主线程中“卸载”到另一个独立的线程或进程中执行。asyncio为此提供了loop.run_in_executor方法。

立即进入豆包AI人工智官网入口”;

立即学习豆包AI人工智能在线问答入口”;

run_in_executor允许我们将一个普通的同步函数提交到一个执行器(executor)中运行。默认情况下,asyncio使用ThreadPoolExecutor作为执行器,这意味着同步函数将在一个单独的线程中运行。这样,asyncio事件循环可以继续处理其他协程任务,而无需等待同步函数完成。

run_in_executor的基本用法如下:

await loop.run_in_executor(executor, func, *args)
登录后复制
  • executor: 可以是一个concurrent.futures.ThreadPoolExecutor或concurrent.futures.ProcessPoolExecutor实例,或者None(默认使用ThreadPoolExecutor)。
  • func: 要在执行器中运行的同步函数。
  • *args: 传递给func的参数。

run_in_executor会返回一个Future对象,我们可以await这个Future来等待同步函数的执行结果。

示例代码与解析

现在,我们使用run_in_executor来改造main函数,使do_sync与do_async能够并行运行:

豆包AI编程
豆包AI编程

豆包推出的AI编程助手

豆包AI编程483
查看详情 豆包AI编程
import asyncio
import threading
import time # 引入time模块用于模拟耗时操作

def do_sync(number: int) -> int:
    # 这里的lock操作是为了演示,实际中如果函数内部不共享状态,则无需加锁
    # 在run_in_executor场景下,如果do_sync内部需要线程安全,则仍需加锁
    # lock = threading.Lock() # 每次调用都会创建新锁,这里不合适,应在外部管理或无需
    print(f"do_sync: 开始处理 {number} 在线程 {threading.current_thread().name}")
    time.sleep(1) # 模拟一个耗时1秒的同步操作
    result = number + 10
    print(f"do_sync: 完成处理 {number} 在线程 {threading.current_thread().name}")
    return result

async def do_async(number: int) -> int:
    print(f"do_async: 开始处理 {number} 在线程 {threading.current_thread().name}")
    await asyncio.sleep(0.5) # 模拟一个耗时0.5秒的异步I/O操作
    result = number + 10
    print(f"do_async: 完成处理 {number} 在线程 {threading.current_thread().name}")
    return result

async def main():
    loop = asyncio.get_running_loop()

    # 使用run_in_executor将do_sync提交到线程池执行
    # 注意:do_sync_future是一个Future对象,它代表do_sync的未来结果
    do_sync_future = loop.run_in_executor(None, do_sync, 1)

    # do_async是一个awaitable对象
    do_async_task = do_async(2)

    # 使用asyncio.gather等待这两个任务并行完成
    # asyncio.gather会等待所有给定的awaitable对象完成
    sync_result, async_result = await asyncio.gather(do_sync_future, do_async_task)

    final_max_result = max(sync_result, async_result)
    print(f"同步任务结果: {sync_result}")
    print(f"异步任务结果: {async_result}")
    print(f"最终最大结果: {final_max_result}")

if __name__ == "__main__":
    start_time = time.monotonic()
    asyncio.run(main())
    end_time = time.monotonic()
    print(f"总耗时: {end_time - start_time:.2f} 秒")
登录后复制

代码解析:

  1. 获取事件循环: loop = asyncio.get_running_loop() 获取当前正在运行的事件循环实例。
  2. 提交同步任务: do_sync_future = loop.run_in_executor(None, do_sync, 1) 将do_sync(1)函数提交到默认的线程池中执行。None表示使用默认的ThreadPoolExecutor。run_in_executor立即返回一个Future对象,而不是阻塞。
  3. 创建异步任务: do_async_task = do_async(2) 创建一个协程对象,它是一个可等待(awaitable)对象。
  4. 并行等待: await asyncio.gather(do_sync_future, do_async_task) 是关键。asyncio.gather允许我们同时等待多个可等待对象。在这种情况下,do_sync_future会在一个单独的线程中执行,而do_async_task则在事件循环的主线程中作为协程执行。它们会并行地运行,等待时间由最长的那个任务决定。
  5. 结果处理: 当asyncio.gather返回时,它会按顺序返回各个任务的结果。

运行上述代码,你会观察到do_sync和do_async的打印输出几乎同时开始,并且总耗时约为1秒(取决于do_sync的time.sleep时间),而不是1.5秒(1秒 + 0.5秒),这证明了它们是并行执行的。

注意事项

  1. 选择合适的执行器:

    • ThreadPoolExecutor (默认): 适用于I/O密集型同步任务(如网络请求、文件读写)。它将阻塞操作放到单独的线程中,释放GIL,允许Python解释器在其他线程中运行。
    • ProcessPoolExecutor: 适用于CPU密集型同步任务。它将任务放到单独的进程中,可以绕过GIL的限制,真正实现CPU层面的并行计算。使用时需要注意进程间通信的开销以及数据序列化问题。
    from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
    
    # 自定义线程池
    with ThreadPoolExecutor(max_workers=5) as thread_pool:
        do_sync_future = loop.run_in_executor(thread_pool, do_sync, 1)
    
    # 自定义进程池
    with ProcessPoolExecutor(max_workers=2) as process_pool:
        do_sync_future = loop.run_in_executor(process_pool, do_sync_cpu_bound, 1)
    登录后复制
  2. 共享状态与线程安全: 如果提交给run_in_executor的同步函数需要访问或修改共享数据(如全局变量、类属性),则必须使用线程锁(threading.Lock)或其他同步原语来确保线程安全,以避免数据竞争问题。在示例中的do_sync函数,由于其内部操作是独立的,不涉及共享状态,因此无需外部加锁。

  3. 错误处理: run_in_executor返回的Future对象在被await时,如果其内部的同步函数抛出异常,该异常会被重新抛出。因此,可以使用try...except块来捕获和处理这些异常。

  4. 避免过度使用: run_in_executor引入了线程/进程管理的开销。对于非常轻量级的同步操作,直接在事件循环中执行可能比使用执行器更高效。它主要用于处理那些确实会阻塞事件循环的耗时同步任务。

总结

通过asyncio.run_in_executor,Python的asyncio框架能够优雅地与传统的同步阻塞代码以及多线程/多进程模型结合。它提供了一种强大的机制,允许开发者在保持事件循环响应性的同时,处理耗时的同步I/O或CPU密集型任务。掌握这一技术对于构建高性能、高并发的Python应用程序至关重要,它弥合了协程的非阻塞特性与传统同步操作之间的鸿沟,使得Python开发者能够充分利用现代多核处理器和异步编程范式。

以上就是Python并发编程:asyncio与threading协同实现同步任务并行化的详细内容,更多请关注php中文网其它相关文章!

编程速学教程(入门课程)
编程速学教程(入门课程)

编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号