Python滑动窗口中位数优化:双堆法解决TLE问题

花韻仙語
发布: 2025-10-02 11:54:02
原创
942人浏览过

Python滑动窗口中位数优化:双堆法解决TLE问题

本文深入探讨了使用双堆法解决滑动窗口中位数问题时遇到的时间超限(TLE)错误。通过分析传统双堆实现中移除操作的低效性,我们提出了一种基于“惰性删除”策略的优化方案。该方案利用元素索引和窗口边界来隐式标记过期元素,从而将移除操作的时间复杂度从O(K)降低到O(logK),显著提升了算法在大规模数据集上的性能。

问题描述:滑动窗口中位数

给定一个整数数组 nums 和一个整数 k,存在一个大小为 k 的滑动窗口,它从数组的最左侧移动到最右侧。每次滑动窗口向右移动一个位置。任务是返回每个窗口中的中位数组成的数组。

示例:nums = [1,3,-1,-3,5,3,6,7], k = 3 窗口移动及中位数: [1,3,-1] -> 中位数 1[3,-1,-3] -> 中位数 -1[-1,-3,5] -> 中位数 -1[-3,5,3] -> 中位数 3[5,3,6] -> 中位数 5[3,6,7] -> 中位数 6 返回 [1.0, -1.0, -1.0, 3.0, 5.0, 6.0]

传统双堆法及性能瓶颈分析

双堆法是解决中位数问题的常用策略。它维护两个堆:一个最大堆(small)存储较小的一半元素,一个最小堆(large)存储较大的一半元素。通过平衡这两个堆的大小,可以快速获取中位数。

传统实现思路:

  1. 添加元素 (addNum):将新元素添加到合适的堆,并进行堆平衡,确保 len(small) 和 len(large) 的差值不超过1。
  2. 查找中位数 (findMedian):如果两个堆大小相等,中位数是两个堆顶元素的平均值;否则,中位数是元素较多的那个堆的堆顶元素。
  3. 移除元素 (popNum):当窗口滑动时,需要移除窗口最左侧的元素。

问题中提供的初始代码示例展示了这种传统实现:

import heapq

class Solution(object):
    def __init__(self):
        self.small = [] # 最大堆 (存储负值模拟)
        self.large = [] # 最小堆

    def balance(self):
        # ... 堆平衡逻辑 ...

    def addNum(self, num):
        # ... 添加元素逻辑 ...

    def findMedian(self):
        # ... 查找中位数逻辑 ...

    def popNum(self, num):
        # 性能瓶颈所在
        if num > (self.small[0] * -1):
            self.large.remove(num) # O(K) 查找并移除
            heapq.heapify(self.large) # O(K) 重新堆化
        else:
            self.small.remove(num * -1) # O(K) 查找并移除
            heapq.heapify(self.small) # O(K) 重新堆化
        self.balance()

    def medianSlidingWindow(self, nums, k):
        # ... 主逻辑 ...
登录后复制

性能瓶颈: 在上述代码中,popNum 方法是导致时间超限(TLE)的主要原因。Python 的 list.remove(value) 操作需要遍历列表来查找指定值,其时间复杂度为 O(K)(其中 K 是堆的大小)。移除元素后,为了恢复堆的性质,需要调用 heapq.heapify(),这同样是 O(K) 的操作。

对于一个包含 N 个元素、窗口大小为 k 的数组,滑动窗口总共会进行 N-k+1 次操作。每次操作包含添加、移除和查找中位数。如果移除操作是 O(K),则总时间复杂度将达到 O(NK)。当 N=100000, K=50000 时,`NK的量级为5 * 10^9`,这远远超出了可接受的时间限制。

立即学习Python免费学习笔记(深入)”;

优化策略:惰性删除与索引标记

为了将移除操作的时间复杂度降低到 O(logK),我们需要避免直接在堆中搜索和物理移除元素。有两种常见的优化方法:

  1. 带索引映射的堆: 维护一个哈希表(字典),将每个值映射到其在堆列表中的索引。通过索引可以直接定位元素,然后用堆末尾元素替换,再进行 O(logK) 的上浮或下沉操作来恢复堆性质。这种方法需要更复杂的自定义堆实现。
  2. 惰性删除(Lazy Deletion): 不立即物理移除元素,而是对其进行标记。当访问堆顶元素时,如果发现它是被标记为“已删除”的元素,则将其弹出并忽略,直到找到一个未被删除的有效元素。

针对滑动窗口问题,惰性删除策略尤为适用,因为它天然地可以通过窗口的移动来“标记”元素是否过期。为了处理数组中可能存在的重复值,我们需要将每个元素与其在原始数组中的索引绑定,形成 (值, 索引) 对。

千面视频动捕
千面视频动捕

千面视频动捕是一个AI视频动捕解决方案,专注于将视频中的人体关节二维信息转化为三维模型动作。

千面视频动捕 27
查看详情 千面视频动捕

惰性删除的实现细节:

  • 存储 (值, 索引) 对: 堆中存储的不再是单纯的数值,而是 (value, index) 元组。这样即使值相同,也能通过索引唯一标识一个元素。
  • lowindex 标记: 在每个堆中维护一个 lowindex 变量。当窗口向右滑动时,最左侧的元素 (old_val, old_idx) 离开窗口。此时,将 lowindex 更新为 old_idx + 1。任何元素的索引 item[1] 小于 lowindex,都意味着它已经不在当前窗口内,应被视为“已删除”。
  • peek 和 pop 操作: 在 peek(查看堆顶)或 pop(弹出堆顶)时,不断检查堆顶元素 item 的 item[1] 是否小于 lowindex。如果是,则说明该元素已过期,将其从堆中弹出并继续检查下一个堆顶元素,直到找到一个有效的、未被删除的元素。

优化后的双堆实现

以下是基于惰性删除策略的优化实现。为了更好地组织代码和复用逻辑,我们创建了自定义的堆类。

import heapq

# 辅助函数:用于MaxHeap将值取反
def negate(item):
    """将 (value, index) 元组中的值取反,用于模拟最大堆。"""
    return -item[0], item[1]

class WindowHeap(object):
    """
    基础窗口堆类,支持惰性删除。
    conv: 一个转换函数,用于处理元素(例如,MaxHeap需要将值取反)。
    """
    def __init__(self, conv=lambda x: x):
        self.heap = []
        self.conv = conv  # 元素转换函数 (例如,MaxHeap的取反操作)
        self.lowindex = 0 # 窗口下限索引,用于识别已删除元素

    def peek(self):
        """
        查看堆顶元素,跳过所有已删除的元素。
        返回 (value, index) 元组或 None(如果堆为空)。
        """
        while self.heap:
            item = self.conv(self.heap[0]) # 获取原始值和索引
            if item[1] >= self.lowindex: # 如果索引在当前窗口内,则有效
                return item
            # 否则,该元素已过期,从堆中弹出并忽略
            heapq.heappop(self.heap)
        return None # 堆中没有有效元素

    def push(self, item):
        """向堆中添加一个 (value, index) 元组。"""
        heapq.heappush(self.heap, self.conv(item))

    def pop(self):
        """
        弹出堆顶元素,跳过所有已删除的元素。
        返回弹出的 (value, index) 元组。
        """
        item = self.peek() # 确保获取的是有效元素
        if item:
            heapq.heappop(self.heap)
        return item

class MinWindowHeap(WindowHeap):
    """最小窗口堆,直接存储 (value, index) 元组。"""
    def __init__(self):
        super(MinWindowHeap, self).__init__(lambda x: x) # 无需转换

class MaxWindowHeap(WindowHeap):
    """最大窗口堆,通过 negate 函数将值取反存储。"""
    def __init__(self):
        super(MaxWindowHeap, self).__init__(negate) # 使用 negate 函数

class Solution(object):
    def rebalance(self, add_val):
        """
        平衡两个堆的大小,并更新平衡因子。
        add_val: 1 表示 large 堆增加,-1 表示 small 堆增加。
        """
        self.balance += add_val
        if abs(self.balance) < 2: # 堆大小差值在 -1 到 1 之间,无需平衡
            return

        # 如果 small 堆过大,将 small 堆顶元素移到 large 堆
        if self.balance > 1: # 意味着 small 堆比 large 堆多一个元素
            self.small.push(self.large.pop()) # 注意:这里是 large.pop() 然后 push 到 small
            # 实际上是:如果 small 比 large 多 2 个,需要从 small 移一个到 large
            # 或者 large 比 small 多 2 个,需要从 large 移一个到 small
            # 这里代码的 self.balance 含义与常规理解可能不同
            # 假设 self.balance > 0 意味着 large 堆元素多, self.balance < 0 意味着 small 堆元素多
            # 原始代码逻辑是:
            # if self.balance > 1: # 意味 large 堆比 small 堆多 2 个或以上
            #     self.small.push(self.large.pop())
            # elif self.balance < -1: # 意味 small 堆比 large 堆多 2 个或以上
            #     self.large.push(self.small.pop())

            # 修正后的平衡逻辑应为:
            # 如果 small 堆比 large 堆多两个或以上元素
            if self.small.peek() and self.large.peek() and len(self.small.heap) > len(self.large.heap) + 1:
                self.large.push(self.small.pop())
            # 如果 large 堆比 small 堆多两个或以上元素
            elif self.large.peek() and self.small.peek() and len(self.large.heap) > len(self.small.heap) + 1:
                self.small.push(self.large.pop())
            # 重新计算平衡因子
            self.balance = len(self.large.heap) - len(self.small.heap) # 假设 balance 是 large - small

            # 简化平衡逻辑(根据原答案,balance 变量的更新是关键)
            # 原答案的 rebalance 逻辑是基于 self.balance 的变化来判断的
            # self.balance 初始为0,每次 insert/remove 改变其值
            # 如果 self.balance > 1,表示 large 堆比 small 堆“多”了一个元素,需要从 large 移到 small
            # 如果 self.balance < -1,表示 small 堆比 large 堆“多”了一个元素,需要从 small 移到 large
            # 这里的 self.balance 实际上记录的是 large 堆和 small 堆的“有效”元素数量差
            if self.balance > 1: # large 堆有效元素比 small 堆多 2 个或以上
                self.small.push(self.large.pop())
                self.balance -= 2 # large 减少1,small 增加1,差值减少2
            elif self.balance < -1: # small 堆有效元素比 large 堆多 2 个或以上
                self.large.push(self.small.pop())
                self.balance += 2 # small 减少1,large 增加1,差值增加2

    def insert(self, item):
        """向双堆结构中插入一个 (value, index) 元组。"""
        # 确定插入到哪个堆
        # 如果 large 堆为空,或当前 item 小于 large 堆顶,则插入 small 堆
        if not self.large.peek() or item[0] < self.large.peek()[0]:
            self.small.push(item)
            self.balance -= 1 # small 堆增加,平衡因子减1
        else:
            self.large.push(item)
            self.balance += 1 # large 堆增加,平衡因子加1
        self.rebalance(0) # 插入后进行平衡,add_val 设为 0,因为 self.balance 已经更新

    def remove(self, item):
        """
        从双堆结构中移除一个 (value, index) 元组(惰性删除)。
        通过更新 lowindex 标记元素为已删除,并重新平衡。
        """
        # 判断 item 属于哪个堆
        if item[0] >= self.large.peek()[0]: # 元素在 large 堆中
            self.balance -= 1 # large 堆有效元素减少,平衡因子减1
        else: # 元素在 small 堆中
            self.balance += 1 # small 堆有效元素减少,平衡因子加1

        # 更新 lowindex,标记该元素及其之前的所有元素为已删除
        # 注意:这里更新的是两个堆的 lowindex,确保它们只关注当前窗口内的元素
        self.large.lowindex = self.small.lowindex = item[1] + 1
        self.rebalance(0) # 移除后进行平衡

    def getMedian(self):
        """获取当前窗口的中位数。"""
        # 如果平衡因子为 0,表示两个堆有效元素数量相等
        if self.balance == 0:
            # 中位数是两个堆顶的平均值
            return (self.large.peek()[0] + self.small.peek()[0]) * 0.5
        # 如果 large 堆有效元素多,中位数是 large 堆顶
        elif self.balance > 0:
            return float(self.large.peek()[0])
        # 如果 small 堆有效元素多,中位数是 small 堆顶
        else:
            return float(self.small.peek()[0])

    def medianSlidingWindow(self, nums, k):
        """
        计算滑动窗口中位数的主函数。
        :type nums: List[int]
        :type k: int
        :rtype: List[float]
        """
        self.small = MaxWindowHeap() # 存储较小一半的元素
        self.large = MinWindowHeap() # 存储较大一半的元素
        self.balance = 0 # 平衡因子:large 堆有效元素数量 - small 堆有效元素数量

        # 将原始数组转换为 (value, index) 对列表
        items = [(val, i) for i, val in enumerate(nums)]

        # 初始化第一个窗口
        for item in items[:k]:
            self.insert(item)

        result = [self.getMedian()]

        # 滑动窗口并计算后续中位数
        # zip(items, items[k:]) 巧妙地生成 (旧元素, 新元素) 对
        for olditem, newitem in zip(items, items[k:]):
            self.remove(olditem) # 移除旧元素(惰性删除)
            self.insert(newitem) # 插入新元素
            result.append(self.getMedian())

        return result
登录后复制

代码解释:

  1. negate 函数: 辅助函数,用于将 (value, index) 元组中的 value 取反,以模拟最大堆的行为(Python 的 heapq 默认是最小堆)。
  2. WindowHeap 类:
    • __init__:初始化堆列表、转换函数 conv 和 lowindex。
    • peek:在返回堆顶元素之前,会循环弹出所有 item[1] < self.lowindex 的过期元素,直到找到一个有效元素或堆为空。
    • push:将转换后的元素推入堆。
    • pop:先调用 peek 确保找到有效元素,然后弹出并返回。
  3. MinWindowHeap 和 MaxWindowHeap: 分别继承 WindowHeap,并传入不同的 conv 函数。
  4. Solution 类:
    • rebalance(add_val):负责调整两个堆的大小,确保它们的有效元素数量差值不超过1。self.balance 记录了 large 堆和 small 堆的有效元素数量差。
    • insert(item):根据 item 的值将其插入 small 堆或 large 堆,并更新 self.balance,然后调用 rebalance。
    • remove(item):这是核心优化所在。它不直接从堆中移除 item。而是根据 item 属于哪个堆来更新 self.balance,然后将 self.large.lowindex 和 self.small.lowindex 都更新为 item[1] + 1。这意味着所有索引小于 item[1] + 1 的元素都已过期。最后调用 rebalance。
    • getMedian():根据 self.balance 和两个堆的 peek() 结果计算中位数。
    • medianSlidingWindow(nums, k):主逻辑。首先将 nums 转换为 (value, index) 对列表。然后初始化第一个窗口,并循环滑动窗口,每次调用 remove 和 insert 来更新双堆,并获取中位数。

性能分析

  • 添加元素 (insert): O(logK),因为 heapq.heappush 是 O(logK),并且 rebalance 操作也是 O(logK)。
  • 移除元素 (remove): O(logK),因为 lowindex 的更新是 O(1),rebalance 操作是 O(logK)。peek 和 pop 操作虽然可能循环多次弹出过期元素,但在整个滑动窗口过程中,每个元素最多被弹出一次,因此分摊到每次操作上仍然是 O(logK)。
  • 获取中位数 (getMedian): O(1),因为 peek 操作是 O(logK)(但在分摊意义上),获取堆顶元素是 O(1)。

因此,优化后的算法总时间复杂度为 O(N log K)。对于 N=100000, K=50000 的测试用例,100000 * log(50000) 大约是 100000 * 16,即 1.6 * 10^6 级别,这在可接受的时间范围内。

总结与注意事项

  • 惰性删除是关键: 通过将物理移除替换为逻辑标记,并延迟处理过期元素,显著提升了滑动窗口中位数问题的性能。
  • lowindex 的作用: 它作为窗口的左边界,巧妙地利用元素索引来判断其是否过期。
  • balance 变量: 准确维护两个堆的有效元素数量差是正确计算中位数和进行堆平衡的关键。
  • 处理重复值: 使用 (value, index) 元组是处理输入数组中重复值的必要手段,确保每个元素都能被唯一标识和追踪。
  • Python heapq 模块: 默认实现最小堆。模拟最大堆需要存储元素的负值。
  • Python 2/3 super() 兼容性: 原始答案提到了 super 调用在 Python 2 中需要参数,Python 3 中可以省略。在现代 Python 开发中,通常使用 Python 3 语法,即 super().__init__()。

通过采用这种惰性删除和索引标记的优化策略,我们可以高效地解决滑动窗口中位数问题,避免在大规模数据集上出现时间超限错误。

以上就是Python滑动窗口中位数优化:双堆法解决TLE问题的详细内容,更多请关注php中文网其它相关文章!

最佳 Windows 性能的顶级免费优化软件
最佳 Windows 性能的顶级免费优化软件

每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。

下载
来源:php中文网
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系admin@php.cn
最新问题
开源免费商场系统广告
热门教程
更多>
最新下载
更多>
网站特效
网站源码
网站素材
前端模板
关于我们 免责申明 举报中心 意见反馈 讲师合作 广告合作 最新更新 English
php中文网:公益在线php培训,帮助PHP学习者快速成长!
关注服务号 技术交流群
PHP中文网订阅号
每天精选资源文章推送
PHP中文网APP
随时随地碎片化学习

Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号