Python多进程/多线程读写锁实现:高效管理一写多读并发访问

DDD
发布: 2025-10-28 14:30:07
原创
232人浏览过

Python多进程/多线程读写锁实现:高效管理一写多读并发访问

本教程深入探讨了在python多进程或多线程环境中,如何高效地实现一个写优先、多读并发的读写锁机制。通过自定义`rwlock`类,利用`joinablequeue`和共享变量,确保读操作可以并发进行,而写操作在获得独占访问权时能优先中断读操作,从而解决共享资源访问的复杂同步问题,并兼顾数据一致性与系统响应性。

并发编程中,处理共享资源的访问是核心挑战之一。尤其是在“一写多读”的场景下,我们通常希望读者能够并发地访问数据以提高效率,而写者在更新数据时必须获得独占访问权,以保证数据的一致性。更进一步,如果写者需要紧急写入,它应该能够优先获得控制权,甚至中断正在进行的读操作。

传统同步机制的局限

Python的multiprocessing模块提供了Lock、Semaphore、Condition等多种同步原语。虽然它们能够解决基本的互斥和通信问题,但在实现“一写多读,写优先且可中断读”的复杂场景时,仅使用这些基本原语可能不够灵活或效率不高。例如,Condition对象通常用于线程或进程间的事件通知,但它并不能直接提供读写并发和写者中断读者的机制。一个简单的Condition可能导致所有读者在等待写者通知时阻塞,无法实现并发读。

为了满足这些特定需求,我们需要一种更高级的同步机制——读写锁(Read-Write Lock)。

读写锁(RWLock)设计原理

自定义的RWLock旨在解决以下问题:

立即学习Python免费学习笔记(深入)”;

  1. 并发读:允许多个读者同时访问共享数据。
  2. 写者独占:写者在写入时必须独占资源,防止读者读取到不一致的数据。
  3. 写者优先:写者可以请求立即获得控制权,中断正在进行的读操作。
  4. 数据一致性:读者总是读取到写者完成写入后的最新、一致的数据版本。

本方案的核心思想是为每个读者进程(或线程)分配一个独立的multiprocessing.JoinableQueue。写者通过这些队列来协调读者的行为。

  • 读者获取访问权:每个读者在尝试读取数据时,会从它自己的队列中执行一个阻塞的get()操作。这意味着读者会一直等待,直到写者将一个项放入其队列中,表明有新数据可读。
  • 读者释放访问权:读者完成数据处理后,会调用其队列的task_done()方法,通知写者它已完成当前批次数据的读取。
  • 写者获取独占权:当写者需要写入时,它会遍历所有读者的队列并调用join()方法。join()会阻塞,直到所有读者都对队列中的所有项(由写者之前放入)调用了task_done()。这确保了写者在写入之前,所有读者都已处理完上一批数据。
  • 写者释放独占权:写者完成写入后,会向每个读者的队列中放入一个项(例如None),从而唤醒所有等待的读者,告知它们有新数据可读。
  • 写者优先中断:为了实现写者优先中断,引入了一个共享的_stop标志(multiprocessing.Value)。当写者需要立即获得控制权时,它可以设置此标志。读者进程在执行读取任务期间,应定期检查此_stop标志。如果发现标志被设置,读者应立即停止当前读取任务,并释放其访问权(调用task_done()),从而允许写者尽快获得独占权。

RWLock类实现详解 (多进程版)

以下是基于multiprocessing模块实现的RWLock类:

from multiprocessing import Process, Lock, Value, JoinableQueue
from threading import local
import time

class RWLock:
    def __init__(self, num_readers: int):
        """
        创建一个支持单写多读的读写锁。
        num_readers: 读者进程的数量。
        """
        if num_readers < 1 or not isinstance(num_readers, int):
            raise ValueError('num_readers 必须是正整数。')
        self._local_storage = local()  # 用于存储每个进程/线程的私有数据,如其专属队列
        self._num_readers = num_readers
        self._queue_count = Value('i', 0)  # 计数器,用于为读者分配队列
        self._stop = Value('i', 0)         # 停止标志,写者设置,读者检查
        self._lock = Lock()                # 用于保护_queue_count的互斥锁
        self._queues = [JoinableQueue(1) for _ in range(self._num_readers)] # 每个读者一个队列

    def acquire_for_reading(self) -> None:
        """读者请求共享读访问权限。"""
        # 如果当前进程/线程尚未分配队列,则分配一个
        queue = getattr(self._local_storage, 'queue', None)
        if queue is None:
            with self._lock:
                queue = self._queues[self._queue_count.value]
                self._queue_count.value += 1
            self._local_storage.queue = queue
        queue.get()  # 阻塞,等待写者写入新数据并发出信号

    def release_for_reading(self):
        """读者完成共享读访问。"""
        self._local_storage.queue.task_done() # 通知写者当前批次数据已处理完毕

    def acquire_for_writing(self, immediate=True):
        """
        获取独占写访问权限。
        如果 immediate 为 True,则请求读者尽快释放访问权限。
        """
        if immediate:
            self._stop.value = 1  # 设置停止标志,通知读者尽快停止

        for queue in self._queues:
            queue.join()  # 阻塞,直到所有读者完成当前数据处理

    def release_for_writing(self) -> None:
        """释放独占写访问权限。"""
        self._stop.value = 0  # 重置停止标志
        for queue in self._queues:
            queue.put(None)  # 向每个读者队列放入一个项,唤醒等待的读者

    def is_stop_posted(self) -> bool:
        """
        读者定期调用此函数,检查写者是否请求立即独占控制。
        """
        return True if self._stop.value else False
登录后复制

代码解析:

喵记多
喵记多

喵记多 - 自带助理的 AI 笔记

喵记多27
查看详情 喵记多
  • __init__: 初始化时创建指定数量的JoinableQueue,每个队列对应一个读者。_queue_count用于确保每个读者进程只分配一个唯一的队列。_stop是一个multiprocessing.Value,用于进程间共享的停止标志。
  • acquire_for_reading: 读者首次调用时,会从_queues中获取一个队列并存储在_local_storage中,确保每个读者进程有自己的专属队列。然后调用queue.get()阻塞,等待写者释放新数据。
  • release_for_reading: 读者完成读取后,调用queue.task_done()。这是JoinableQueue的关键特性,它会减少队列中未完成任务的计数,当计数为零时,join()操作将解除阻塞。
  • acquire_for_writing: 写者调用此方法来获取写锁。如果immediate为True,它会设置_stop标志,通知读者尽快停止。然后,它对所有读者的队列调用join(),这将阻塞直到所有读者都完成了它们当前批次的数据处理。
  • release_for_writing: 写者完成写入后,重置_stop标志,然后向每个读者的队列中放入一个None,这会唤醒所有在acquire_for_reading中等待的读者。
  • is_stop_posted: 读者进程在处理数据时应周期性地调用此方法,检查_stop标志。如果返回True,表示写者请求立即停止,读者应尽快完成当前操作并释放锁。

多进程示例应用

以下示例展示了如何在多进程环境中使用RWLock来协调一个写者和多个读者:

# 共享数据类,用于在进程间传递数据
class SharedData:
    def __init__(self, initial_value=0):
        self.value = Value('i', initial_value, lock=False) # 使用multiprocessing.Value作为共享数据

def reader(rw_lock, id, shared_data):
    """读者进程函数"""
    while True:
        rw_lock.acquire_for_reading() # 获取读锁
        # 模拟长时间的读取任务
        # 在读取过程中,定期检查写者是否请求中断
        sleep_time = id / 10 # 不同读者有不同的模拟读取时间
        for _ in range(10):
            time.sleep(sleep_time)
            if rw_lock.is_stop_posted():
                print(f'reader {id} 收到停止请求,提前中断。', flush=True)
                break # 收到停止请求,立即中断

        # 实际读取共享数据
        current_value = shared_data.value.value
        print(f'reader {id} 完成处理数据: {current_value}', flush=True)
        rw_lock.release_for_reading() # 释放读锁
        time.sleep(0.1) # 模拟处理完后的小间歇

def writer(rw_lock, shared_data):
    """写者进程函数"""
    while True:
        # 当共享数据值为3时,写者请求立即写入
        # 否则,等待所有读者完成当前批次读取
        rw_lock.acquire_for_writing(immediate=(shared_data.value.value == 3))
        shared_data.value.value += 1 # 写入新数据
        print(f'wrote {shared_data.value.value} at {time.time()}', flush=True)
        rw_lock.release_for_writing() # 释放写锁
        time.sleep(1) # 模拟写完后的小间歇

def main():
    num_readers = 3
    rw_lock = RWLock(num_readers)
    shared_data = SharedData() # 共享数据实例

    # 启动读者进程
    for id in range(1, num_readers + 1):
        Process(target=reader, args=(rw_lock, id, shared_data), daemon=True).start()

    # 启动写者进程
    Process(target=writer, args=(rw_lock, shared_data), daemon=True).start()

    input('按 Enter 键终止程序:\n')

if __name__ == '__main__':
    main()
登录后复制

示例输出分析:

Hit enter to terminate:
wrote 1 at 1704820185.6386113
reader 1 done processing 1
reader 2 done processing 1
reader 3 done processing 1
wrote 2 at 1704820188.7424514
reader 1 done processing 2
reader 2 done processing 2
reader 3 done processing 2
wrote 3 at 1704820191.8461268
reader 1 done processing 3
reader 2 done processing 3
reader 3 done processing 3
wrote 4 at 1704820192.1564832  <-- 注意这里,写操作几乎立即发生
reader 1 done processing 4
reader 2 done processing 4
reader 3 done processing 4
wrote 5 at 1704820195.2668517
...
登录后复制

从输出中可以看出,当shared_data.value达到3时,写者调用acquire_for_writing(immediate=True)。此时,读者进程会因为is_stop_posted()返回True而中断其模拟的长时间读取任务,从而使写者能够更快地获取写锁并写入4。这证明了写者优先和中断机制的有效性。在通常情况下,写者会等待所有读者完成当前数据处理(由最慢的读者决定),但在immediate=True的场景下,写者可以大大缩短等待时间。

多线程环境下的优化

上述RWLock类是为多进程设计的,使用了multiprocessing模块中的同步原语(Process, Lock, Value, JoinableQueue)。如果您的应用是基于多线程而不是多进程,可以进行一些优化,将这些原语替换为threading模块和标准queue模块的对应实现,因为线程间的通信开销通常小于进程间通信。

主要变化包括:

  • multiprocessing.Process 替换为 threading.Thread。
  • multiprocessing.Lock 替换为 threading.Lock。
  • multiprocessing.Value 替换为普通的Python int变量(因为线程共享内存)。
  • multiprocessing.JoinableQueue 替换为 queue.Queue。

以下是适用于多线程的RWLockMultiThreading类:

from threading import Thread, Lock
from queue import Queue
from threading import local
import time

class RWLockMultiThreading:
    def __init__(self, num_readers: int):
        """
        创建一个支持单写多读的读写锁,适用于多线程环境。
        num_readers: 读者线程的数量。
        """
        if num_readers < 1 or not isinstance(num_readers, int):
            raise ValueError('num_readers 必须是正整数。')
        self._local_storage = local()
        self._num_readers = num_readers
        self._queue_count = 0  # 普通整数变量,线程共享
        self._stop = 0         # 普通整数变量,线程共享
        self._lock = Lock()    # threading.Lock
        self._queues = [Queue(1) for _ in range(self._num_readers)] # queue.Queue

    def acquire_for_reading(self) -> None:
        """读者线程请求共享读访问权限。"""
        queue = getattr(self._local_storage, 'queue', None)
        if queue is None:
            with self._lock:
                queue = self._queues[self._queue_count]
                self._queue_count += 1
            self._local_storage.queue = queue
        queue.get()  # 阻塞,等待写者写入新数据并发出信号

    def release_for_reading(self):
        """读者线程完成共享读访问。"""
        self._local_storage.queue.task_done() # 通知写者当前批次数据已处理完毕

    def acquire_for_writing(self, immediate=True):
        """
        获取独占写访问权限。
        如果 immediate 为 True,则请求读者线程尽快释放访问权限。
        """
        if immediate:
            self._stop = 1  # 设置停止标志

        for queue in self._queues:
            queue.join()  # 阻塞,直到所有读者完成当前数据处理

    def release_for_writing(self) -> None:
        """释放独占写访问权限。"""
        self._stop = 0  # 重置停止标志
        for queue in self._queues:
            queue.put(None)  # 向每个读者队列放入一个项,唤醒等待的读者

    def is_stop_posted(self) -> bool:
        """
        读者线程定期调用此函数,检查写者是否请求立即独占控制。
        """
        return True if self._stop else False

# 多线程示例应用
class SharedValue: # 简单的共享值类,线程共享
    def __init__(self):
        self.value = 0

def reader_thread(rw_lock, id, shared_data):
    """读者线程函数"""
    while True:
        rw_lock.acquire_for_reading()
        sleep_time = id / 10
        for _ in range(10):
            time.sleep(sleep_time)
            if rw_lock.is_stop_posted():
                print(f'reader {id} (thread) 收到停止请求,提前中断。', flush=True)
登录后复制

以上就是Python多进程/多线程读写锁实现:高效管理一写多读并发访问的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习
PHP中文网抖音号
发现有趣的

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号