线程安全的LRU缓存可通过四种方式实现:一、用threading.Lock封装OrderedDict;二、用锁装饰器代理functools.lru_cache;三、改用threading.RLock防自调用死锁;四、用单线程ThreadPoolExecutor异步封装。

如果您需要在多线程环境中使用 LRU 缓存,且要求读写操作互斥以避免状态不一致,则必须引入同步机制。以下是实现一个线程安全的带锁 LRU 缓存的多种方法:
一、使用 threading.Lock 封装 OrderedDict
该方法基于 Python 标准库中的 collections.OrderedDict 实现 LRU 逻辑,并通过显式加锁保证所有缓存操作的原子性。每次访问(get)、更新(put)或删除(popitem)均需获取锁,确保同一时刻仅有一个线程修改内部状态。
1、导入所需模块:from collections import OrderedDict 和 import threading。
2、定义类并初始化 self._cache = OrderedDict() 与 self._lock = threading.Lock()。
立即学习“Python免费学习笔记(深入)”;
3、在 get 方法中:调用 self._lock.acquire(),检查 key 是否存在,若存在则调用 move_to_end() 更新顺序,再释放锁。
4、在 put 方法中:同样先获取锁,判断是否已满,若满则调用 popitem(last=False) 移除最久未用项,然后设置新键值对并调用 move_to_end(),最后释放锁。
二、继承 functools.lru_cache 并添加锁代理
该方法不直接修改 functools.lru_cache 内部实现(因其为 C 扩展且不可直接加锁),而是将其作为底层缓存引擎,在函数调用入口和出口处统一加锁。适用于缓存目标为纯函数计算结果的场景,将线程安全逻辑与缓存逻辑分离。
1、定义一个装饰器函数,内部创建 threading.RLock()(可重入锁,支持同一线程多次 acquire)。
2、装饰器包裹原函数,在每次调用前执行 lock.acquire(),调用结束后执行 lock.release()。
3、对目标函数应用 @functools.lru_cache(maxsize=128),再叠加该锁装饰器。
4、确保被装饰函数无副作用,否则锁粒度可能导致性能瓶颈或死锁风险。
三、使用 threading.RLock 替代 Lock 防止自调用死锁
当缓存的 get 或 put 方法内部可能间接触发另一轮缓存操作(例如通过回调或嵌套调用),使用普通 threading.Lock 会导致同一线程重复 acquire 而阻塞。此时应改用可重入锁,允许同一线程多次获取而不阻塞,同时仍能阻止其他线程并发进入临界区。
1、将初始化语句改为 self._lock = threading.RLock()。
2、保持原有 get/put 方法结构不变,但所有 acquire() 和 release() 调用均作用于该 RLock 实例。
3、在 put 方法中若存在触发 get 的逻辑(如缓存穿透时回源并写入),无需额外规避,RLock 自动处理嵌套持有。
四、基于 concurrent.futures.ThreadPoolExecutor 的异步封装方案
该方法将缓存操作提交至单线程的线程池执行,利用线程池的串行化能力替代显式锁。适用于希望规避手动锁管理复杂性的场景,尤其适合 I/O 密集型缓存回源逻辑,但会引入轻微调度开销。
1、初始化时创建 self._executor = ThreadPoolExecutor(max_workers=1)。
2、将 get 和 put 方法重构为返回 concurrent.futures.Future 对象的异步接口。
3、在方法体内调用 self._executor.submit(self._sync_get, key) 或类似方式,将实际逻辑委托给私有同步方法。
4、调用方需使用 future.result() 获取结果,该调用会阻塞直至任务完成。










