
1. 问题概述:滑动窗口中位数
滑动窗口中位数问题要求在一个整数数组 nums 上,维护一个大小为 k 的滑动窗口。当窗口从左向右移动时,每次移动一个位置,都需要计算并返回当前窗口内的中位数。中位数是排序后位于中间的那个数;如果窗口大小 k 是偶数,则中位数是中间两个数的平均值。
例如,对于 nums = [1,3,-1,-3,5,3,6,7] 和 k = 3:
- 窗口 [1,3,-1],中位数是 1
- 窗口 [3,-1,-3],中位数是 -1
- ...依此类推
2. 传统双堆方法的性能瓶颈
解决滑动窗口中位数问题的一个常见且高效的思路是使用两个堆(一个最大堆 small 存储较小的一半元素,一个最小堆 large 存储较大的一半元素)。通过维护这两个堆,可以O(1)或O(log K)地获取中位数,并在O(log K)时间内添加新元素。
然而,当需要从窗口中“移除”一个旧元素时,问题就出现了。原始代码中的 popNum 方法采用了以下逻辑:
def popNum(self, num):
if num > (self.small[0] * -1): # 假设small[0]是最大堆的堆顶,其真实值是负数
self.large.remove(num)
heapq.heapify(self.large)
else:
self.small.remove(num * -1)
heapq.heapify(self.small)
self.balance()这里的瓶颈在于 list.remove(num) 和 heapq.heapify(heap)。
立即学习“Python免费学习笔记(深入)”;
- list.remove(num):在Python列表中查找并移除一个元素的时间复杂度是O(N),其中N是列表的当前大小(在这里是K)。
- heapq.heapify(heap):在移除元素后,堆的结构被破坏,需要调用 heapify 来重新构建堆,其时间复杂度也是O(N)(或O(K))。
因此,每次移除操作的复杂度高达O(K)。在一个包含 N 个元素的数组上进行 N-K+1 次窗口滑动,每次滑动都涉及一次移除和一次添加,导致总时间复杂度飙升至 O(NK)。当 N=100000 且 K=50000 时,`NK的量级将达到5 * 10^9`,这远远超出了通常的计算限制,从而导致“时间限制超出”(TLE)错误。
3. 优化的双堆方法:延迟删除策略
为了解决移除操作的效率问题,我们可以采用“延迟删除”(Lazy Deletion)策略。这种方法的核心思想是:当一个元素离开窗口时,我们不立即从堆中物理删除它,而是对其进行“标记”。当我们需要从堆中获取元素(如peek或pop)时,如果遇到已标记为“删除”的元素,就跳过它,直到找到一个有效元素。
实现延迟删除需要解决几个关键问题:
- 唯一标识元素: 窗口中可能存在重复的数值。为了区分它们,我们需要将每个元素与其在原始数组中的索引绑定,形成 (值, 索引) 对。例如,[1, 5, 1] 中有两个 1,通过 (1, 0) 和 (1, 2) 可以区分。
- 标记删除: 如何高效地标记一个元素为“已删除”?最简单的方法是维护一个窗口的起始索引 lowindex。任何 (值, 索引) 对中,如果 索引
- 自定义堆封装: Python的 heapq 模块提供了堆的基本操作,但不支持延迟删除。我们需要封装 heapq,创建自定义的堆类来处理 (值, 索引) 对和延迟删除逻辑。
4. 核心实现细节
我们将构建两个自定义的堆类:MinWindowHeap(最小堆)和 MaxWindowHeap(最大堆),以及一个 Solution 类来协调它们。
4.1 自定义堆结构:MinWindowHeap 和 MaxWindowHeap
这两个类将封装 heapq 的功能,并添加延迟删除的逻辑。
-
MinWindowHeap (最小堆):
- __init__(self, conv=lambda x: x): 初始化内部列表 self.heap。conv 是一个转换函数,用于处理元素入堆时的形式(例如,MaxWindowHeap 会用它来取反值)。self.lowindex 记录当前窗口的起始索引。
- peek(self): 这是延迟删除的关键。它会循环检查堆顶元素:如果堆顶元素的索引小于 self.lowindex,说明该元素已过期,将其弹出并继续检查下一个堆顶元素,直到找到一个有效的(未过期的)元素或者堆为空。
- push(self, item): 将 item((值, 索引) 对)通过 conv 函数转换后推入堆。
- pop(self): 先调用 peek() 确保获取到的是有效堆顶,然后从 self.heap 中实际弹出该元素。
-
MaxWindowHeap (最大堆):
- 继承自 MinWindowHeap。
- __init__(self): 调用 super().__init__(negate),其中 negate 函数用于将 (值, 索引) 对的 值 取反,从而将 heapq 的最小堆行为模拟成最大堆。
4.2 Solution 类中的核心逻辑
Solution 类将协调 small (MaxWindowHeap) 和 large (MinWindowHeap) 两个堆,实现滑动窗口中位数的功能。
-
rebalance(self, add):
- 维护一个 self.balance 计数器,表示 large 堆比 small 堆多出的元素数量(或负数表示 small 堆多出)。
- 当 abs(self.balance) 大于1时,说明两个堆不平衡,需要将一个堆的顶部元素移动到另一个堆,直到平衡。
- 此方法在每次插入或逻辑删除元素后被调用。
-
insert(self, item):
- item 是 (值, 索引) 对。
- 根据 item 的值与 large 堆的堆顶元素比较,决定将其插入 small 堆还是 large 堆。
- 插入后调用 rebalance 保持堆的平衡。
-
remove(self, item):
- item 是 (值, 索引) 对,表示要移除的旧元素。
- 关键优化: 它不实际从堆中移除元素,而是通过更新 self.large.lowindex 和 self.small.lowindex 为 item[1] + 1 来标记所有索引小于等于 item[1] 的元素为“过期”。
- 然后调用 rebalance 来调整平衡计数。
-
getMedian(self):
- 根据 self.balance 的值和两个堆的堆顶元素(通过 peek() 获取有效元素)来计算当前窗口的中位数。
- 如果 balance 为0,取两个堆顶的平均值;否则取元素较多那个堆的堆顶。
-
medianSlidingWindow(self, nums, k):
- 主函数,用于解决问题。
- 首先将 nums 转换为 (值, 索引) 对的列表 items。
- 初始化 small 和 large 堆,并用前 k 个元素填充第一个窗口。
- 计算第一个窗口的中位数并添加到结果列表。
- 然后通过循环滑动窗口:
- zip(items, items[k:]) 用于同时获取离开窗口的 olditem 和进入窗口的 item。
- 对 olditem 调用 remove(),对 item 调用 insert()。
- 每次滑动后计算当前窗口的中位数并添加到结果。
5. 完整解决方案代码
import heapq
# 辅助函数:将 (值, 索引) 对的值取反,用于模拟最大堆
def negate(item):
return -item[0], item[1]
class MinWindowHeap(object):
def __init__(self, conv=lambda x: x):
self.heap = []
self.conv = conv # 转换函数 (例如,用于MaxHeap取反值)
self.lowindex = 0 # 窗口下限索引,用于识别已删除项
def peek(self): # 返回 (值, 索引) 或 None (如果堆为空或仅包含已删除项)
while self.heap:
# 转换堆顶元素,例如 MaxWindowHeap 会将值取反
item = self.conv(self.heap[0])
if item[1] >= self.lowindex: # 如果索引在当前窗口内,则有效
return item
# 元素已过期(索引小于lowindex),从堆中弹出
heapq.heappop(self.heap)
return None # 堆中没有有效元素
def push(self, item):
# 将 (值, 索引) 对通过转换函数推入堆
heapq.heappush(self.heap, self.conv(item))
def pop(self):
item = self.peek() # 获取有效堆顶,同时清除所有过期的堆顶
if item:
heapq.heappop(self.heap) # 实际弹出有效堆顶
return item # 返回被弹出的有效元素
class MaxWindowHeap(MinWindowHeap):
def __init__(self):
# Python 3 中 super() 可以不带参数
super(MaxWindowHeap, self).__init__(negate) # 使用negate函数将最小堆模拟为最大堆
class Solution(object):
def rebalance(self, add):
"""
调整两个堆的平衡。
add > 0 表示 large 堆增加了元素,或 small 堆移除了元素。
add < 0 表示 small 堆增加了元素,或 large 堆移除了元素。
"""
self.balance += add
if abs(self.balance) < 2: # 堆已平衡 (大小差不超过1)
return
if self.balance > 1: # large 堆元素过多,需要移动一个到 small 堆
self.small.push(self.large.pop())
elif self.balance < -1: # small 堆元素过多,需要移动一个到 large 堆
self.large.push(self.small.pop())
self.balance = 0 # 重新平衡后,balance 归零
def insert(self, item):
"""
将新元素插入到合适的堆中并调整平衡。
item 是 (值, 索引) 对。
"""
# 尝试获取 large 堆的堆顶作为枢轴,如果 large 堆为空,则使用 small 堆的堆顶
pivot = self.large.peek()
# 判断新元素应该放入 large 堆还是 small 堆
# 如果 large 堆为空,或新元素大于等于 large 堆的堆顶,则放入 large 堆
islarge = (pivot is None) or (item[0] >= pivot[0])
heap = self.large if islarge else self.small
heap.push(item)
# 更新平衡计数并重新平衡
self.rebalance(1 if islarge else -1)
def remove(self, item):
"""
逻辑删除一个元素(通过更新 lowindex 标记过期)并调整平衡。
item 是 (值, 索引) 对,表示离开窗口的旧元素。
"""
# 获取 large 堆的堆顶作为枢轴
pivot = self.large.peek()
# 判断旧元素是属于 large 堆还是 small 堆
islarge = (pivot is not None) and (item[0] >= pivot[0])
# 关键步骤:更新两个堆的 lowindex,标记所有索引小于等于 item[1] 的元素为过期
# 这意味着窗口向右移动了,item[1] 及其之前的所有元素都可能已过期
self.large.lowindex = self.small.lowindex = item[1] + 1
# 更新平衡计数并重新平衡
self.rebalance(-1 if islarge else 1)
def getMedian(self):
"""
获取当前窗口的中位数。
"""
if self.balance == 0: # 两个堆大小相同,中位数是两个堆顶的平均值
return (self.large.peek()[0] + self.small.peek()[0]) * 0.5
# 否则,元素较多的那个堆的堆顶就是中位数
return self.large.peek()[0] if self.balance > 0 else self.small.peek()[0]
def medianSlidingWindow(self, nums, k):
"""
滑动窗口中位数的主函数。
:type nums: List[int]
:type k: int
:rtype: List[float]
"""
self.small = MaxWindowHeap() # 存储较小一半元素的堆 (最大堆)
self.large = MinWindowHeap() # 存储较大一半元素的堆 (最小堆)
self.balance = 0 # 记录 large 堆与 small 堆的元素数量差
# 将原始数组转换为 (值, 索引) 对,以便唯一标识元素
items = [(val, i) for i, val in enumerate(nums)]
# 填充第一个窗口
for item in items[:k]:
self.insert(item)
result = [self.getMedian()] # 记录第一个窗口的中位数
# 滑动窗口
# olditem 是离开窗口的元素,item 是进入窗口的元素
for olditem, item in zip(items, items[k:]):
self.remove(olditem) # 逻辑删除旧元素
self.insert(item) # 插入新元素
result.append(self.getMedian()) # 计算并记录当前窗口的中位数
return result
6. 注意事项与总结
- 时间复杂度优化: 通过延迟删除策略,remove 操作不再需要遍历列表或重建堆。更新 lowindex 是 O(1) 操作。peek 和 pop 操作在遇到过期元素时,每次弹出都是 O(log K),但每个元素只会因过期而被弹出一次。因此,平均来看,insert、remove 和 getMedian 的操作都保持在 O(log K)。整个算法的时间复杂度从 O(N*K) 优化到了 O(N log K),在大规模数据集上性能显著提升。
- 空间复杂度: 两个堆最多存储 K 个 (值, 索引) 对,所以空间复杂度为 O(K)。
- 处理重复值: 将值与索引绑定 (值, 索引) 是处理数组中重复值的关键,确保每个元素都能被唯一标识和跟踪。
- Python heapq 模块: heapq 默认是最小堆。为了实现最大堆,我们通过存储元素的负值来实现。
- 平衡维护: rebalance 方法确保 small 和 large 两个堆的大小差异不超过1,这是高效获取中位数的基础。
这种优化的双堆方法提供了一个健壮且高效的解决方案,适用于处理大规模数据下的滑动窗口中位数问题。它巧妙地利用了堆的特性和延迟删除机制,避免了传统方法中的性能瓶颈。










