asyncio通过协程实现单线程并发,适用于I/O密集型任务。使用async/await定义和调用协程,通过事件循环调度执行。可用asyncio.run()启动主协程,create_task()并发运行多个协程,gather()等待所有协程完成。异常处理需在await时捕获,未处理异常会存储于Task中。避免阻塞事件循环:使用异步I/O、将CPU密集型任务放入线程或进程池、用wait_for()设置超时、定期调用sleep(0)让出控制权。相比线程和进程,asyncio轻量高效,适合I/O密集场景;CPU密集任务应选进程或多线程;混合任务可结合使用三种方式。

Asyncio允许你编写并发代码,而无需线程或进程的复杂性。它本质上是一个单线程、事件循环的并发框架,通过协程(coroutines)实现非阻塞操作,从而提升I/O密集型任务的效率。
使用asyncio进行异步编程,简单来说,就是用
async
await
解决方案:
定义协程: 使用
async def
立即进入“豆包AI人工智官网入口”;
立即学习“豆包AI人工智能在线问答入口”;
import asyncio
async def my_coroutine(delay):
print(f"协程开始,等待 {delay} 秒")
await asyncio.sleep(delay)
print("协程结束")
return f"等待了 {delay} 秒的结果"创建事件循环: asyncio的核心是事件循环,它负责调度和执行协程。
loop = asyncio.get_event_loop()
运行协程: 可以使用
loop.run_until_complete()
asyncio.run()
asyncio.run()
async def main():
result = await my_coroutine(2)
print(result)
asyncio.run(main())使用await
await
awaitable
Future
Task
await
async
async def another_coroutine():
await asyncio.sleep(1)
return "另一个协程的结果"
async def main():
result = await another_coroutine()
print(result)
asyncio.run(main())创建Tasks:
asyncio.create_task()
Task
async def main():
task1 = asyncio.create_task(my_coroutine(1))
task2 = asyncio.create_task(my_coroutine(2))
await task1
await task2
print("所有任务完成")
asyncio.run(main())并发执行多个协程: 使用
asyncio.gather()
async def main():
results = await asyncio.gather(
my_coroutine(1),
my_coroutine(2),
another_coroutine()
)
print(results)
asyncio.run(main())在asyncio中处理异常与在常规Python代码中类似,但需要注意协程的特性。可以使用
try...except
import asyncio
async def might_fail():
await asyncio.sleep(0.5)
raise ValueError("Something went wrong!")
async def main():
try:
await might_fail()
except ValueError as e:
print(f"Caught an error: {e}")
asyncio.run(main())如果一个任务(Task)内部抛出了未捕获的异常,该异常会被存储在Task对象中。当使用
await
await
import asyncio
async def task_that_fails():
await asyncio.sleep(0.5)
raise ValueError("Task failed!")
async def main():
task = asyncio.create_task(task_that_fails())
await asyncio.sleep(1) # 让任务有时间完成
try:
await task # 重新抛出异常
except ValueError as e:
print(f"Caught error from task: {e}")
asyncio.run(main())
可以使用
task.result()
import asyncio
async def task_that_fails():
await asyncio.sleep(0.5)
raise ValueError("Task failed!")
async def main():
task = asyncio.create_task(task_that_fails())
await asyncio.sleep(1)
try:
task.result() # 重新抛出异常
except ValueError as e:
print(f"Caught error from task: {e}")
asyncio.run(main())避免阻塞asyncio事件循环是保证程序响应性的关键。事件循环是单线程的,如果一个协程执行了耗时的同步操作,它会阻塞整个事件循环,导致其他协程无法运行。
使用异步I/O操作: 这是最基本也是最重要的原则。使用asyncio提供的异步I/O操作,例如
asyncio.sleep()
asyncio.open_connection()
aiohttp
避免CPU密集型任务: 如果需要执行CPU密集型任务(例如图像处理、加密解密等),应该将这些任务放到单独的进程或线程中执行,避免阻塞事件循环。可以使用
asyncio.to_thread()
concurrent.futures
import asyncio
import concurrent.futures
import time
def cpu_bound_task(n):
time.sleep(n) # 模拟耗时操作
return f"CPU密集型任务完成,耗时 {n} 秒"
async def main():
loop = asyncio.get_running_loop()
with concurrent.futures.ProcessPoolExecutor() as pool:
result = await loop.run_in_executor(pool, cpu_bound_task, 2) # 放在进程池中运行
print(result)
asyncio.run(main())限制协程的运行时间: 使用
asyncio.wait_for()
asyncio.wait_for()
asyncio.TimeoutError
import asyncio
async def long_running_task():
await asyncio.sleep(5)
return "任务完成"
async def main():
try:
result = await asyncio.wait_for(long_running_task(), timeout=2)
print(result)
except asyncio.TimeoutError:
print("任务超时")
asyncio.run(main())使用异步库: 尽量使用异步库来替代同步库。例如,使用
aiohttp
requests
asyncpg
aiosqlite
psycopg2
sqlite3
定期释放控制权: 如果一个协程需要执行大量的计算或I/O操作,可以定期使用
await asyncio.sleep(0)
import asyncio
async def large_task():
for i in range(100000):
# 模拟大量计算
_ = i * i
if i % 1000 == 0:
await asyncio.sleep(0) # 释放控制权
return "任务完成"
async def main():
result = await large_task()
print(result)
asyncio.run(main())选择合适的并发策略取决于任务的性质和程序的需求。Asyncio、线程和进程各有优缺点,适用于不同的场景。
Asyncio:
线程:
进程:
总结:
以上就是如何使用asyncio进行异步编程?的详细内容,更多请关注php中文网其它相关文章!
编程怎么学习?编程怎么入门?编程在哪学?编程怎么学才快?不用担心,这里为大家提供了编程速学教程(入门课程),有需要的小伙伴保存下载就能学习啦!
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号