
本文详解如何基于 `aiohttp` 和 `asyncio` 构建线程安全、协程安全的单例 http 缓存类,重点解决并发请求同一 url 时的重复拉取问题,并优化时间精度与资源竞争控制。
在使用 aiohttp 构建异步 HTTP 缓存服务(如配合 Tornado 或 FastAPI)时,一个常见误区是认为“只要用了 async/await 就天然线程安全”。实际上,Python 的 GIL 仅保障纯 Python 字节码层面的线程互斥,但无法阻止 asyncio 任务在单线程内并发修改共享状态(如 dict)所引发的逻辑竞态——尤其是当多个协程同时检测到缓存过期并触发 _fetch_update(url) 时,可能造成多次重复请求,浪费资源且增加服务压力。
✅ 核心问题:不是“数据损坏”,而是“逻辑冗余”
原代码中 self._cache[url] = {...} 虽为原子操作(CPython 中 dict 赋值是线程安全的),但其前置判断 url not in self._cache or ... 与后续写入之间存在时间窗口。若两个协程几乎同时执行该判断,均得出“需更新”结论,则会并发执行两次 session.get(),导致:
- 同一 URL 被重复请求;
- 后完成的响应覆盖先完成的(无一致性保证);
- 错误日志混乱,难以调试。
这不是内存损坏,却是典型的 “check-then-act” 竞态(TOCTOU),必须通过同步机制消除。
✅ 正确解法:按 URL 细粒度协同锁(Per-URL Coordinated Fetching)
我们不采用全局 asyncio.Lock()(会串行化所有 URL 请求,严重损害并发性能),而是为每个待请求的 URL 动态维护一个 asyncio.Event,实现按 URL 粒度的协同等待:
import asyncio
import logging
import aiohttp
import time
DEFAULT_TIMEOUT = 20
HTTP_READ_TIMEOUT = 1
class HTTPRequestCache:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._cache = {}
cls._instance._time_out = DEFAULT_TIMEOUT
cls._instance._http_read_timeout = HTTP_READ_TIMEOUT
cls._instance._fetching_now = {} # {url: asyncio.Event()}
cls._instance._lock = asyncio.Lock() # 仅用于保护 _fetching_now 字典本身
return cls._instance
async def _fetch_update(self, url):
# Step 1: 获取对 _fetching_now 的独占访问权,检查/注册当前 URL 的 fetch 状态
async with self._lock:
if url in self._fetching_now:
# 另一协程已在处理该 URL → 等待其完成
event = self._fetching_now[url]
await event.wait()
# 检查是否已成功缓存(避免重复等待后仍去请求)
if url in self._cache and self._cache[url]["cached_at"] >= time.monotonic() - self._time_out:
return
else:
# 首次标记该 URL 正在被获取
self._fetching_now[url] = asyncio.Event()
# Step 2: 执行实际 HTTP 请求(此时无锁,允许多 URL 并发)
try:
async with aiohttp.ClientSession() as session:
logging.info(f"Fetching {url}")
async with session.get(url, timeout=self._http_read_timeout) as resp:
resp.raise_for_status()
data = await resp.json()
cached_at = time.monotonic() # ✅ 使用 monotonic 时间,避免系统时钟跳变影响
self._cache[url] = {
"cached_at": cached_at,
"config": data,
"errors": 0
}
logging.info(f"Updated cache for {url}")
except aiohttp.ClientError as e:
logging.error(f"Failed to fetch {url}: {e}")
finally:
# Step 3: 清理状态,通知所有等待者
async with self._lock:
if url in self._fetching_now:
self._fetching_now[url].set()
del self._fetching_now[url]
async def get(self, url):
# 使用 monotonic 时间进行过期判断,更可靠
now = time.monotonic()
if (url not in self._cache
or self._cache[url]["cached_at"] < now - self._time_out):
await self._fetch_update(url)
return self._cache.get(url, {}).get("config")? 关键设计说明
- _lock 仅保护 _fetching_now 字典读写:因 dict 操作本身非原子(如 in + [] 组合),需锁保护其结构一致性。
- asyncio.Event 实现“等待即订阅”:协程发现 URL 正在被获取时,直接 await event.wait(),无需轮询;完成时 event.set() 唤醒全部等待者。
- time.monotonic() 替代 time.time():确保超时计算不受系统时间回拨(如 NTP 校准、DST 切换)干扰,符合高可靠性场景要求。
- 无全局阻塞:不同 URL 的请求完全并发,仅相同 URL 的请求被智能协调,兼顾性能与正确性。
⚠️ 注意事项与扩展建议
- Tornado 兼容性:Tornado 6+ 原生支持 async/await,可直接将 HTTPRequestCache 实例挂载为应用级单例(如 app.settings['cache']),无需额外线程适配。
- 错误重试策略:当前示例未集成指数退避或错误计数(如 MAX_ERRORS)。如需增强鲁棒性,可在 except 块中增加 self._cache[url]["errors"] += 1,并在 get() 中根据错误次数决定是否跳过缓存或降级返回。
- 内存清理:长期运行需添加 LRU 或 TTL 驱逐逻辑(如定期扫描 cached_at 过期项),防止内存泄漏。
- 多进程场景:若部署于多进程(如 Gunicorn + --workers),单例失效,需改用 Redis 等外部缓存。
此方案在保持异步高性能的同时,彻底消除了缓存更新的逻辑竞态,是构建生产级异步 HTTP 客户端缓存的推荐实践。










