1. 什么是堆? #
1.1 生活中的例子 #
想象一下,你正在医院急诊室工作。病人按照病情的紧急程度排队:
- 最紧急的病人(比如心脏病发作)应该最先被治疗
- 普通感冒的病人可以稍后处理
这就是优先队列的概念:不是按照先来后到的顺序,而是按照优先级来处理。
堆(Heap)就是一种数据结构,可以高效地实现优先队列。
1.2 什么是堆? #
堆是一种特殊的二叉树结构,满足以下性质:
- 最小堆:父节点的值总是小于或等于其子节点的值。最小元素总是在根节点(索引0处)。
- 最大堆:父节点的值总是大于或等于其子节点的值。最大元素总是在根节点。
重要:Python 的 heapq 模块实现的是最小堆。
1.3 为什么需要堆? #
假设你有一个包含100万个数字的列表,需要经常:
- 找到最小的数字
- 删除最小的数字
- 添加新的数字
如果使用普通列表:
- 找到最小数字:需要遍历整个列表,时间复杂度 O(n)
- 删除最小数字:需要先找到它,再删除,时间复杂度 O(n)
- 添加数字:直接添加到末尾,时间复杂度 O(1)
如果使用堆:
- 找到最小数字:直接访问根节点,时间复杂度 O(1)
- 删除最小数字:时间复杂度 O(log n)
- 添加数字:时间复杂度 O(log n)
当数据量很大时,堆的优势就非常明显了!
1.4 堆的特点 #
- 最小元素总是在根节点:可以直接通过
heap[0]访问最小元素 - 高效的插入和删除:插入和删除最小元素的时间复杂度为 O(log n)
- 内存效率高:堆是原地操作,不需要额外的空间
- 不是完全有序的:堆只保证父节点小于子节点,但不保证兄弟节点之间有序
2. 前置知识 #
在使用 heapq 之前,我们需要了解一些基础概念。
2.1 Python 列表基础 #
堆在 Python 中是通过列表(list)来实现的,所以需要熟悉列表的基本操作。
# 创建一个空列表
my_list = []
# 向列表末尾添加元素
my_list.append(1)
my_list.append(2)
my_list.append(3)
# 打印列表内容
print(f"列表内容: {my_list}")
# 访问列表的第一个元素(索引从0开始)
print(f"第一个元素: {my_list[0]}")
# 删除列表的最后一个元素并返回
last_item = my_list.pop()
print(f"删除的元素: {last_item}")
print(f"删除后的列表: {my_list}")2.2 元组和比较 #
heapq 经常使用元组来存储带优先级的数据,元组会按照第一个元素进行比较。
# 创建元组
tuple1 = (3, "任务A")
tuple2 = (1, "任务B")
tuple3 = (2, "任务C")
# 元组比较:先比较第一个元素,如果相同再比较第二个元素
print(f"tuple1 < tuple2: {tuple1 < tuple2}") # False,因为 3 > 1
print(f"tuple2 < tuple3: {tuple2 < tuple3}") # True,因为 1 < 2
# 元组列表排序
tuples = [tuple1, tuple2, tuple3]
sorted_tuples = sorted(tuples)
print(f"排序后的元组: {sorted_tuples}")3. heapq核心函数 #
3.1 添加元素 #
heapq.heappush(heap, item) -
作用:将元素推入堆中,自动调整堆结构,保持堆属性。
使用场景:当你需要向堆中添加新元素时使用。
# 导入 heapq 模块
import heapq
# 创建一个空列表作为堆
heap = []
# 向堆中添加元素
# 注意:heapq 会自动调整堆结构,保持最小堆属性
heapq.heappush(heap, 3)
heapq.heappush(heap, 1)
heapq.heappush(heap, 4)
heapq.heappush(heap, 2)
# 打印堆的内容
# 注意:堆不是完全有序的,但最小元素总是在索引0处
print(f"堆的内容: {heap}")
print(f"最小元素(堆顶): {heap[0]}")3.2 弹出最小元素 #
heapq.heappop(heap)
作用:弹出并返回堆中的最小元素,同时保持堆属性。
使用场景:当你需要获取并删除最小元素时使用。
# 导入 heapq 模块
import heapq
# 创建一个已经包含元素的堆
# 注意:这个列表必须已经是堆结构,否则需要使用 heapify
heap = [1, 2, 4, 3]
# 弹出最小元素
# heappop 会返回最小元素,并从堆中删除它
min_item = heapq.heappop(heap)
# 打印弹出的元素
print(f"弹出的最小元素: {min_item}")
# 打印弹出后的堆
# 注意:堆结构会自动调整,新的最小元素会移到索引0处
print(f"弹出后的堆: {heap}")
print(f"新的最小元素: {heap[0]}")3.3 将列表转换为堆 #
heapq.heapify(x)
作用:将普通列表原地转换为堆结构,时间复杂度 O(n)。
使用场景:当你已经有一个包含数据的列表,想把它转换成堆时使用。
# 导入 heapq 模块
import heapq
# 创建一个普通列表(不是堆结构)
data = [3, 1, 4, 1, 5, 9, 2, 6]
# 打印原始列表
print(f"原始列表: {data}")
# 将列表转换为堆(原地操作,不创建新列表)
heapq.heapify(data)
# 打印转换后的堆
# 注意:堆不是完全有序的,但满足堆属性
print(f"转换后的堆: {data}")
print(f"最小元素: {data[0]}")
# 验证堆属性:可以弹出几个元素看看
print("\n依次弹出最小元素:")
for i in range(3):
min_val = heapq.heappop(data)
print(f" 第 {i+1} 次弹出: {min_val}, 剩余堆: {data}")3.4 添加并弹出 #
heapq.heappushpop(heap, item)
作用:先推入新元素,然后弹出最小元素。比分别调用 heappush 和 heappop 更高效。
使用场景:当你需要添加新元素,同时获取当前最小元素时使用。
# 导入 heapq 模块
import heapq
# 创建一个堆
heap = [2, 3, 4]
# 使用 heappushpop:先添加元素1,然后弹出最小元素
# 因为1是最小的,所以会弹出1
result = heapq.heappushpop(heap, 1)
print(f"添加1后弹出的元素: {result}")
print(f"操作后的堆: {heap}")
# 再次使用 heappushpop:添加元素5,然后弹出最小元素
# 因为5不是最小的,所以会弹出堆中原来的最小元素2
result = heapq.heappushpop(heap, 5)
print(f"\n添加5后弹出的元素: {result}")
print(f"操作后的堆: {heap}")3.5 弹出并添加 #
heapq.heapreplace(heap, item)
作用:先弹出最小元素,然后推入新元素。注意:这个操作要求堆不为空。
使用场景:当你需要替换最小元素时使用。
# 导入 heapq 模块
import heapq
# 创建一个堆
heap = [2, 3, 4]
# 使用 heapreplace:先弹出最小元素2,然后添加元素1
# 注意:这个操作会先弹出,再添加
result = heapq.heapreplace(heap, 1)
print(f"弹出的最小元素: {result}")
print(f"操作后的堆: {heap}")
# 再次使用 heapreplace:弹出最小元素1,添加元素5
result = heapq.heapreplace(heap, 5)
print(f"\n弹出的最小元素: {result}")
print(f"操作后的堆: {heap}")注意:heappushpop 和 heapreplace 的区别:
heappushpop(heap, item):先添加 item,再弹出最小元素heapreplace(heap, item):先弹出最小元素,再添加 item
3.6 找最大的 n 个元素 #
heapq.nlargest(n, iterable[, key])
作用:返回迭代器中前 n 个最大的元素,按从大到小排序。
使用场景:当你需要找到最大的几个元素,但不需要维护整个堆时使用。
# 导入 heapq 模块
import heapq
# 创建一个数据列表
data = [3, 1, 4, 1, 5, 9, 2, 6]
# 找到最大的3个元素
# nlargest 返回一个列表,包含最大的n个元素,按从大到小排序
largest_3 = heapq.nlargest(3, data)
print(f"原始数据: {data}")
print(f"最大的3个元素: {largest_3}")
# 也可以用于其他可迭代对象,比如元组列表
students = [
("张三", 85),
("李四", 92),
("王五", 78),
("赵六", 95),
("钱七", 88)
]
# 找到分数最高的2个学生
# key 参数指定按照元组的第二个元素(分数)进行比较
top_2 = heapq.nlargest(2, students, key=lambda x: x[1])
print(f"\n所有学生: {students}")
print(f"分数最高的2个学生: {top_2}")3.7 找最小的 n 个元素 #
heapq.nsmallest(n, iterable[, key])
作用:返回迭代器中前 n 个最小的元素,按从小到大排序。
使用场景:当你需要找到最小的几个元素,但不需要维护整个堆时使用。
# 导入 heapq 模块
import heapq
# 创建一个数据列表
data = [3, 1, 4, 1, 5, 9, 2, 6]
# 找到最小的3个元素
# nsmallest 返回一个列表,包含最小的n个元素,按从小到大排序
smallest_3 = heapq.nsmallest(3, data)
print(f"原始数据: {data}")
print(f"最小的3个元素: {smallest_3}")
# 也可以用于其他可迭代对象
products = [
("苹果", 5.5),
("香蕉", 3.2),
("橙子", 4.8),
("葡萄", 12.5),
("西瓜", 8.0)
]
# 找到价格最低的2个产品
# key 参数指定按照元组的第二个元素(价格)进行比较
cheapest_2 = heapq.nsmallest(2, products, key=lambda x: x[1])
print(f"\n所有产品: {products}")
print(f"价格最低的2个产品: {cheapest_2}")4.实现 #
4.1 push #
# 定义最小堆类
class MinHeap:
# 初始化函数,创建一个空列表作为堆
def __init__(self):
self.heap = []
# 向堆中插入一个新值
def push(self, value):
self.heap.append(value) # 将新值添加到堆的末尾
self._sift_up(len(self.heap) - 1) # 上浮新添加的元素
# 上浮操作,保持堆的性质
def _sift_up(self, index):
while index > 0: # 只要不是堆顶元素
parent = (index - 1) // 2 # 计算父节点下标
if self.heap[index] < self.heap[parent]: # 如果当前元素小于父节点
self.heap[index], self.heap[parent] = self.heap[parent], self.heap[index] # 交换当前元素和父节点
index = parent # 更新当前下标为父节点下标,继续比较
else:
break # 如果没有违反堆的性质,结束循环
# 返回堆顶元素(最小值),不移除
def peek(self):
if self.heap: # 如果堆不为空
return self.heap[0] # 返回堆顶元素
return None # 如果堆为空,返回None
# 通过索引访问堆内元素
def __getitem__(self, index):
return self.heap[index] # 返回指定下标的元素
# 返回堆的字符串表示
def __repr__(self):
return str(self.heap) # 将堆以字符串形式输出
# 创建一个最小堆对象
heap = MinHeap()
# 向堆中插入元素3
heap.push(3)
# 向堆中插入元素1
heap.push(1)
# 向堆中插入元素4
heap.push(4)
# 向堆中插入元素2
heap.push(2)
# 打印当前堆的内容
print(f"堆的内容: {heap}")
# 打印堆顶的最小元素
print(f"最小元素(堆顶): {heap[0]}")4.2 pop #
# 定义最小堆类
class MinHeap:
# 初始化函数,创建一个空列表作为堆
def __init__(self):
self.heap = []
# 向堆中插入一个新值
def push(self, value):
self.heap.append(value) # 将新值添加到堆的末尾
self._sift_up(len(self.heap) - 1) # 上浮新添加的元素
# 上浮操作,保持堆的性质
def _sift_up(self, index):
while index > 0: # 只要不是堆顶元素
parent = (index - 1) // 2 # 计算父节点下标
if self.heap[index] < self.heap[parent]: # 如果当前元素小于父节点
self.heap[index], self.heap[parent] = self.heap[parent], self.heap[index] # 交换当前元素和父节点
index = parent # 更新当前下标为父节点下标,继续比较
else:
break # 如果没有违反堆的性质,结束循环
# 返回堆顶元素(最小值),不移除
def peek(self):
if self.heap: # 如果堆不为空
return self.heap[0] # 返回堆顶元素
return None # 如果堆为空,返回None
+ # 弹出堆顶元素(最小值)
+ def pop(self):
+ if not self.heap:
+ return None
+ # 将堆顶与最后一个元素交换,并弹出最后一个元素
+ min_value = self.heap[0]
+ last_value = self.heap.pop()
+ if self.heap:
+ self.heap[0] = last_value
+ self._sift_down(0) # 向下调整
+ return min_value
+
+ # 下沉操作,保持堆的性质
+ def _sift_down(self, index):
+ size = len(self.heap)
+ while True:
+ left = 2 * index + 1
+ right = 2 * index + 2
+ smallest = index
+ if left < size and self.heap[left] < self.heap[smallest]:
+ smallest = left
+ if right < size and self.heap[right] < self.heap[smallest]:
+ smallest = right
+ if smallest == index:
+ break
+ self.heap[index], self.heap[smallest] = self.heap[smallest], self.heap[index]
+ index = smallest
+
# 通过索引访问堆内元素
def __getitem__(self, index):
return self.heap[index] # 返回指定下标的元素
# 返回堆的字符串表示
def __repr__(self):
return str(self.heap) # 将堆以字符串形式输出
# 创建一个最小堆对象
heap = MinHeap()
# 向堆中插入元素3
heap.push(3)
# 向堆中插入元素1
heap.push(1)
# 向堆中插入元素4
heap.push(4)
# 向堆中插入元素2
heap.push(2)
# 打印当前堆的内容
print(f"堆的内容: {heap}")
# 打印堆顶的最小元素
print(f"最小元素(堆顶): {heap[0]}")
+# 演示 heappop,弹出最小元素
+min_value = heap.pop()
+print(f"弹出最小元素: {min_value}")
+print(f"弹出后堆的内容: {heap}")5. 实际应用示例 #
5.1 优先任务调度 #
在实际应用中,我们经常需要按照优先级处理任务。堆非常适合这种场景。
# 导入 heapq 模块
import heapq
# 定义一个优先队列类
class PriorityQueue:
def __init__(self):
# 使用列表作为堆的底层存储
self._queue = []
# 用于处理优先级相同的情况,确保先添加的任务先处理
self._index = 0
def push(self, item, priority):
# 将任务添加到堆中
# 使用元组 (priority, index, item) 作为堆元素
# priority: 优先级(数字越小优先级越高)
# index: 索引,用于处理相同优先级的情况
# item: 实际的任务内容
heapq.heappush(self._queue, (priority, self._index, item))
# 每次添加后索引加1
self._index += 1
def pop(self):
# 弹出优先级最高的任务(优先级数字最小)
# 返回元组的最后一个元素,即实际的任务内容
return heapq.heappop(self._queue)[-1]
def is_empty(self):
# 检查队列是否为空
return len(self._queue) == 0
def size(self):
# 返回队列中任务的数量
return len(self._queue)
# 主程序:演示优先任务调度
if __name__ == "__main__":
# 创建一个优先队列
pq = PriorityQueue()
# 添加任务,数字越小优先级越高
pq.push('处理紧急邮件', 1) # 优先级最高
pq.push('写报告', 3) # 优先级较低
pq.push('参加会议', 2) # 优先级中等
pq.push('整理文档', 4) # 优先级最低
print("按优先级处理任务:")
# 依次弹出并处理任务
while not pq.is_empty():
task = pq.pop()
print(f" 正在处理: {task}")5.2 合并多个有序序列 #
假设你有多个已经排序的列表,需要将它们合并成一个有序列表。堆可以高效地完成这个任务。
# 导入 heapq 模块
import heapq
# 定义函数,合并多个有序数组
def merge_sorted_arrays(arrays):
# 创建一个堆,用于存储每个数组的当前元素
heap = []
# 遍历每个数组,将每个数组的第一个元素推入堆中
# 堆中存储 (元素值, 数组索引, 元素在数组中的索引)
for i, arr in enumerate(arrays):
# 如果数组不为空
if arr:
# 推入 (元素值, 数组索引, 元素索引)
heapq.heappush(heap, (arr[0], i, 0))
# 存储合并后的结果
result = []
# 当堆不为空时,继续处理
while heap:
# 弹出堆中的最小元素
val, arr_idx, elem_idx = heapq.heappop(heap)
# 将元素添加到结果中
result.append(val)
# 如果当前数组还有下一个元素
if elem_idx + 1 < len(arrays[arr_idx]):
# 获取下一个元素的值
next_val = arrays[arr_idx][elem_idx + 1]
# 将下一个元素推入堆中
heapq.heappush(heap, (next_val, arr_idx, elem_idx + 1))
# 返回合并后的有序列表
return result
# 主程序:演示合并有序数组
if __name__ == "__main__":
# 定义三个已经排序的数组
arrays = [[1, 3, 5], [2, 4, 6], [0, 7, 8]]
print("要合并的数组:")
for i, arr in enumerate(arrays):
print(f" 数组 {i+1}: {arr}")
# 合并数组
merged = merge_sorted_arrays(arrays)
print(f"\n合并后的结果: {merged}")5.3 Top-K 问题 #
Top-K 问题是指从大量数据中找出最大(或最小)的 K 个元素。堆非常适合解决这类问题。
# 导入 heapq 模块
import heapq
import random
# 生成大量随机数据(模拟实际场景)
# 假设有10000个学生的成绩
num_students = 10000
scores = [random.randint(0, 100) for _ in range(num_students)]
# 方法1:使用 nlargest(简单但可能不是最高效)
print("方法1:使用 nlargest")
top_10_scores = heapq.nlargest(10, scores)
print(f"前10名成绩: {top_10_scores}")
# 方法2:使用堆维护 Top-K(更高效,适合数据流场景)
print("\n方法2:使用堆维护 Top-K")
def find_top_k(data, k):
# 创建一个最小堆,大小为 k
# 只保留最大的 k 个元素
heap = []
# 遍历所有数据
for score in data:
# 如果堆的大小小于 k,直接添加
if len(heap) < k:
heapq.heappush(heap, score)
# 如果堆已满,且当前元素大于堆顶(最小元素)
elif score > heap[0]:
# 替换堆顶元素
heapq.heapreplace(heap, score)
# 将堆转换为列表并排序(从大到小)
return sorted(heap, reverse=True)
top_10_scores_v2 = find_top_k(scores, 10)
print(f"前10名成绩: {top_10_scores_v2}")
# 验证两种方法的结果是否一致
print(f"\n两种方法结果一致: {sorted(top_10_scores) == sorted(top_10_scores_v2)}")5.4 实现最大堆(技巧) #
heapq 只支持最小堆,但我们可以通过一个小技巧来实现最大堆:存储负值。
# 导入 heapq 模块
import heapq
# 创建一个"最大堆"(实际上是最小堆存储负值)
# 技巧:存储负值,这样最小堆就变成了"最大堆"
max_heap = []
# 添加元素时存储负值
heapq.heappush(max_heap, -10)
heapq.heappush(max_heap, -5)
heapq.heappush(max_heap, -20)
heapq.heappush(max_heap, -15)
print("最大堆(存储负值)的内容:", max_heap)
# 获取最大元素:弹出并取负值
max_item = -heapq.heappop(max_heap)
print(f"最大元素: {max_item}")
# 继续弹出
max_item = -heapq.heappop(max_heap)
print(f"次大元素: {max_item}")
# 更实用的例子:找到最大的 K 个元素
print("\n找到最大的3个元素:")
data = [10, 5, 20, 15, 8, 25, 12]
max_heap = []
# 将所有元素取负值后加入堆
for item in data:
heapq.heappush(max_heap, -item)
# 弹出最大的3个元素
largest_3 = []
for _ in range(3):
largest_3.append(-heapq.heappop(max_heap))
print(f"原始数据: {data}")
print(f"最大的3个元素: {largest_3}")5.5 事件调度器 #
模拟一个简单的事件调度器,按照时间顺序处理事件。
# 导入 heapq 模块
import heapq
import time
# 定义一个事件调度器类
class EventScheduler:
def __init__(self):
# 使用堆存储事件,每个事件是 (时间戳, 事件ID, 事件描述)
self.events = []
self.current_time = 0
def add_event(self, delay, event_id, description):
# 添加事件,delay 是延迟时间(秒)
event_time = self.current_time + delay
heapq.heappush(self.events, (event_time, event_id, description))
print(f"添加事件: {description} (将在 {delay} 秒后执行)")
def process_events(self, until_time):
# 处理所有在 until_time 之前的事件
print(f"\n开始处理事件(当前时间: {self.current_time}, 处理到: {until_time}):")
while self.events and self.events[0][0] <= until_time:
# 弹出最早的事件
event_time, event_id, description = heapq.heappop(self.events)
self.current_time = event_time
print(f" [时间 {self.current_time}] 执行事件 {event_id}: {description}")
# 主程序:演示事件调度
if __name__ == "__main__":
# 创建事件调度器
scheduler = EventScheduler()
# 添加一些事件
scheduler.add_event(5, 1, "发送邮件")
scheduler.add_event(2, 2, "备份数据")
scheduler.add_event(8, 3, "生成报告")
scheduler.add_event(1, 4, "检查系统")
# 处理事件(处理到时间10)
scheduler.process_events(10)6. 常见问题和注意事项 #
6.1 堆属性维护 #
重要:直接修改堆中的元素会破坏堆属性!如果需要修改,应该先删除再添加,或者重新构建堆。
# 导入 heapq 模块
import heapq
# 创建一个堆
heap = [1, 2, 3, 4, 5]
heapq.heapify(heap)
print(f"原始堆: {heap}")
# ❌ 错误做法:直接修改堆中的元素
# heap[0] = 10 # 这会破坏堆属性!
# ✅ 正确做法1:删除后重新添加
min_item = heapq.heappop(heap)
heapq.heappush(heap, 10)
print(f"修改后的堆: {heap}")
# ✅ 正确做法2:使用 heapreplace
heap = [1, 2, 3, 4, 5]
heapq.heapify(heap)
heapq.heapreplace(heap, 10)
print(f"使用 heapreplace 后的堆: {heap}")6.2 相同优先级的处理 #
当多个元素具有相同的优先级时,heapq 会按照插入顺序处理(先插入的先处理)。如果需要自定义排序规则,可以使用元组。
# 导入 heapq 模块
import heapq
# 创建堆,存储 (优先级, 插入顺序, 任务)
heap = []
# 添加任务,有些任务优先级相同
heapq.heappush(heap, (1, 0, "任务A"))
heapq.heappush(heap, (1, 1, "任务B")) # 与任务A优先级相同
heapq.heappush(heap, (2, 2, "任务C"))
heapq.heappush(heap, (1, 3, "任务D")) # 与任务A、B优先级相同
print("按优先级处理任务(相同优先级按插入顺序):")
while heap:
priority, order, task = heapq.heappop(heap)
print(f" 优先级 {priority}: {task} (插入顺序: {order})")6.3 空堆处理 #
对空堆执行 heappop 或 heapreplace 会抛出 IndexError,需要先检查堆是否为空。
# 导入 heapq 模块
import heapq
# 创建一个空堆
heap = []
# ❌ 错误:对空堆执行 heappop 会报错
# min_item = heapq.heappop(heap) # IndexError: index out of range
# ✅ 正确:先检查堆是否为空
if heap:
min_item = heapq.heappop(heap)
print(f"最小元素: {min_item}")
else:
print("堆为空,无法弹出元素")
# 对于 heapreplace 也是同样的道理
if heap:
result = heapq.heapreplace(heap, 10)
else:
print("堆为空,无法使用 heapreplace,应该使用 heappush")
heapq.heappush(heap, 10)6.4 性能考虑 #
nlargest/nsmallestvs 排序:如果只需要找 Top-K 个元素,使用nlargest/nsmallest比完整排序更高效- 堆 vs 列表:如果需要频繁查找最小/最大元素,堆比列表更高效
- 内存效率:堆是原地操作,不需要额外的空间
# 导入 heapq 模块和 time 模块用于性能测试
import heapq
import time
import random
# 生成大量数据
data = [random.randint(1, 1000000) for _ in range(100000)]
# 测试1:使用 nlargest 找最大的10个元素
start_time = time.time()
top_10_v1 = heapq.nlargest(10, data)
time_v1 = time.time() - start_time
print(f"nlargest 方法耗时: {time_v1:.4f} 秒")
# 测试2:使用完整排序找最大的10个元素
start_time = time.time()
top_10_v2 = sorted(data, reverse=True)[:10]
time_v2 = time.time() - start_time
print(f"完整排序方法耗时: {time_v2:.4f} 秒")
print(f"\n性能提升: {time_v2/time_v1:.2f} 倍")
print(f"结果一致: {sorted(top_10_v1) == sorted(top_10_v2)}")7. 时间复杂度总结 #
了解各个操作的时间复杂度有助于选择合适的算法。
| 操作 | 时间复杂度 | 说明 |
|---|---|---|
heappush |
O(log n) | 插入元素到堆中 |
heappop |
O(log n) | 弹出最小元素 |
heapify |
O(n) | 将列表转换为堆 |
| 查看最小元素 | O(1) | 直接访问 heap[0] |
nlargest(k, n) |
O(n log k) | 找最大的 k 个元素,k 通常远小于 n |
nsmallest(k, n) |
O(n log k) | 找最小的 k 个元素 |
说明:
- n 是堆中元素的数量
- k 是要找的元素数量(在
nlargest/nsmallest中) - 对于 Top-K 问题,当 k 远小于 n 时,
nlargest/nsmallest比完整排序更高效
8. 总结 #
heapq 是 Python 中处理优先队列和 Top-K 问题的强大工具。通过本教程,你应该已经掌握了:
- 堆的基本概念:最小堆的特点和优势
- 核心函数:
heappush、heappop、heapify等7个主要函数 - 实际应用:优先任务调度、合并有序序列、Top-K 问题等
- 实用技巧:如何用最小堆实现最大堆
- 注意事项:堆属性维护、空堆处理等
使用建议:
- 需要频繁查找最小/最大元素时,使用堆
- Top-K 问题优先考虑
nlargest/nsmallest - 注意维护堆属性,不要直接修改堆中的元素
- 根据实际需求选择合适的距离度量方式