1. 什么是 HNSW? #
HNSW 的全称是 Hierarchical Navigable Small World,翻译过来就是 "可导航小世界分层图"。它是一种用于快速查找相似向量的算法,广泛应用于推荐系统、搜索引擎、图像检索等领域。
1.1 它要解决什么问题? #
想象一下,你有一个包含100万张图片的数据库,每张图片都可以用一个512维的向量来表示其特征。现在给你一张新图片,如何快速从这100万张图片中找到最相似的几张?
这就是高维向量相似度搜索问题。HNSW 就是为了解决这个问题而生的。
1.2 为什么传统方法不够好? #
- 线性扫描太慢:如果要精确查找,需要计算新向量与所有100万个向量的距离,这可能需要几秒钟甚至更长时间。
1.3 HNSW 的优势 #
HNSW 采用近似最近邻搜索的策略,用微小的精度损失换取巨大的速度提升:
- 速度快:可以在毫秒级时间内从百万级数据中找到相似结果
- 精度高:虽然不保证找到绝对最近邻,但通常能找到非常接近的结果
- 易用性好:有成熟的 Python 库(如
hnswlib)可以直接使用
2. 前置知识 #
在学习 HNSW 之前,我们需要了解一些基础概念。
2.1 什么是向量? #
向量就是一组有序的数字,比如 [1.2, 3.4, 5.6] 就是一个3维向量。在 AI 领域,文本、图片、音频等都可以转换成向量来表示。
# 导入 numpy 库用于处理向量
import numpy as np
# 创建一个3维向量,表示某个对象的特征
vector = np.array([1.2, 3.4, 5.6])
# 打印向量的维度
print(f"向量维度: {vector.shape}")
# 打印向量内容
print(f"向量内容: {vector}")2.2 什么是向量相似度? #
两个向量越相似,它们的距离就越小。常用的距离计算方法有:
欧氏距离(L2距离):计算两个向量对应位置差的平方和的平方根。就像在坐标系中计算两点之间的直线距离。
# 导入 numpy 库用于数学计算
import numpy as np
# 定义函数,计算两个向量的欧氏距离
def euclidean_distance(vec1, vec2):
# 计算两个向量对应位置差的平方
diff = vec1 - vec2
# 计算平方和
squared_diff = np.sum(diff ** 2)
# 返回平方根,即欧氏距离
return np.sqrt(squared_diff)
# 定义两个示例向量
vector_a = np.array([1.0, 2.0, 3.0])
vector_b = np.array([4.0, 5.0, 6.0])
# 计算并打印两个向量之间的欧氏距离
distance = euclidean_distance(vector_a, vector_b)
print(f"向量 A: {vector_a}")
print(f"向量 B: {vector_b}")
print(f"欧氏距离: {distance:.2f}")余弦相似度:计算两个向量夹角的余弦值,范围在 -1 到 1 之间。值越接近 1,表示两个向量方向越相似。 余弦相似度
# 导入 numpy 库用于数学计算
import numpy as np
# 定义函数,计算两个向量的余弦相似度
def cosine_similarity(vec1, vec2):
# 计算两个向量的点积(内积)
dot_product = np.dot(vec1, vec2)
# 计算第一个向量的模长(长度)
norm1 = np.linalg.norm(vec1)
# 计算第二个向量的模长(长度)
norm2 = np.linalg.norm(vec2)
# 如果模长为0,返回0(避免除以0)
if norm1 == 0 or norm2 == 0:
return 0
# 返回点积除以两个模长的乘积,即余弦相似度
return dot_product / (norm1 * norm2)
# 定义两个示例向量
vector_a = np.array([1.0, 2.0, 3.0])
vector_b = np.array([2.0, 4.0, 6.0]) # 与 vector_a 方向相同,只是长度不同
# 计算并打印两个向量的余弦相似度
similarity = cosine_similarity(vector_a, vector_b)
print(f"向量 A: {vector_a}")
print(f"向量 B: {vector_b}")
print(f"余弦相似度: {similarity:.2f}")2.3 什么是近似最近邻搜索? #
精确最近邻搜索:找到距离查询向量最近的那个向量(需要遍历所有向量,速度慢)。
近似最近邻搜索(ANN):找到距离查询向量"比较近"的向量(不需要遍历所有向量,速度快,但可能不是绝对最近的那个)。
HNSW 就是一种近似最近邻搜索算法。
3.安装和环境准备 #
在使用 HNSW 之前,我们需要安装必要的库。
3.1 安装 hnswlib #
hnswlib 是一个高效的 HNSW 实现库,支持 Python。
Windows 系统:
# 使用 pip 安装 hnswlib 库
pip install hnswlibmacOS 系统:
# 使用 pip3 安装 hnswlib 库
pip3 install hnswlib如果安装失败,可以尝试使用国内镜像源:
Windows 系统:
# 使用清华镜像源安装 hnswlib
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple hnswlibmacOS 系统:
# 使用清华镜像源安装 hnswlib
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple hnswlib3.2 验证安装 #
安装完成后,我们可以写一个简单的测试程序来验证安装是否成功:
# 导入 hnswlib 库
import hnswlib
# 验证安装是否成功
print("hnswlib 安装成功!")4.核心思想:像查地图一样查数据 #
HNSW 的核心思想非常直观,就像我们在地图上找地点一样。
4.1 生活中的类比 #
想象一下你要在一个复杂的世界地图上找一个特定的地点,你会怎么做?
- 先看全局地图:你首先会找到世界地图或国家地图,确定大方向(比如在哪个国家)。
- 再看区域地图:然后你找到省份或城市的地图,范围越来越小。
- 最后看详细地图:最后你查看街道地图,找到具体的目标地点。
HNSW 正是模拟了这个过程!
4.2 HNSW 的分层结构 #
HNSW 构建了一个多层图结构,就像不同比例尺的地图:
- 最高层(顶层):只包含极少数的数据点,就像世界地图只显示主要城市。这一层用于快速定位大致方向。
- 中间层:包含更多数据点,就像省份地图显示更多城市。用于逐步缩小搜索范围。
- 第0层(底层):包含所有数据点,就像详细的街道地图。用于精确查找目标。
关键优势:通过高层快速跳转,避免了在底层进行全局的、漫长的搜索,大大提高了查找速度。
5.HNSW基础使用 #
这个例子展示了如何创建一个 HNSW 索引,插入一些向量数据,然后进行搜索。
# 导入 hnswlib 库,用于创建 HNSW 索引
import hnswlib
# 导入 numpy 库,用于生成和处理向量数据
import numpy as np
# 设置随机种子,确保每次运行结果一致
np.random.seed(42)
# 定义向量维度为 128(可以根据实际需求修改)
dim = 128
# 定义要插入的向量数量
num_elements = 1000
# 创建一个 HNSW 索引对象
# 'l2' 表示使用欧氏距离(L2距离)作为相似度度量
p = hnswlib.Index(space='l2', dim=dim)
# 初始化索引
# max_elements 表示索引最多可以存储的向量数量
# ef_construction 是构建时的参数,影响索引质量(稍后详细说明)
# M 是每个节点的最大连接数,影响索引的稠密程度(稍后详细说明)
p.init_index(max_elements=num_elements, ef_construction=200, M=16)
# 生成随机向量数据作为示例
# 生成 1000 个 128 维的随机向量,每个元素的值在 0 到 1 之间
data = np.random.random((num_elements, dim)).astype(np.float32)
# 将向量数据添加到索引中
# 这里我们使用向量的索引作为标签(0, 1, 2, ..., 999)
p.add_items(data, np.arange(num_elements))
# 设置搜索参数
# ef_search 是搜索时的候选集大小,影响搜索精度和速度
p.set_ef(50)
# 定义查询向量(也是随机生成的,实际应用中这是你要查找的目标)
query = np.random.random((dim,)).astype(np.float32)
# 执行搜索,查找最相似的 5 个向量
# k 表示要返回的最相似向量的数量
labels, distances = p.knn_query(query, k=5)
# 打印搜索结果
print(f"查询向量维度: {query.shape}")
print(f"\n找到最相似的 5 个向量:")
# 遍历搜索结果
for i, (label, distance) in enumerate(zip(labels[0], distances[0])):
# 打印每个结果的标签(向量在索引中的位置)和距离
print(f" 第 {i+1} 名: 标签={label}, 距离={distance:.4f}")6. HNSW 的工作原理 #
虽然 HNSW 的内部实现很复杂,但我们可以用简单的概念来理解它。
6.1 小世界网络 #
HNSW 基于"小世界网络"的概念。想象一个社交网络:
- 短程连接:你的朋友(直接连接,保证你能找到附近的人)
- 长程连接:一些"超级连接者"(可以快速跳到遥远的区域)
在 HNSW 中:
- 每个数据点(向量)都有一些"邻居"连接
- 这些连接包括附近的点(短程)和一些较远的点(长程)
- 搜索时,从一个点开始,沿着连接不断"走"向更接近目标的位置
6.2 分层结构 #
HNSW 的关键创新是分层:
- 顶层:数据点很少,但连接很"长",可以快速跨越很大距离
- 底层:包含所有数据点,连接很"短",用于精确查找
搜索过程:
- 从顶层开始,快速定位大致方向
- 逐层下降,每层都进行局部搜索
- 到底层时,进行最后的精确搜索
这就像从世界地图 → 国家地图 → 城市地图 → 街道地图的查找过程。
6.3 构建过程 #
当插入一个新向量时:
- 随机分配层数:新向量会被随机分配到某一层(高层概率小,底层概率大)
- 找到入口点:从顶层开始,逐层找到距离新向量最近的节点
- 建立连接:在新向量所在的层及以下各层,找到一些邻居并建立连接
7. 实现 #
# 导入heapq库,用于优先队列(堆实现)
import heapq
# 导入numpy库,用于数值计算
import numpy as np
# 定义HNSW类
class HNSW:
# 文档字符串,描述类用途:核心增量建图 + 贪心/最佳优先搜索
"""核心增量建图 + 贪心/最佳优先搜索。"""
# 初始化方法,指定空间类型和向量维度
def __init__(self, space: str, dim: int):
# 检查空间类型,只支持L2距离度量空间,如果不是则抛出异常
if space != 'l2':
raise ValueError('示例仅支持 L2 距离')
# 保存向量维度
self.dim = dim
# 最大元素数量,默认0
self.max_elements = 0
# 节点的最大邻居数
self.M = 0
# 搜索宽度,默认10
self.explorationFactor = 10 # 默认搜索宽度
# 用字典存储所有标签到向量的映射
self.vectors = {} # label -> 向量
# 用字典存储标签到邻居列表的图结构
self.graph = {} # label -> 邻居列表
# 图的入口点,默认为None
self.entry_point = None
# 初始化索引,设置最大元素数量和最大邻居数
def init_index(self, max_elements: int, M: int):
# 设置最大节点数
self.max_elements = max_elements
# 设置每个节点的最大邻居数
self.M = M
# 添加新向量和标签到图结构中
def add_items(self, data, labels):
# 遍历所有输入的向量和对应标签
for vector, label in zip(data, labels):
# 如果已达到容量上限,抛出异常
if len(self.vectors) >= self.max_elements:
raise ValueError('已达到索引容量上限')
# 初始化该节点的邻居列表(如果尚不存在)
self.graph.setdefault(label, [])
# 如果entry_point为空,设置为当前节点,并存储向量
if self.entry_point is None:
self.entry_point = label
# 保存当前向量
self.vectors[label] = vector
# 跳过剩余流程
continue
# 选择当前节点距离最近的M个邻居(还未加入vectors)
neighbors = self._select_neighbors(vector)
# 保存该label下的向量数据
self.vectors[label] = vector
# 给当前节点和其邻居建立双向连接,且去重
for nb in neighbors:
# 跳过自己
if nb == label:
continue
# 如果nb还不在当前节点的邻居表,添加
if nb not in self.graph[label]:
self.graph[label].append(nb)
# 如果当前节点还不在nb的邻居表,添加
if label not in self.graph.get(nb, []):
self.graph.setdefault(nb, []).append(label)
# 设置搜索宽度ef参数
def set_explorationFactor(self, explorationFactor: int):
# 确保搜索宽度至少为1
self.explorationFactor = max(1, explorationFactor)
# 查询与查询向量最相似的k个向量
def knn_query(self, query, k: int):
# 如果不存在入口节点,抛出异常
if self.entry_point is None:
raise ValueError('索引为空')
# 记录已访问过的节点
visited = set()
# 定义待探索队列(小顶堆),初始为空
frontier = []
# 计算查询向量与入口点的距离
entry_dist = self._l2(query, self.vectors[self.entry_point])
# 将入口点和对应距离推入待探索队列
heapq.heappush(frontier, (entry_dist, self.entry_point))
# 初始化最大堆用于保留最优候选
best = [] # 最大堆,存放当前最优候选
# 只要待探索队列非空,持续搜索
while frontier:
# 弹出距离最小的点
dist, node = heapq.heappop(frontier)
# 如果已经访问过该节点则跳过
if node in visited:
continue
# 标记当前节点为已访问
visited.add(node)
# 若best候选数量不满explorationFactor或当前距离更优,则加入best堆
if len(best) < self.explorationFactor or dist < -best[0][0]:
heapq.heappush(best, (-dist, node))
# 如果堆超出了explorationFactor大小,则弹出堆顶(最差的那个)
if len(best) > self.explorationFactor:
heapq.heappop(best)
# 遍历该节点的所有邻居,若未访问则计算距离并推入队列
for nb in self.graph.get(node, []):
if nb not in visited:
nd = self._l2(query, self.vectors[nb])
heapq.heappush(frontier, (nd, nb))
# 按距离由小到大排序最优候选,只取前k个
best_pairs = sorted(best, key=lambda x: -x[0])[:k]
# 构造返回的标签数组(labels)
labels = np.array([[label for _, label in best_pairs]])
# 构造返回的距离数组(distances)
distances = np.array([[ -score for score, _ in best_pairs ]])
# 返回最相似的labels和对应距离
return labels, distances
# 内部方法,选择距离vector最近的M个邻居(基于L2距离)
def _select_neighbors(self, vector):
# 计算vector与已存在每个节点的距离,组装为(距离, label)对
dists = [
(self._l2(vector, other), label)
for label, other in self.vectors.items()
]
# 按距离升序排序
dists.sort(key=lambda x: x[0])
# 返回最近的M个label
return [label for _, label in dists[: self.M]]
# 静态方法,计算两个向量的L2距离
@staticmethod
def _l2(a, b):
# 用numpy计算欧氏距离并返回float类型
return float(np.linalg.norm(a - b))
# 设置向量维度为3
dim = 3
# 设置总向量个数为10
num_elements = 10
# 创建HNSW实例,空间类型为l2,维度为3
index = HNSW(space='l2', dim=dim)
# 初始化索引,最多10个元素,每个节点最多3个邻居
index.init_index(max_elements=num_elements, M=3)
# 构造10条均匀分布于空间的三维向量,类型为int32
data = np.array(
[
[0, 0, 0],
[0, 10, 10],
[10, 0, 10],
[10, 10, 0],
[5, 0, 5],
[0, 5, 5],
[5, 5, 0],
[10, 5, 5],
[5, 10, 5],
[5, 5, 10],
],
dtype=np.int32,
)
# 生成标签,范围1~10
labels = np.arange(1, num_elements + 1)
# 批量将向量数据插入到索引
index.add_items(data, labels)
# 设置搜索宽度explorationFactor为10
index.set_explorationFactor(5)
# 构造一个三维查询向量,位于空间中心,更适合均匀分布 [5,5,5]
query = np.array([5, 5, 5], dtype=np.int32)
# 查询与该向量最接近的2个向量
labels, distances = index.knn_query(query, k=2)
# 打印查询向量
print(f'查询向量: {query}')
# 打印查询结果,逐个输出最相似的2个向量
print('\n找到最相似的 2 个向量:')
for rank, (label, distance) in enumerate(zip(labels[0], distances[0]), 1):
print(f' 第 {rank} 名: 标签={label}, 向量值={data[label-1]}, 距离={distance:.4f}')8.分层 #
# 导入heapq库,用于优先队列(堆实现)
import heapq
# 导入numpy库,用于数值计算
import numpy as np
+# 导入random库,用于生成随机层级
+import random
+# 导入math库,用于数学相关操作
+import math
+"""
+HNSW分层实现说明:
+
+1. 多层图结构:
+ - graphs[layer] = {label -> 邻居列表},每层一个独立的图
+ - 底层(layer 0)包含所有节点
+ - 上层节点逐渐减少,形成"金字塔"结构
+
+2. 节点层级分配:
+ - 使用指数衰减分布随机决定节点出现在哪些层
+ - 每个节点至少出现在layer 0
+ - 层级越高,节点越少,搜索路径越短
+
+3. 分层搜索:
+ - 从最高层开始搜索,快速定位大致区域
+ - 逐层向下细化搜索
+ - 在底层(layer 0)获取最终结果
+
+4. 分层插入:
+ - 从顶层到底层逐层插入
+ - 每层独立选择邻居并建立连接
+ - 上层连接用于快速导航,下层连接保证精度
+"""
+
+# 定义HNSW类(分层实现)
class HNSW:
+ # 文档字符串,描述类用途
+ """分层导航小世界图(Hierarchical Navigable Small World)"""
+ # 初始化函数,设置空间类型和维度
def __init__(self, space: str, dim: int):
+ # 检查空间类型,只支持l2距离
if space != 'l2':
raise ValueError('示例仅支持 L2 距离')
# 保存向量维度
self.dim = dim
+ # 最大元素数量,默认为0
self.max_elements = 0
+ # 节点最大邻居数
self.M = 0
+ # 搜索宽度参数,默认10
+ self.explorationFactor = 10
+ # 所有标签到向量的映射
self.vectors = {} # label -> 向量
+ # 多层图结构:layer -> {label->邻居列表}
+ self.graphs = {} # layer -> {label -> 邻居}
+ # 每个节点出现在哪些层的记录
+ self.node_layers = {} # label -> [层]
+ # 当前最高层编号
+ self.max_layer = 0
+ # 各层的入口点
+ self.entry_points = {} # layer -> entry_point
+ # 索引初始化,设置最大容量和最大邻居数
def init_index(self, max_elements: int, M: int):
# 设置最大节点数
self.max_elements = max_elements
+ # 设置每个节点最大邻居数
self.M = M
+ # 初始化第0层图结构
+ self.graphs[0] = {}
+ # 当前最高层为0
+ self.max_layer = 0
+ # 批量添加数据与标签
def add_items(self, data, labels):
+ # 遍历所有数据与标签
for vector, label in zip(data, labels):
+ # 检查是否超出最大容量
if len(self.vectors) >= self.max_elements:
raise ValueError('已达到索引容量上限')
+
+ # 保存向量数据
self.vectors[label] = vector
+
+ # 如果是第一个节点,直接作为入口点
+ if len(self.vectors) == 1:
+ # 随机计算节点出现的最高层数
+ node_level = self._random_level()
+ self.node_layers[label] = list(range(node_level + 1))
+ # 每一层都初始化邻居列表
+ for layer in self.node_layers[label]:
+ if layer not in self.graphs:
+ self.graphs[layer] = {}
+ self.graphs[layer][label] = []
+ # 每一层设置入口点
+ for layer in self.node_layers[label]:
+ self.entry_points[layer] = label
+ # 更新最高层数
+ self.max_layer = max(self.max_layer, node_level)
+ continue
+
+ # 非首节点,随机决定出现层级
+ node_level = self._random_level()
+ self.node_layers[label] = list(range(node_level + 1))
+
+ # 如果新节点的层级更高,更新最高层数
+ if node_level > self.max_layer:
+ self.max_layer = node_level
+
+ # 从顶层到底层逐层插入
+ current_node = None
+ # 逐层构建连接(高层到低层)
+ for layer in range(node_level, -1, -1):
+ # 如果本层图结构不存在,则新建
+ if layer not in self.graphs:
+ self.graphs[layer] = {}
+ # 初始化自己的邻居列表
+ if label not in self.graphs[layer]:
+ self.graphs[layer][label] = []
+ # 顶层:入口点
+ if layer == node_level:
+ entry = self.entry_points.get(layer, None)
+ if entry is None:
+ # 本层无入口点,则该节点成为入口
+ self.entry_points[layer] = label
+ current_node = label
+ else:
+ # 本层有入口点,从入口点开始层内搜索
+ candidates = self._search_layer(entry, vector, layer, ef=1)
+ if candidates:
+ # 最近节点作为起始点
+ current_node = candidates[0][0]
+ else:
+ current_node = entry
+ else:
+ # 低层:用上一层结果做起点
+ pass
+
+ # 在本层搜索候选邻居
+ candidates = self._search_layer(current_node, vector, layer, ef=self.explorationFactor)
+ # 选择本层最近的M个邻居
+ neighbors = self._select_neighbors_in_layer(vector, candidates, layer, M=self.M)
+
+ # 建立与这些邻居的双向连接
+ for nb in neighbors:
+ # 跳过自己
+ if nb == label:
+ continue
+ # 当前节点连向邻居
+ if nb not in self.graphs[layer][label]:
+ self.graphs[layer][label].append(nb)
+ # 邻居连回当前节点
+ if label not in self.graphs[layer].get(nb, []):
+ if nb not in self.graphs[layer]:
+ self.graphs[layer][nb] = []
+ self.graphs[layer][nb].append(label)
+ # 更新下层入口点
+ if candidates:
+ current_node = candidates[0]
+
+ # 设置搜索宽度参数
def set_explorationFactor(self, explorationFactor: int):
+ # 搜索宽度不能小于1
self.explorationFactor = max(1, explorationFactor)
+ # 分层搜索K近邻
def knn_query(self, query, k: int):
+ # 如果没有入口点报错
+ if not self.entry_points:
raise ValueError('索引为空')
+
+ # 从最高层向下分层查找
+ current_node = None
+ for layer in range(self.max_layer, -1, -1):
+ # 跳过没有入口点的层
+ if layer not in self.entry_points:
+ continue
+ # 最高层从入口点出发
+ if layer == self.max_layer:
+ entry = self.entry_points[layer]
+ # 在本层查找最近点
+ candidates = self._search_layer(entry, query, layer, ef=self.explorationFactor)
+ if candidates:
+ current_node = candidates[0][0]
+ else:
+ current_node = entry
+ else:
+ # 低层:用上一层结果做起点
+ candidates = self._search_layer(current_node, query, layer, ef=self.explorationFactor)
+ if candidates:
+ current_node = candidates[0][0]
+
+ # 最底层做最终K近邻搜索
+ if current_node is None:
+ current_node = self.entry_points.get(0, None)
+ if current_node is None:
+ raise ValueError('索引为空')
+
+ # 以更大的ef查找更多候选
+ candidates = self._search_layer(current_node, query, 0, ef=max(self.explorationFactor, k))
+ # 按距离排序,仅保留前K个
+ candidates.sort(key=lambda x: x[1])
+ top_k = candidates[:k]
+ # 返回标签与距离的ndarray
+ labels = np.array([[label for label, _ in top_k]])
+ distances = np.array([[dist for _, dist in top_k]])
+ return labels, distances
+
+ # 决定节点最高层级(指数衰减,越高概率越低)
+ def _random_level(self):
+ """
+ 使用指数衰减分布决定节点层级。层越高概率越小。
+ """
+ level = 0
+ # 每次0.5概率进入更高一层,最大到16层
+ while random.random() < 0.5 and level < 16:
+ level += 1
+ return level
+
+ # 在指定层内以贪心(近邻扩展)找ef个最近点
+ def _search_layer(self, entry_node, query, layer, ef):
+ """
+ 指定入口节点,从该层用贪心搜索ef个近邻。返回按距离从近到远排序。
+ """
+ # 如果层或入口不存在直接返回空
+ if layer not in self.graphs or entry_node not in self.graphs[layer]:
+ return []
+
+ # 记录已访问节点
visited = set()
+ # 待扩展优先队列(距离,节点)
frontier = []
+ # 先计算入口点距离
+ entry_dist = self._l2(query, self.vectors[entry_node])
+ # 入队
+ heapq.heappush(frontier, (entry_dist, entry_node))
+ # 维护最大堆,保存最近的ef个点(距离取负以最大堆方式)
+ best = []
while frontier:
dist, node = heapq.heappop(frontier)
+ # 如果访问过则跳过
if node in visited:
continue
+ # 标记访问
visited.add(node)
+ # 维护best堆
+ if len(best) < ef or dist < -best[0][0]:
heapq.heappush(best, (-dist, node))
+ if len(best) > ef:
heapq.heappop(best)
+ # 将当前节点的所有邻居加入frontier扩展
+ for nb in self.graphs[layer].get(node, []):
if nb not in visited:
nd = self._l2(query, self.vectors[nb])
heapq.heappush(frontier, (nd, nb))
+ # 堆弹出后变为(距离由近到远)列表
+ best_pairs = sorted(best, key=lambda x: -x[0])
+ # 返回[(label, 距离)]
+ return [(label, -score) for score, label in best_pairs]
+ # 在某层候选节点列表中选出最近M个邻居
+ def _select_neighbors_in_layer(self, vector, candidates, layer, M):
+ """
+ 在候选集中选取与vector最近的M个作邻居。返回label列表。
+ """
+ # 没有候选则直接返回空
+ if not candidates:
+ return []
+ # 计算与vector的L2距离并排序
+ dists = [(self._l2(vector, self.vectors[label]), label) for label, _ in candidates]
dists.sort(key=lambda x: x[0])
+ # 返回距离最小的M个label
+ return [label for _, label in dists[:M]]
+ # 静态方法,计算L2距离
@staticmethod
def _l2(a, b):
+ # 使用numpy的linalg.norm计算欧式距离
return float(np.linalg.norm(a - b))
# 设置向量维度为3
dim = 3
+# 设置元素数量为10
num_elements = 10
+# 创建HNSW实例(l2空间,3维向量)
index = HNSW(space='l2', dim=dim)
+# 初始化索引,最多10个元素,每节点最多3个邻居
index.init_index(max_elements=num_elements, M=3)
+# 构造10个三维向量,每行为一个点(类型int32)
data = np.array(
[
[0, 0, 0],
[0, 10, 10],
[10, 0, 10],
[10, 10, 0],
[5, 0, 5],
[0, 5, 5],
[5, 5, 0],
[10, 5, 5],
[5, 10, 5],
[5, 5, 10],
],
dtype=np.int32,
)
+# 生成标签(1~10)
labels = np.arange(1, num_elements + 1)
+# 将所有向量和标签批量插入
index.add_items(data, labels)
+# 设置搜索宽度参数为5
index.set_explorationFactor(5)
+# 构造检索向量(空间中央,[5,5,5])
query = np.array([5, 5, 5], dtype=np.int32)
+# 查询最相似的2个点
labels, distances = index.knn_query(query, k=2)
# 打印查询向量
print(f'查询向量: {query}')
# 打印查询结果,逐个输出最相似的2个向量
print('\n找到最相似的 2 个向量:')
for rank, (label, distance) in enumerate(zip(labels[0], distances[0]), 1):
print(f' 第 {rank} 名: 标签={label}, 向量值={data[label-1]}, 距离={distance:.4f}')9. 优缺点 #
9.1 优点 #
极高的性能:在多个公开数据集基准测试中,HNSW 在速度和精度方面都名列前茅。可以在毫秒级时间内从百万级数据中找到相似结果。
易于使用:有成熟的 Python 库(如
hnswlib),API 简单直观,几行代码就能使用。支持动态插入:无需重建整个索引即可插入新数据,非常适合流式数据场景。
通用性强:适用于各种距离度量(欧氏距离、余弦相似度、内积等),可以处理各种类型的向量数据。
参数直观:主要参数(M、ef_construction、ef_search)的含义清晰,容易理解和调优。
9.2 缺点 #
内存占用较高:需要存储整个图结构,对于十亿级别以上的数据,内存可能成为瓶颈。但通常百万级到千万级数据是可以接受的。
参数需要调优:性能表现依赖于参数调优,不同数据集的最优参数可能不同,需要根据实际情况测试和调整。
构建时间较长:相比于一些简单的索引(如线性索引),构建 HNSW 索引需要更多时间。但对于大多数应用场景,这个时间是可以接受的。
不保证绝对精确:HNSW 是近似算法,不保证找到绝对最近邻,但通常能找到非常接近的结果。
9.3 适用场景 #
适合使用 HNSW 的场景:
- 需要快速相似度搜索的应用(推荐系统、搜索引擎、图像检索等)
- 数据规模在百万级到千万级
- 可以接受微小的精度损失以换取速度提升
- 需要支持动态添加数据
不适合使用 HNSW 的场景:
- 数据规模超过十亿级(内存可能成为瓶颈)
- 必须找到绝对最近邻(需要精确算法)
- 对构建时间要求极高(需要更简单的索引)