
本文深入探讨了python中同步(阻塞)函数与异步(非阻塞)函数如何实现并发执行。通过分析`asyncio`和`threading`的工作原理,我们阐明了直接在`async`函数中调用阻塞同步代码的局限性,并重点介绍了使用`asyncio.loop.run_in_executor`将同步任务调度到单独线程池中执行的策略,从而确保`asyncio`事件循环的非阻塞性,实现高效的混合并发编程。
在Python的并发编程中,asyncio和threading是两种核心机制,分别适用于不同的场景。asyncio通过事件循环实现协作式多任务,非常适合I/O密集型且非阻塞的操作。而threading则利用操作系统线程实现并发,能够处理阻塞的I/O操作或CPU密集型任务(受限于GIL)。当我们需要在同一个应用程序中同时运行同步(阻塞)函数和异步(非阻塞)函数并希望它们并行执行时,直接将同步阻塞调用放在async函数中并不能实现真正的并发,反而会阻塞整个事件循环。
考虑以下初始代码示例:
import asyncio
import threading
import time
def do_sync(number: int) -> int:
    # 模拟一个耗时的同步阻塞操作
    print(f"同步函数 do_sync({number}) 在线程 {threading.current_thread().name} 中开始执行...")
    time.sleep(2) # 模拟2秒的阻塞
    result = number + 10
    print(f"同步函数 do_sync({number}) 在线程 {threading.current_thread().name} 中完成,结果: {result}")
    return result
async def do_async(number: int) -> int:
    # 模拟一个耗时的异步非阻塞操作
    print(f"异步函数 do_async({number}) 在线程 {threading.current_thread().name} 中开始执行...")
    await asyncio.sleep(1) # 模拟1秒的非阻塞等待
    result = number + 10
    print(f"异步函数 do_async({number}) 在线程 {threading.current_thread().name} 中完成,结果: {result}")
    return result
async def main():
    start_time = time.time()
    # 这里的do_sync(1)是一个普通的函数调用,它会阻塞main函数的执行
    # 直到do_sync(1)完全执行完毕,await do_async(2)才会被调度
    sync_result = do_sync(1)
    async_result = await do_async(2)
    final_max = max(sync_result, async_result)
    print(f"最终结果 max({sync_result}, {async_result}): {final_max}")
    print(f"总耗时: {time.time() - start_time:.2f} 秒")
if __name__ == "__main__":
    asyncio.run(main())运行上述代码,你会发现总耗时接近 2秒 + 1秒 = 3秒。这是因为do_sync(1)是一个普通的函数调用,它会在主线程上立即执行并阻塞,直到其完成,await do_async(2)才有机会被asyncio事件循环调度。这并非真正的并行或并发执行。
要实现同步函数与异步函数的并行执行,我们需要将阻塞的同步函数从asyncio事件循环的主线程中剥离出来,放到一个单独的执行器(Executor)中运行。asyncio提供了loop.run_in_executor()方法来优雅地解决这个问题。
立即学习“Python免费学习笔记(深入)”;
run_in_executor()方法允许我们将一个可调用对象(通常是同步函数)提交给一个concurrent.futures.ThreadPoolExecutor(默认)或concurrent.futures.ProcessPoolExecutor来执行。这样,阻塞的同步代码将在一个单独的线程(或进程)中运行,而asyncio事件循环则可以继续处理其他异步任务,从而实现并发。
import asyncio
import threading
import time
import concurrent.futures # 导入concurrent.futures模块
def do_sync_blocking(number: int) -> int:
    print(f"同步函数 do_sync_blocking({number}) 在线程 {threading.current_thread().name} 中开始执行...")
    time.sleep(2) # 模拟2秒的阻塞I/O或CPU工作
    result = number + 10
    print(f"同步函数 do_sync_blocking({number}) 在线程 {threading.current_thread().name} 中完成,结果: {result}")
    return result
async def do_async_non_blocking(number: int) -> int:
    print(f"异步函数 do_async_non_blocking({number}) 在线程 {threading.current_thread().name} 中开始执行...")
    await asyncio.sleep(1) # 模拟1秒的非阻塞I/O等待
    result = number + 10
    print(f"异步函数 do_async_non_blocking({number}) 在线程 {threading.current_thread().name} 中完成,结果: {result}")
    return result
async def main_parallel():
    start_time = time.time()
    loop = asyncio.get_running_loop()
    # 将同步阻塞函数提交到默认的ThreadPoolExecutor中执行
    # 这会立即返回一个Future对象,而不会阻塞当前事件循环
    sync_task_future = loop.run_in_executor(None, do_sync_blocking, 1)
    # 异步函数可以直接调度到事件循环中
    async_task_coro = do_async_non_blocking(2)
    # 使用asyncio.gather并发等待这两个任务的结果
    # gather会等待所有传入的awaitable对象完成
    results = await asyncio.gather(sync_task_future, async_task_coro)
    sync_result, async_result = results
    final_max = max(sync_result, async_result)
    print(f"最终结果 max({sync_result}, {async_result}): {final_max}")
    print(f"总耗时: {time.time() - start_time:.2f} 秒")
if __name__ == "__main__":
    asyncio.run(main_parallel())运行这段代码,你会观察到总耗时大约是 2秒。这是因为do_sync_blocking在后台线程中执行,而do_async_non_blocking在事件循环中执行,两者几乎同时开始,并且程序等待两者中最长的那个任务完成(即2秒的同步任务)。这证明了我们成功地实现了同步与异步任务的并发执行。
当我们将同步函数放到单独的线程中执行时,需要特别注意线程安全问题。如果同步函数访问或修改共享资源(例如全局变量、数据库连接池等),则必须使用适当的同步机制,如threading.Lock、threading.Semaphore或queue.Queue来避免竞态条件。
例如,如果do_sync_blocking内部需要保护某个共享资源,其结构应如下:
import threading
shared_data = []
data_lock = threading.Lock()
def do_sync_with_lock(number: int) -> int:
    with data_lock: # 使用with语句确保锁的正确获取和释放
        shared_data.append(f"Processing {number}")
        # ... 其他操作 ...
    return number + 10除了run_in_executor,asyncio还提供了loop.call_soon_threadsafe()方法。这个方法的主要用途是允许在非事件循环线程中安全地调度一个回调函数到事件循环中执行。它返回一个asyncio.Handle对象,而不是一个Future。
例如,如果你有一个工作线程处理完数据后,需要通知事件循环更新UI或执行某个异步操作,你可以使用call_soon_threadsafe:
# 在一个非asyncio线程中 # loop.call_soon_threadsafe(my_callback, arg1, arg2)
然而,对于将阻塞的同步函数从事件循环中剥离以实现并发,run_in_executor是更直接和推荐的方法,因为它直接处理了任务的执行和结果的返回。call_soon_threadsafe更多是用于跨线程的事件通知和回调调度。
通过理解和正确应用asyncio与threading的结合策略,开发者可以构建出既能高效处理大量并发I/O,又能平稳集成现有阻塞代码的强大Python应用程序。
以上就是Python中同步与异步函数的并发执行:结合asyncio与threading的详细内容,更多请关注php中文网其它相关文章!
 
                        
                        每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
 
                Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号