
本文深入探讨了在python多进程或多线程环境中,如何实现一个写入者(writer)对多个读取者(reader)共享资源的并发访问控制,并赋予写入者优先权。通过设计一个自定义的`rwlock`(读写锁)类,利用`multiprocessing.joinablequeue`(或`queue.queue`)和共享变量,确保了数据一致性,允许并发读取,并在写入者需要独占访问时能及时中断读取操作。
在构建多代理系统或任何需要共享数据资源的并发应用时,一个常见的场景是存在一个或少数几个写入者进程/线程,以及多个读取者进程/线程。理想情况下,我们希望在写入者不操作时,多个读取者能够并行访问共享数据以提高效率;而当写入者需要修改数据时,它应获得独占访问权,并确保数据在修改过程中不会被读取,以维护数据的一致性。同时,写入者在需要写入时应具有优先权,能够及时中断正在进行的读取操作。
Python标准库中的multiprocessing.Lock或threading.Lock提供了互斥访问,但它们不允许并发读取。multiprocessing.Condition可以用于更复杂的线程间通信和同步,但直接实现“一写多读,写优先”的模式仍需精心设计,特别是要允许并发读取。
为了解决这一挑战,我们将构建一个自定义的读写锁(Read-Write Lock)机制,它能够:
核心思想是为每个读取者分配一个独立的JoinableQueue。写入者通过这些队列来通知读取者新数据已准备好,并等待所有读取者完成当前读取周期。读取者则通过其队列来阻塞,等待写入者的通知。此外,我们引入一个共享的“停止”标志,允许写入者在紧急情况下请求读取者立即中断当前读取。
立即学习“Python免费学习笔记(深入)”;
我们首先定义一个RWLock类,它将封装读写锁的逻辑。这个类需要跟踪读取者的数量、为每个读取者分配的队列、一个用于控制紧急停止的标志,以及一个用于初始化队列计数的锁。
from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local # 用于存储每个进程/线程私有的队列
import time
class RWLock:
def __init__(self, num_readers: int):
"""
创建一个支持单写入者和多读取者的读写锁。
num_readers 参数指定了读取者的数量。
"""
if num_readers < 1 or not isinstance(num_readers, int):
raise ValueError('num_readers 必须是一个正整数。')
self._local_storage = local() # 线程/进程局部存储,用于分配队列
self._num_readers = num_readers
self._queue_count = Value('i', 0) # 共享整数,用于为读取者分配队列索引
self._stop = Value('i', 0) # 共享整数,停止标志,写入者设置,读取者检查
self._lock = Lock() # 保护 _queue_count 的互斥锁
self._queues = [JoinableQueue(1) for _ in range(self._num_readers)] # 为每个读取者创建队列关键属性解释:
读取者调用此方法来请求共享读取权限。
def acquire_for_reading(self) -> None:
"""读取者请求对数据的共享读取访问。"""
# 如果尚未分配队列,则分配一个:
queue = getattr(self._local_storage, 'queue', None)
if queue is None:
with self._lock:
queue = self._queues[self._queue_count.value]
self._queue_count.value += 1
self._local_storage.queue = queue
queue.get() # 阻塞,直到写入者放入一个信号(表示有新数据)工作原理:
读取者完成数据读取后,调用此方法释放权限。
def release_for_reading(self):
"""读取者完成对数据的共享读取访问。"""
self._local_storage.queue.task_done() # 通知队列,已处理一个项工作原理:
读取者周期性调用此方法,检查写入者是否请求立即停止读取。
def is_stop_posted(self) -> bool:
"""读取者周期性调用此函数,查看写入者是否需要立即独占共享资源。"""
return True if self._stop.value else False工作原理:
写入者调用此方法来请求独占写入权限。
def acquire_for_writing(self, immediate=True):
"""
获取对数据的独占访问权限。
如果 immediate 参数为 True,则请求读取者尽快放弃对数据的访问。
"""
if immediate:
self._stop.value = 1 # 设置停止标志,请求读取者立即中断
for queue in self._queues:
queue.join() # 阻塞,直到所有读取者完成当前周期并调用 task_done()工作原理:
写入者完成数据写入后,调用此方法释放权限。
def release_for_writing(self) -> None:
"""放弃独占写入访问权限。"""
self._stop.value = 0 # 重置停止标志
for queue in self._queues:
queue.put(None) # 向每个读取者的队列中放入一个信号,唤醒它们工作原理:
现在,我们结合RWLock类和multiprocessing模块来构建一个实际的读写并发系统。
# 导入必要的模块
from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local
import time
# RWLock 类的定义如上所示,此处省略重复代码
class SharedValue:
"""一个简单的共享数据容器,使用 multiprocessing.Value"""
def __init__(self, initial_value=0):
self.value = Value('i', initial_value, lock=False) # lock=False表示手动管理锁
def reader(rw_lock, id, shared_data):
"""读取者进程的逻辑"""
while True:
rw_lock.acquire_for_reading() # 获取读取权限
# 模拟耗时的读取任务
# 在这里,我们应该周期性检查写入者是否要求停止
sleep_time = id / 10 # 不同的读取者模拟不同的读取时间
for _ in range(10):
time.sleep(sleep_time)
if rw_lock.is_stop_posted():
print(f'reader {id} 收到停止请求,中断读取。', flush=True)
break # 写入者请求停止,中断当前读取
print(f'reader {id} 完成处理数据: {shared_data.value}', flush=True)
rw_lock.release_for_reading() # 释放读取权限
def writer(rw_lock, shared_data):
"""写入者进程的逻辑"""
while True:
# 当 shared_data.value 等于 3 时,写入者将请求立即停止读取者
rw_lock.acquire_for_writing(immediate=(shared_data.value == 3))
shared_data.value.value += 1 # 修改共享数据
print(f'wrote {shared_data.value.value} at {time.time()}', flush=True)
rw_lock.release_for_writing() # 释放写入权限
def main_multiprocessing():
num_readers = 3
rw_lock = RWLock(num_readers)
shared_data = SharedValue(0) # 共享数据
# 创建并启动读取者进程
for id in range(1, num_readers + 1):
Process(target=reader, args=(rw_lock, id, shared_data), daemon=True).start()
# 创建并启动写入者进程
Process(target=writer, args=(rw_lock, shared_data), daemon=True).start()
input('按回车键终止程序:\n')
if __name__ == '__main__':
main_multiprocessing()运行示例输出解释: 当程序运行时,你会观察到读取者会并发地处理数据。写入者在每次写入后,会等待所有读取者完成当前数据的处理。当shared_data.value达到3时,写入者会设置immediate=True,这时读取者会更快地中断其模拟的读取任务,从而让写入者几乎立即获得写入权限。这演示了写入者优先和中断机制。
RWLock的设计同样适用于多线程环境。主要的区别在于:
from threading import Thread, Lock
from queue import Queue
from threading import local
import time
class RWLockMultiThreading:
def __init__(self, num_readers: int):
if num_readers < 1 or not isinstance(num_readers, int):
raise ValueError('num_readers 必须是一个正整数。')
self._local_storage = local()
self._num_readers = num_readers
self._queue_count = 0 # 普通整数,线程共享
self._stop = 0 # 普通整数,线程共享
self._lock = Lock() # threading.Lock
self._queues = [Queue(1) for _ in range(self._num_readers)] # queue.Queue
def acquire_for_reading(self) -> None:
queue = getattr(self._local_storage, 'queue', None)
if queue is None:
with self._lock:
queue = self._queues[self._queue_count]
self._queue_count += 1
self._local_storage.queue = queue
queue.get()
def release_for_reading(self):
self._local_storage.queue.task_done()
def acquire_for_writing(self, immediate=True):
if immediate:
self._stop = 1
for queue in self._queues:
queue.join()
def release_for_writing(self) -> None:
self._stop = 0
for queue in self._queues:
queue.put(None)
def is_stop_posted(self) -> bool:
return True if self._stop else False
class SharedValueThread:
"""一个简单的共享数据容器,适用于多线程"""
def __init__(self, initial_value=0):
self.value = initial_value # 普通整数,线程共享
def reader_thread(rw_lock, id, shared_data):
"""读取者线程的逻辑"""
while True:
rw_lock.acquire_for_reading()
sleep_time = id / 10
for _ in range(10):
time.sleep(sleep_time)
if rw_lock.is_stop_posted():
print(f'reader {id} (thread) 收到停止请求,中断读取。', flush=True)
break
print(f'reader {id} (thread) 完成处理数据: {shared_data.value}', flush=True)
rw_lock.release_for_reading()
def writer_thread(rw_lock, shared_data):
"""写入者线程的逻辑"""
while True:
rw_lock.acquire_for_writing(immediate=(shared_data.value == 3))
shared_data.value += 1
print(f'wrote {shared_data.value} at {time.time()} (thread)', flush=True)
rw_lock.release_for_writing()
def main_multithreading():
num_readers = 3
rw_lock = RWLockMultiThreading(num_readers)
shared_data = SharedValueThread(0)
for id in range(1, num_readers + 1):
Thread(target=reader_thread, args=(rw_lock, id, shared_data), daemon=True).start()
Thread(target=writer_thread, args=(rw_lock, shared_data), daemon=True).start()
input('按回车键终止程序:\n')
if __name__ == '__main__':
# 可以选择运行多进程或多线程示例
# main_multiprocessing()
main_multithreading()通过上述RWLock的实现,我们成功地在Python的并发编程中,为共享资源提供了一个高效且具有写入者优先权的一写多读访问机制,有效平衡了并发性与数据一致性的需求。
以上就是Python多进程/多线程高效实现一写多读(Writer优先)的并发控制的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号