限流装饰器不能直接套 asyncio.Semaphore,因为其 acquire() 是协程需 await,而同步装饰器无法等待;正确做法是用异步装饰器封装 async with semaphore: 逻辑,确保复用同一信号量实例并自动释放。

限流装饰器为什么不能直接套 asyncio.Semaphore?
因为 asyncio.Semaphore 的 acquire() 是协程函数,必须用 await 调用;而普通装饰器在定义时是同步执行的,无法 await 一个协程对象。直接写 @semaphore.acquire() 会报 RuntimeWarning: coroutine 'Semaphore.acquire' was never awaited,甚至导致死锁。
正确做法:用异步装饰器 + async with 包裹
核心是把信号量控制逻辑封装进一个真正的异步装饰器里,并确保每次调用都走 async with semaphore: 流程。示例如下:
import asyncio from functools import wrapsdef rate_limit(limit: int): semaphore = asyncio.Semaphore(limit) def decorator(func): @wraps(func) async def wrapper(*args, *kwargs): async with semaphore: return await func(args, **kwargs) return wrapper return decorator
@rate_limit(3) async def fetch_data(url: str): print(f"GET {url}") await asyncio.sleep(1) # 模拟请求 return f"done: {url}"
这个模式的关键点:
-
semaphore在装饰器工厂函数中创建一次,复用同一个实例(不是每次调用都新建) -
async with确保自动获取/释放,即使func抛异常也不会漏掉release - 装饰器返回的是
async def wrapper,能被await正确调度
常见踩坑场景与修复
实际用的时候容易掉进这几个坑:
-
多个装饰器顺序错乱:比如同时用
@retry和@rate_limit,要把@rate_limit放在最外层,否则重试会绕过限流 -
信号量作用域错误:在 FastAPI 路由里误把
semaphore = asyncio.Semaphore(3)写在@app.get函数内部 —— 每次请求都新建一个,完全失效 -
跨协程共享失败:在不同模块或类方法里各自初始化
asyncio.Semaphore(3),等于建了多个独立池子,总并发数变成 3×N -
忘记 await 装饰后函数:调用
fetch_data("https://...")却没加await,结果拿到一个coroutine对象而非结果
进阶:按用户/路径维度做差异化限流
如果需要对不同 API 路径、不同用户 ID 或不同目标域名分别限流,就不能只用一个全局 semaphore。推荐用字典缓存 + 键隔离:
from collections import defaultdict import asyncio_semaphores = defaultdict(lambda: asyncio.Semaphore(3))
def per_domain_rate_limit(domain: str): semaphore = _semaphores[domain] def decorator(func): @wraps(func) async def wrapper(*args, *kwargs): async with semaphore: return await func(args, **kwargs) return wrapper return decorator
@per_domain_rate_limit("httpbin.org") async def fetch_httpbin(): ...
注意:_semaphores 字典本身不需要加锁 —— asyncio.Semaphore 是线程/协程安全的,但字典读写在高并发下可能有竞态,生产环境建议用 asyncio.Lock 包一层或改用 weakref.WeakValueDictionary 防内存泄漏。
真正难的不是写对语法,而是想清楚「谁和谁共用一个信号量」—— 同一资源池里的所有协程,必须共享同一个 semaphore 实例,且生命周期要覆盖整个应用运行期。










