Python多进程/多线程高效实现一写多读(Writer优先)的并发控制

霞舞
发布: 2025-10-27 12:08:31
原创
169人浏览过

Python多进程/多线程高效实现一写多读(Writer优先)的并发控制

本文深入探讨了在python多进程或多线程环境中,如何实现一个写入者(writer)对多个读取者(reader)共享资源的并发访问控制,并赋予写入者优先权。通过设计一个自定义的`rwlock`(读写锁)类,利用`multiprocessing.joinablequeue`(或`queue.queue`)和共享变量,确保了数据一致性,允许并发读取,并在写入者需要独占访问时能及时中断读取操作。

引言:多进程/多线程环境下的读写并发挑战

在构建多代理系统或任何需要共享数据资源的并发应用时,一个常见的场景是存在一个或少数几个写入者进程/线程,以及多个读取者进程/线程。理想情况下,我们希望在写入者不操作时,多个读取者能够并行访问共享数据以提高效率;而当写入者需要修改数据时,它应获得独占访问权,并确保数据在修改过程中不会被读取,以维护数据的一致性。同时,写入者在需要写入时应具有优先权,能够及时中断正在进行的读取操作。

Python标准库中的multiprocessing.Lock或threading.Lock提供了互斥访问,但它们不允许并发读取。multiprocessing.Condition可以用于更复杂的线程间通信和同步,但直接实现“一写多读,写优先”的模式仍需精心设计,特别是要允许并发读取。

为了解决这一挑战,我们将构建一个自定义的读写锁(Read-Write Lock)机制,它能够:

  1. 允许任意数量的读取者同时读取数据。
  2. 确保写入者在写入时拥有独占访问权,且不会有任何读取者同时进行读取。
  3. 赋予写入者优先权,当它需要写入时,能够“请求”读取者尽快释放资源。

自定义读写锁(RWLock)的设计与实现

核心思想是为每个读取者分配一个独立的JoinableQueue。写入者通过这些队列来通知读取者新数据已准备好,并等待所有读取者完成当前读取周期。读取者则通过其队列来阻塞,等待写入者的通知。此外,我们引入一个共享的“停止”标志,允许写入者在紧急情况下请求读取者立即中断当前读取。

立即学习Python免费学习笔记(深入)”;

RWLock 类结构

我们首先定义一个RWLock类,它将封装读写锁的逻辑。这个类需要跟踪读取者的数量、为每个读取者分配的队列、一个用于控制紧急停止的标志,以及一个用于初始化队列计数的锁。

from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local # 用于存储每个进程/线程私有的队列
import time

class RWLock:
    def __init__(self, num_readers: int):
        """
        创建一个支持单写入者和多读取者的读写锁。
        num_readers 参数指定了读取者的数量。
        """
        if num_readers < 1 or not isinstance(num_readers, int):
            raise ValueError('num_readers 必须是一个正整数。')
        self._local_storage = local() # 线程/进程局部存储,用于分配队列
        self._num_readers = num_readers
        self._queue_count = Value('i', 0) # 共享整数,用于为读取者分配队列索引
        self._stop = Value('i', 0) # 共享整数,停止标志,写入者设置,读取者检查
        self._lock = Lock() # 保护 _queue_count 的互斥锁
        self._queues = [JoinableQueue(1) for _ in range(self._num_readers)] # 为每个读取者创建队列
登录后复制

关键属性解释:

  • _local_storage: threading.local对象,用于在每个进程(或线程)中存储其私有的队列引用,避免跨进程/线程直接共享JoinableQueue对象本身。
  • _num_readers: 预期的读取者数量。
  • _queue_count: multiprocessing.Value,一个共享的整数值,用于为每个新的读取者分配一个唯一的队列。
  • _stop: multiprocessing.Value,一个共享的整数标志,当写入者需要立即独占访问时,会将其设置为1。读取者会周期性检查此标志。
  • _lock: multiprocessing.Lock,用于保护_queue_count在多个进程同时初始化时不会出现竞争。
  • _queues: 一个JoinableQueue列表,每个读取者对应一个。JoinableQueue(1)表示队列容量为1,确保写入者每次只能放置一个信号。

读取者操作方法

acquire_for_reading()

读取者调用此方法来请求共享读取权限。

    def acquire_for_reading(self) -> None:
        """读取者请求对数据的共享读取访问。"""
        # 如果尚未分配队列,则分配一个:
        queue = getattr(self._local_storage, 'queue', None)
        if queue is None:
            with self._lock:
                queue = self._queues[self._queue_count.value]
                self._queue_count.value += 1
            self._local_storage.queue = queue
        queue.get()  # 阻塞,直到写入者放入一个信号(表示有新数据)
登录后复制

工作原理:

喵记多
喵记多

喵记多 - 自带助理的 AI 笔记

喵记多27
查看详情 喵记多
  1. 每个读取者首次调用时,会通过_local_storage获取一个专属的JoinableQueue。_queue_count确保每个读取者拿到不同的队列。
  2. queue.get()是一个阻塞操作。读取者会在此处等待,直到写入者调用release_for_writing()并向其队列中放入一个项(通常是None),表示有新的数据可供读取。

release_for_reading()

读取者完成数据读取后,调用此方法释放权限。

    def release_for_reading(self):
        """读取者完成对数据的共享读取访问。"""
        self._local_storage.queue.task_done() # 通知队列,已处理一个项
登录后复制

工作原理:

  1. task_done()通知JoinableQueue,之前通过get()获取的项已被处理。这对于写入者通过join()等待所有读取者完成操作至关重要。

is_stop_posted()

读取者周期性调用此方法,检查写入者是否请求立即停止读取。

    def is_stop_posted(self) -> bool:
        """读取者周期性调用此函数,查看写入者是否需要立即独占共享资源。"""
        return True if self._stop.value else False
登录后复制

工作原理:

  1. 读取者在执行耗时读取任务时,应定期检查_stop标志。如果为True,则应尽快中断当前读取并释放锁。这是一个合作机制,需要读取者主动配合。

写入者操作方法

acquire_for_writing(immediate=True)

写入者调用此方法来请求独占写入权限。

    def acquire_for_writing(self, immediate=True):
        """
        获取对数据的独占访问权限。
        如果 immediate 参数为 True,则请求读取者尽快放弃对数据的访问。
        """
        if immediate:
            self._stop.value = 1 # 设置停止标志,请求读取者立即中断

        for queue in self._queues:
            queue.join() # 阻塞,直到所有读取者完成当前周期并调用 task_done()
登录后复制

工作原理:

  1. 如果immediate为True,写入者会设置_stop标志为1,通知所有读取者尽快停止。
  2. 写入者遍历所有读取者的JoinableQueue,并调用queue.join()。join()方法会阻塞,直到队列中的所有项都被get()并随后task_done()。由于写入者在release_for_writing时会向队列中放入项,所以join()会等待这些项被处理。在初始状态或写入者刚完成写入后,队列为空,join()会立即返回。当写入者需要再次写入时,它会等待所有读取者处理完上一轮的数据。

release_for_writing()

写入者完成数据写入后,调用此方法释放权限。

    def release_for_writing(self) -> None:
        """放弃独占写入访问权限。"""
        self._stop.value = 0  # 重置停止标志
        for queue in self._queues:
            queue.put(None) # 向每个读取者的队列中放入一个信号,唤醒它们
登录后复制

工作原理:

  1. 写入者首先重置_stop标志,表示不再需要紧急停止。
  2. 然后,它向每个读取者的JoinableQueue中放入一个None(或其他任意项)。这将解除之前在acquire_for_reading()中阻塞的读取者。

多进程示例应用

现在,我们结合RWLock类和multiprocessing模块来构建一个实际的读写并发系统。

# 导入必要的模块
from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local
import time

# RWLock 类的定义如上所示,此处省略重复代码

class SharedValue:
    """一个简单的共享数据容器,使用 multiprocessing.Value"""
    def __init__(self, initial_value=0):
        self.value = Value('i', initial_value, lock=False) # lock=False表示手动管理锁

def reader(rw_lock, id, shared_data):
    """读取者进程的逻辑"""
    while True:
        rw_lock.acquire_for_reading() # 获取读取权限

        # 模拟耗时的读取任务
        # 在这里,我们应该周期性检查写入者是否要求停止
        sleep_time = id / 10 # 不同的读取者模拟不同的读取时间
        for _ in range(10):
            time.sleep(sleep_time)
            if rw_lock.is_stop_posted():
                print(f'reader {id} 收到停止请求,中断读取。', flush=True)
                break # 写入者请求停止,中断当前读取

        print(f'reader {id} 完成处理数据: {shared_data.value}', flush=True)
        rw_lock.release_for_reading() # 释放读取权限

def writer(rw_lock, shared_data):
    """写入者进程的逻辑"""
    while True:
        # 当 shared_data.value 等于 3 时,写入者将请求立即停止读取者
        rw_lock.acquire_for_writing(immediate=(shared_data.value == 3))
        shared_data.value.value += 1 # 修改共享数据
        print(f'wrote {shared_data.value.value} at {time.time()}', flush=True)
        rw_lock.release_for_writing() # 释放写入权限

def main_multiprocessing():
    num_readers = 3
    rw_lock = RWLock(num_readers)
    shared_data = SharedValue(0) # 共享数据

    # 创建并启动读取者进程
    for id in range(1, num_readers + 1):
        Process(target=reader, args=(rw_lock, id, shared_data), daemon=True).start()

    # 创建并启动写入者进程
    Process(target=writer, args=(rw_lock, shared_data), daemon=True).start()

    input('按回车键终止程序:\n')

if __name__ == '__main__':
    main_multiprocessing()
登录后复制

运行示例输出解释: 当程序运行时,你会观察到读取者会并发地处理数据。写入者在每次写入后,会等待所有读取者完成当前数据的处理。当shared_data.value达到3时,写入者会设置immediate=True,这时读取者会更快地中断其模拟的读取任务,从而让写入者几乎立即获得写入权限。这演示了写入者优先和中断机制。

多线程环境下的适配

RWLock的设计同样适用于多线程环境。主要的区别在于:

  1. 将multiprocessing.Process替换为threading.Thread。
  2. 将multiprocessing.Lock替换为threading.Lock。
  3. 将multiprocessing.Value替换为普通的Python整数(因为线程共享同一进程的内存空间)。
  4. 将multiprocessing.JoinableQueue替换为queue.Queue。
from threading import Thread, Lock
from queue import Queue
from threading import local
import time

class RWLockMultiThreading:
    def __init__(self, num_readers: int):
        if num_readers < 1 or not isinstance(num_readers, int):
            raise ValueError('num_readers 必须是一个正整数。')
        self._local_storage = local()
        self._num_readers = num_readers
        self._queue_count = 0 # 普通整数,线程共享
        self._stop = 0 # 普通整数,线程共享
        self._lock = Lock() # threading.Lock
        self._queues = [Queue(1) for _ in range(self._num_readers)] # queue.Queue

    def acquire_for_reading(self) -> None:
        queue = getattr(self._local_storage, 'queue', None)
        if queue is None:
            with self._lock:
                queue = self._queues[self._queue_count]
                self._queue_count += 1
            self._local_storage.queue = queue
        queue.get()

    def release_for_reading(self):
        self._local_storage.queue.task_done()

    def acquire_for_writing(self, immediate=True):
        if immediate:
            self._stop = 1
        for queue in self._queues:
            queue.join()

    def release_for_writing(self) -> None:
        self._stop = 0
        for queue in self._queues:
            queue.put(None)

    def is_stop_posted(self) -> bool:
        return True if self._stop else False

class SharedValueThread:
    """一个简单的共享数据容器,适用于多线程"""
    def __init__(self, initial_value=0):
        self.value = initial_value # 普通整数,线程共享

def reader_thread(rw_lock, id, shared_data):
    """读取者线程的逻辑"""
    while True:
        rw_lock.acquire_for_reading()
        sleep_time = id / 10
        for _ in range(10):
            time.sleep(sleep_time)
            if rw_lock.is_stop_posted():
                print(f'reader {id} (thread) 收到停止请求,中断读取。', flush=True)
                break
        print(f'reader {id} (thread) 完成处理数据: {shared_data.value}', flush=True)
        rw_lock.release_for_reading()

def writer_thread(rw_lock, shared_data):
    """写入者线程的逻辑"""
    while True:
        rw_lock.acquire_for_writing(immediate=(shared_data.value == 3))
        shared_data.value += 1
        print(f'wrote {shared_data.value} at {time.time()} (thread)', flush=True)
        rw_lock.release_for_writing()

def main_multithreading():
    num_readers = 3
    rw_lock = RWLockMultiThreading(num_readers)
    shared_data = SharedValueThread(0)

    for id in range(1, num_readers + 1):
        Thread(target=reader_thread, args=(rw_lock, id, shared_data), daemon=True).start()
    Thread(target=writer_thread, args=(rw_lock, shared_data), daemon=True).start()

    input('按回车键终止程序:\n')

if __name__ == '__main__':
    # 可以选择运行多进程或多线程示例
    # main_multiprocessing()
    main_multithreading()
登录后复制

注意事项与总结

  1. 合作式中断: is_stop_posted()机制要求读取者是“合作式”的。如果读取者不定期检查此标志,或者在检查后不立即中断,那么写入者的“立即”优先权将无法有效实现。在实际应用中,耗时较长的读取操作应设计成可中断的。
  2. 队列数量与资源消耗: RWLock为每个读取者创建一个独立的JoinableQueue。当读取者数量非常庞大时,这可能会带来一定的资源开销。对于超大规模的读取者场景,可能需要考虑其他更优化的读写锁实现,例如基于信号量或条件变量的复杂状态机。
  3. 数据一致性: 本方案确保了写入者在写入时独占资源,从而保证了数据的一致性。读取者在获取读取权限后,读取的是写入者最近一次写入的完整数据。
  4. 适用场景: 这种自定义RWLock特别适用于一个写入者频繁更新数据,而多个读取者需要并发读取,且写入者对实时性有较高要求(即写入者需要时能快速获得控制权)的场景。
  5. 进程 vs. 线程: 选择multiprocessing还是threading取决于具体任务。multiprocessing适用于CPU密集型任务,利用多核优势;threading适用于I/O密集型任务,或者需要共享大量内存数据的场景。本教程提供了两种环境下的适配方案。

通过上述RWLock的实现,我们成功地在Python的并发编程中,为共享资源提供了一个高效且具有写入者优先权的一写多读访问机制,有效平衡了并发性与数据一致性的需求。

以上就是Python多进程/多线程高效实现一写多读(Writer优先)的并发控制的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号