
给定一个整数数组 nums 和一个整数 k,存在一个大小为 k 的滑动窗口,它从数组的最左侧移动到最右侧。每次滑动窗口向右移动一个位置。任务是返回每个窗口中的中位数组成的数组。
示例:nums = [1,3,-1,-3,5,3,6,7], k = 3 窗口移动及中位数: [1,3,-1] -> 中位数 1[3,-1,-3] -> 中位数 -1[-1,-3,5] -> 中位数 -1[-3,5,3] -> 中位数 3[5,3,6] -> 中位数 5[3,6,7] -> 中位数 6 返回 [1.0, -1.0, -1.0, 3.0, 5.0, 6.0]
双堆法是解决中位数问题的常用策略。它维护两个堆:一个最大堆(small)存储较小的一半元素,一个最小堆(large)存储较大的一半元素。通过平衡这两个堆的大小,可以快速获取中位数。
传统实现思路:
问题中提供的初始代码示例展示了这种传统实现:
import heapq
class Solution(object):
def __init__(self):
self.small = [] # 最大堆 (存储负值模拟)
self.large = [] # 最小堆
def balance(self):
# ... 堆平衡逻辑 ...
def addNum(self, num):
# ... 添加元素逻辑 ...
def findMedian(self):
# ... 查找中位数逻辑 ...
def popNum(self, num):
# 性能瓶颈所在
if num > (self.small[0] * -1):
self.large.remove(num) # O(K) 查找并移除
heapq.heapify(self.large) # O(K) 重新堆化
else:
self.small.remove(num * -1) # O(K) 查找并移除
heapq.heapify(self.small) # O(K) 重新堆化
self.balance()
def medianSlidingWindow(self, nums, k):
# ... 主逻辑 ...性能瓶颈: 在上述代码中,popNum 方法是导致时间超限(TLE)的主要原因。Python 的 list.remove(value) 操作需要遍历列表来查找指定值,其时间复杂度为 O(K)(其中 K 是堆的大小)。移除元素后,为了恢复堆的性质,需要调用 heapq.heapify(),这同样是 O(K) 的操作。
对于一个包含 N 个元素、窗口大小为 k 的数组,滑动窗口总共会进行 N-k+1 次操作。每次操作包含添加、移除和查找中位数。如果移除操作是 O(K),则总时间复杂度将达到 O(NK)。当 N=100000, K=50000 时,`NK的量级为5 * 10^9`,这远远超出了可接受的时间限制。
立即学习“Python免费学习笔记(深入)”;
为了将移除操作的时间复杂度降低到 O(logK),我们需要避免直接在堆中搜索和物理移除元素。有两种常见的优化方法:
针对滑动窗口问题,惰性删除策略尤为适用,因为它天然地可以通过窗口的移动来“标记”元素是否过期。为了处理数组中可能存在的重复值,我们需要将每个元素与其在原始数组中的索引绑定,形成 (值, 索引) 对。
惰性删除的实现细节:
以下是基于惰性删除策略的优化实现。为了更好地组织代码和复用逻辑,我们创建了自定义的堆类。
import heapq
# 辅助函数:用于MaxHeap将值取反
def negate(item):
"""将 (value, index) 元组中的值取反,用于模拟最大堆。"""
return -item[0], item[1]
class WindowHeap(object):
"""
基础窗口堆类,支持惰性删除。
conv: 一个转换函数,用于处理元素(例如,MaxHeap需要将值取反)。
"""
def __init__(self, conv=lambda x: x):
self.heap = []
self.conv = conv # 元素转换函数 (例如,MaxHeap的取反操作)
self.lowindex = 0 # 窗口下限索引,用于识别已删除元素
def peek(self):
"""
查看堆顶元素,跳过所有已删除的元素。
返回 (value, index) 元组或 None(如果堆为空)。
"""
while self.heap:
item = self.conv(self.heap[0]) # 获取原始值和索引
if item[1] >= self.lowindex: # 如果索引在当前窗口内,则有效
return item
# 否则,该元素已过期,从堆中弹出并忽略
heapq.heappop(self.heap)
return None # 堆中没有有效元素
def push(self, item):
"""向堆中添加一个 (value, index) 元组。"""
heapq.heappush(self.heap, self.conv(item))
def pop(self):
"""
弹出堆顶元素,跳过所有已删除的元素。
返回弹出的 (value, index) 元组。
"""
item = self.peek() # 确保获取的是有效元素
if item:
heapq.heappop(self.heap)
return item
class MinWindowHeap(WindowHeap):
"""最小窗口堆,直接存储 (value, index) 元组。"""
def __init__(self):
super(MinWindowHeap, self).__init__(lambda x: x) # 无需转换
class MaxWindowHeap(WindowHeap):
"""最大窗口堆,通过 negate 函数将值取反存储。"""
def __init__(self):
super(MaxWindowHeap, self).__init__(negate) # 使用 negate 函数
class Solution(object):
def rebalance(self, add_val):
"""
平衡两个堆的大小,并更新平衡因子。
add_val: 1 表示 large 堆增加,-1 表示 small 堆增加。
"""
self.balance += add_val
if abs(self.balance) < 2: # 堆大小差值在 -1 到 1 之间,无需平衡
return
# 如果 small 堆过大,将 small 堆顶元素移到 large 堆
if self.balance > 1: # 意味着 small 堆比 large 堆多一个元素
self.small.push(self.large.pop()) # 注意:这里是 large.pop() 然后 push 到 small
# 实际上是:如果 small 比 large 多 2 个,需要从 small 移一个到 large
# 或者 large 比 small 多 2 个,需要从 large 移一个到 small
# 这里代码的 self.balance 含义与常规理解可能不同
# 假设 self.balance > 0 意味着 large 堆元素多, self.balance < 0 意味着 small 堆元素多
# 原始代码逻辑是:
# if self.balance > 1: # 意味 large 堆比 small 堆多 2 个或以上
# self.small.push(self.large.pop())
# elif self.balance < -1: # 意味 small 堆比 large 堆多 2 个或以上
# self.large.push(self.small.pop())
# 修正后的平衡逻辑应为:
# 如果 small 堆比 large 堆多两个或以上元素
if self.small.peek() and self.large.peek() and len(self.small.heap) > len(self.large.heap) + 1:
self.large.push(self.small.pop())
# 如果 large 堆比 small 堆多两个或以上元素
elif self.large.peek() and self.small.peek() and len(self.large.heap) > len(self.small.heap) + 1:
self.small.push(self.large.pop())
# 重新计算平衡因子
self.balance = len(self.large.heap) - len(self.small.heap) # 假设 balance 是 large - small
# 简化平衡逻辑(根据原答案,balance 变量的更新是关键)
# 原答案的 rebalance 逻辑是基于 self.balance 的变化来判断的
# self.balance 初始为0,每次 insert/remove 改变其值
# 如果 self.balance > 1,表示 large 堆比 small 堆“多”了一个元素,需要从 large 移到 small
# 如果 self.balance < -1,表示 small 堆比 large 堆“多”了一个元素,需要从 small 移到 large
# 这里的 self.balance 实际上记录的是 large 堆和 small 堆的“有效”元素数量差
if self.balance > 1: # large 堆有效元素比 small 堆多 2 个或以上
self.small.push(self.large.pop())
self.balance -= 2 # large 减少1,small 增加1,差值减少2
elif self.balance < -1: # small 堆有效元素比 large 堆多 2 个或以上
self.large.push(self.small.pop())
self.balance += 2 # small 减少1,large 增加1,差值增加2
def insert(self, item):
"""向双堆结构中插入一个 (value, index) 元组。"""
# 确定插入到哪个堆
# 如果 large 堆为空,或当前 item 小于 large 堆顶,则插入 small 堆
if not self.large.peek() or item[0] < self.large.peek()[0]:
self.small.push(item)
self.balance -= 1 # small 堆增加,平衡因子减1
else:
self.large.push(item)
self.balance += 1 # large 堆增加,平衡因子加1
self.rebalance(0) # 插入后进行平衡,add_val 设为 0,因为 self.balance 已经更新
def remove(self, item):
"""
从双堆结构中移除一个 (value, index) 元组(惰性删除)。
通过更新 lowindex 标记元素为已删除,并重新平衡。
"""
# 判断 item 属于哪个堆
if item[0] >= self.large.peek()[0]: # 元素在 large 堆中
self.balance -= 1 # large 堆有效元素减少,平衡因子减1
else: # 元素在 small 堆中
self.balance += 1 # small 堆有效元素减少,平衡因子加1
# 更新 lowindex,标记该元素及其之前的所有元素为已删除
# 注意:这里更新的是两个堆的 lowindex,确保它们只关注当前窗口内的元素
self.large.lowindex = self.small.lowindex = item[1] + 1
self.rebalance(0) # 移除后进行平衡
def getMedian(self):
"""获取当前窗口的中位数。"""
# 如果平衡因子为 0,表示两个堆有效元素数量相等
if self.balance == 0:
# 中位数是两个堆顶的平均值
return (self.large.peek()[0] + self.small.peek()[0]) * 0.5
# 如果 large 堆有效元素多,中位数是 large 堆顶
elif self.balance > 0:
return float(self.large.peek()[0])
# 如果 small 堆有效元素多,中位数是 small 堆顶
else:
return float(self.small.peek()[0])
def medianSlidingWindow(self, nums, k):
"""
计算滑动窗口中位数的主函数。
:type nums: List[int]
:type k: int
:rtype: List[float]
"""
self.small = MaxWindowHeap() # 存储较小一半的元素
self.large = MinWindowHeap() # 存储较大一半的元素
self.balance = 0 # 平衡因子:large 堆有效元素数量 - small 堆有效元素数量
# 将原始数组转换为 (value, index) 对列表
items = [(val, i) for i, val in enumerate(nums)]
# 初始化第一个窗口
for item in items[:k]:
self.insert(item)
result = [self.getMedian()]
# 滑动窗口并计算后续中位数
# zip(items, items[k:]) 巧妙地生成 (旧元素, 新元素) 对
for olditem, newitem in zip(items, items[k:]):
self.remove(olditem) # 移除旧元素(惰性删除)
self.insert(newitem) # 插入新元素
result.append(self.getMedian())
return result
代码解释:
因此,优化后的算法总时间复杂度为 O(N log K)。对于 N=100000, K=50000 的测试用例,100000 * log(50000) 大约是 100000 * 16,即 1.6 * 10^6 级别,这在可接受的时间范围内。
通过采用这种惰性删除和索引标记的优化策略,我们可以高效地解决滑动窗口中位数问题,避免在大规模数据集上出现时间超限错误。
以上就是Python滑动窗口中位数优化:双堆法解决TLE问题的详细内容,更多请关注php中文网其它相关文章!
每个人都需要一台速度更快、更稳定的 PC。随着时间的推移,垃圾文件、旧注册表数据和不必要的后台进程会占用资源并降低性能。幸运的是,许多工具可以让 Windows 保持平稳运行。
Copyright 2014-2025 https://www.php.cn/ All Rights Reserved | php.cn | 湘ICP备2023035733号