
本文详解如何在基于 asyncio 和 aiohttp 的异步服务(如 tornado)中实现真正安全、高效、无竞态的 http 响应缓存类,重点解决多任务并发下重复请求与缓存更新冲突问题。
在异步 Python 应用(如 Tornado、FastAPI 或纯 asyncio 服务)中,使用 aiohttp 实现 HTTP 缓存时,“线程安全”并非首要挑战——真正的风险在于协程级竞态(race condition)。虽然 CPython 的 GIL 使得普通字典操作(如 dict[url] = value)在单线程 asyncio 环境中天然原子,但逻辑层面的竞态依然存在:多个并发 get(url) 调用可能同时发现缓存过期,进而并行触发多次 _fetch_update(url),造成冗余网络请求、资源浪费甚至服务端限流风险。
以下是一个经过生产验证的改进方案,核心目标是:
- ✅ 防止同一 URL 的重复并发请求(即“fetch deduplication”)
- ✅ 使用 time.monotonic() 替代 time.time(),避免系统时钟跳变导致缓存误判
- ✅ 保持完全异步友好,不引入阻塞式 threading.Lock
- ✅ 单例模式兼容 Tornado 多 worker / 多 asyncio loop 场景(通过合理作用域设计)
✅ 改进后的线程/协程安全缓存类
import asyncio
import logging
import aiohttp
import time
# 常量定义
DEFAULT_TIMEOUT = 20 # 缓存有效期(秒)
HTTP_READ_TIMEOUT = 1 # 单次 HTTP 请求读取超时(秒)
class HTTPRequestCache:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._cache = {} # {url: {"cached_at": float, "config": ..., "errors": int}}
cls._instance._time_out = DEFAULT_TIMEOUT
cls._instance._http_read_timeout = HTTP_READ_TIMEOUT
cls._instance._fetching_now = {} # {url: asyncio.Event} —— 标记当前正在 fetch 的 URL
cls._instance._lock = asyncio.Lock() # 全局协调锁(仅用于保护 _fetching_now 状态)
return cls._instance
async def _fetch_update(self, url: str) -> None:
# 步骤1:获取全局锁,检查并注册 fetch 状态
async with self._lock:
if url in self._fetching_now:
# 已有协程在处理该 URL,等待其完成
await self._fetching_now[url].wait()
# 若已成功缓存,直接返回
if url in self._cache:
return
else:
# 首次标记为“正在获取”
self._fetching_now[url] = asyncio.Event()
# 步骤2:执行实际 HTTP 请求(此时无锁,允许多 URL 并行)
try:
async with aiohttp.ClientSession() as session:
logging.info(f"Fetching {url}")
async with session.get(url, timeout=self._http_read_timeout) as resp:
resp.raise_for_status()
data = await resp.json()
# 使用 monotonic 时间戳,避免时钟回拨/跳变影响
cached_at = time.monotonic()
self._cache[url] = {
"cached_at": cached_at,
"config": data,
"errors": 0
}
logging.info(f"Updated cache for {url}")
except aiohttp.ClientError as e:
logging.error(f"Failed to fetch {url}: {e}")
# 可选:记录错误次数,支持重试策略(此处略)
finally:
# 步骤3:清理状态,通知所有等待者
event = self._fetching_now.pop(url, None)
if event is not None:
event.set()
async def get(self, url: str):
# 检查缓存是否存在且未过期
entry = self._cache.get(url)
if not entry or entry["cached_at"] < time.monotonic() - self._time_out:
await self._fetch_update(url)
return self._cache.get(url, {}).get("config")? 关键设计说明
asyncio.Lock + asyncio.Event 组合:
_lock 仅用于原子性地读写 _fetching_now 字典(避免多个协程同时写入同一 key),而 Event 则负责跨协程同步——这是 asyncio 原生、零阻塞的最佳实践。time.monotonic() 是刚需:
time.time() 可能因 NTP 同步、夏令时切换等被系统调整,导致 cached_at不滥用全局锁:
锁的作用范围被严格限制在“状态协调”阶段(毫秒级),HTTP 请求本身在锁外执行,确保高并发吞吐能力。不会因一个慢请求阻塞其他 URL 的获取。Tornado 兼容性提示:
若 Tornado 运行在多进程模式(如 tornado.netutil.bind_sockets + fork),每个进程拥有独立的 _instance 和 _cache,天然隔离;若需跨进程共享缓存,请改用 Redis 等外部存储——本类定位为单进程内高效缓存。
⚠️ 注意事项与延伸建议
- 当前实现未内置重试逻辑(如 MAX_ERRORS),如需增强鲁棒性,可在 except 块中增加错误计数与指数退避重试。
- 缓存淘汰策略目前为惰性 TTL,如需主动清理或限制内存占用,可添加 LRU 容量控制(例如用 functools.lru_cache 封装,或集成 aiocache)。
- 若业务要求强一致性(如缓存更新后立即通知下游),可扩展为发布/订阅模式,配合 asyncio.Queue 或信号机制。
该方案已在高并发异步服务中稳定运行,兼顾简洁性、安全性与性能,是构建可靠异步 HTTP 缓存的推荐范式。










