
本教程深入探讨了在python多进程或多线程环境中,如何高效地实现一个写优先、多读并发的读写锁机制。通过自定义`rwlock`类,利用`joinablequeue`和共享变量,确保读操作可以并发进行,而写操作在获得独占访问权时能优先中断读操作,从而解决共享资源访问的复杂同步问题,并兼顾数据一致性与系统响应性。
在并发编程中,处理共享资源的访问是核心挑战之一。尤其是在“一写多读”的场景下,我们通常希望读者能够并发地访问数据以提高效率,而写者在更新数据时必须获得独占访问权,以保证数据的一致性。更进一步,如果写者需要紧急写入,它应该能够优先获得控制权,甚至中断正在进行的读操作。
Python的multiprocessing模块提供了Lock、Semaphore、Condition等多种同步原语。虽然它们能够解决基本的互斥和通信问题,但在实现“一写多读,写优先且可中断读”的复杂场景时,仅使用这些基本原语可能不够灵活或效率不高。例如,Condition对象通常用于线程或进程间的事件通知,但它并不能直接提供读写并发和写者中断读者的机制。一个简单的Condition可能导致所有读者在等待写者通知时阻塞,无法实现并发读。
为了满足这些特定需求,我们需要一种更高级的同步机制——读写锁(Read-Write Lock)。
自定义的RWLock旨在解决以下问题:
立即学习“Python免费学习笔记(深入)”;
本方案的核心思想是为每个读者进程(或线程)分配一个独立的multiprocessing.JoinableQueue。写者通过这些队列来协调读者的行为。
以下是基于multiprocessing模块实现的RWLock类:
from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local
import time
class RWLock:
def __init__(self, num_readers: int):
"""
创建一个支持单写多读的读写锁。
num_readers: 读者进程的数量。
"""
if num_readers < 1 or not isinstance(num_readers, int):
raise ValueError('num_readers 必须是正整数。')
self._local_storage = local() # 用于存储每个进程/线程的私有数据,如其专属队列
self._num_readers = num_readers
self._queue_count = Value('i', 0) # 计数器,用于为读者分配队列
self._stop = Value('i', 0) # 停止标志,写者设置,读者检查
self._lock = Lock() # 用于保护_queue_count的互斥锁
self._queues = [JoinableQueue(1) for _ in range(self._num_readers)] # 每个读者一个队列
def acquire_for_reading(self) -> None:
"""读者请求共享读访问权限。"""
# 如果当前进程/线程尚未分配队列,则分配一个
queue = getattr(self._local_storage, 'queue', None)
if queue is None:
with self._lock:
queue = self._queues[self._queue_count.value]
self._queue_count.value += 1
self._local_storage.queue = queue
queue.get() # 阻塞,等待写者写入新数据并发出信号
def release_for_reading(self):
"""读者完成共享读访问。"""
self._local_storage.queue.task_done() # 通知写者当前批次数据已处理完毕
def acquire_for_writing(self, immediate=True):
"""
获取独占写访问权限。
如果 immediate 为 True,则请求读者尽快释放访问权限。
"""
if immediate:
self._stop.value = 1 # 设置停止标志,通知读者尽快停止
for queue in self._queues:
queue.join() # 阻塞,直到所有读者完成当前数据处理
def release_for_writing(self) -> None:
"""释放独占写访问权限。"""
self._stop.value = 0 # 重置停止标志
for queue in self._queues:
queue.put(None) # 向每个读者队列放入一个项,唤醒等待的读者
def is_stop_posted(self) -> bool:
"""
读者定期调用此函数,检查写者是否请求立即独占控制。
"""
return True if self._stop.value else False
代码解析:
以下示例展示了如何在多进程环境中使用RWLock来协调一个写者和多个读者:
# 共享数据类,用于在进程间传递数据
class SharedData:
def __init__(self, initial_value=0):
self.value = Value('i', initial_value, lock=False) # 使用multiprocessing.Value作为共享数据
def reader(rw_lock, id, shared_data):
"""读者进程函数"""
while True:
rw_lock.acquire_for_reading() # 获取读锁
# 模拟长时间的读取任务
# 在读取过程中,定期检查写者是否请求中断
sleep_time = id / 10 # 不同读者有不同的模拟读取时间
for _ in range(10):
time.sleep(sleep_time)
if rw_lock.is_stop_posted():
print(f'reader {id} 收到停止请求,提前中断。', flush=True)
break # 收到停止请求,立即中断
# 实际读取共享数据
current_value = shared_data.value.value
print(f'reader {id} 完成处理数据: {current_value}', flush=True)
rw_lock.release_for_reading() # 释放读锁
time.sleep(0.1) # 模拟处理完后的小间歇
def writer(rw_lock, shared_data):
"""写者进程函数"""
while True:
# 当共享数据值为3时,写者请求立即写入
# 否则,等待所有读者完成当前批次读取
rw_lock.acquire_for_writing(immediate=(shared_data.value.value == 3))
shared_data.value.value += 1 # 写入新数据
print(f'wrote {shared_data.value.value} at {time.time()}', flush=True)
rw_lock.release_for_writing() # 释放写锁
time.sleep(1) # 模拟写完后的小间歇
def main():
num_readers = 3
rw_lock = RWLock(num_readers)
shared_data = SharedData() # 共享数据实例
# 启动读者进程
for id in range(1, num_readers + 1):
Process(target=reader, args=(rw_lock, id, shared_data), daemon=True).start()
# 启动写者进程
Process(target=writer, args=(rw_lock, shared_data), daemon=True).start()
input('按 Enter 键终止程序:\n')
if __name__ == '__main__':
main()示例输出分析:
Hit enter to terminate: wrote 1 at 1704820185.6386113 reader 1 done processing 1 reader 2 done processing 1 reader 3 done processing 1 wrote 2 at 1704820188.7424514 reader 1 done processing 2 reader 2 done processing 2 reader 3 done processing 2 wrote 3 at 1704820191.8461268 reader 1 done processing 3 reader 2 done processing 3 reader 3 done processing 3 wrote 4 at 1704820192.1564832 <-- 注意这里,写操作几乎立即发生 reader 1 done processing 4 reader 2 done processing 4 reader 3 done processing 4 wrote 5 at 1704820195.2668517 ...
从输出中可以看出,当shared_data.value达到3时,写者调用acquire_for_writing(immediate=True)。此时,读者进程会因为is_stop_posted()返回True而中断其模拟的长时间读取任务,从而使写者能够更快地获取写锁并写入4。这证明了写者优先和中断机制的有效性。在通常情况下,写者会等待所有读者完成当前数据处理(由最慢的读者决定),但在immediate=True的场景下,写者可以大大缩短等待时间。
上述RWLock类是为多进程设计的,使用了multiprocessing模块中的同步原语(Process, Lock, Value, JoinableQueue)。如果您的应用是基于多线程而不是多进程,可以进行一些优化,将这些原语替换为threading模块和标准queue模块的对应实现,因为线程间的通信开销通常小于进程间通信。
主要变化包括:
以下是适用于多线程的RWLockMultiThreading类:
from threading import Thread, Lock
from queue import Queue
from threading import local
import time
class RWLockMultiThreading:
def __init__(self, num_readers: int):
"""
创建一个支持单写多读的读写锁,适用于多线程环境。
num_readers: 读者线程的数量。
"""
if num_readers < 1 or not isinstance(num_readers, int):
raise ValueError('num_readers 必须是正整数。')
self._local_storage = local()
self._num_readers = num_readers
self._queue_count = 0 # 普通整数变量,线程共享
self._stop = 0 # 普通整数变量,线程共享
self._lock = Lock() # threading.Lock
self._queues = [Queue(1) for _ in range(self._num_readers)] # queue.Queue
def acquire_for_reading(self) -> None:
"""读者线程请求共享读访问权限。"""
queue = getattr(self._local_storage, 'queue', None)
if queue is None:
with self._lock:
queue = self._queues[self._queue_count]
self._queue_count += 1
self._local_storage.queue = queue
queue.get() # 阻塞,等待写者写入新数据并发出信号
def release_for_reading(self):
"""读者线程完成共享读访问。"""
self._local_storage.queue.task_done() # 通知写者当前批次数据已处理完毕
def acquire_for_writing(self, immediate=True):
"""
获取独占写访问权限。
如果 immediate 为 True,则请求读者线程尽快释放访问权限。
"""
if immediate:
self._stop = 1 # 设置停止标志
for queue in self._queues:
queue.join() # 阻塞,直到所有读者完成当前数据处理
def release_for_writing(self) -> None:
"""释放独占写访问权限。"""
self._stop = 0 # 重置停止标志
for queue in self._queues:
queue.put(None) # 向每个读者队列放入一个项,唤醒等待的读者
def is_stop_posted(self) -> bool:
"""
读者线程定期调用此函数,检查写者是否请求立即独占控制。
"""
return True if self._stop else False
# 多线程示例应用
class SharedValue: # 简单的共享值类,线程共享
def __init__(self):
self.value = 0
def reader_thread(rw_lock, id, shared_data):
"""读者线程函数"""
while True:
rw_lock.acquire_for_reading()
sleep_time = id / 10
for _ in range(10):
time.sleep(sleep_time)
if rw_lock.is_stop_posted():
print(f'reader {id} (thread) 收到停止请求,提前中断。', flush=True)以上就是Python多进程/多线程读写锁实现:高效管理一写多读并发访问的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号