导航菜单

  • 1.vector
  • 2.milvus
  • 3.pymilvus
  • 4.rag
  • 5.rag_measure
  • 7.search
  • ragflow
  • heapq
  • HNSW
  • cosine_similarity
  • math
  • typing
  • etcd
  • minio
  • collections
  • jieba
  • random
  • beautifulsoup4
  • chromadb
  • sentence_transformers
  • numpy
  • lxml
  • openpyxl
  • PyMuPDF
  • python-docx
  • requests
  • python-pptx
  • text_splitter
  • all-MiniLM-L6-v2
  • openai
  • llm
  • BPETokenizer
  • Flask
  • RAGAS
  • BagofWords
  • langchain
  • Pydantic
  • abc
  • faiss
  • MMR
  • scikit-learn
  • 1. 什么是 HNSW?
    • 1.1 它要解决什么问题?
    • 1.2 为什么传统方法不够好?
    • 1.3 HNSW 的优势
  • 2. 前置知识
    • 2.1 什么是向量?
    • 2.2 什么是向量相似度?
    • 2.3 什么是近似最近邻搜索?
  • 3.安装和环境准备
    • 3.1 安装 hnswlib
    • 3.2 验证安装
  • 4.核心思想:像查地图一样查数据
    • 4.1 生活中的类比
    • 4.2 HNSW 的分层结构
  • 5.HNSW基础使用
  • 6. HNSW 的工作原理
    • 6.1 小世界网络
    • 6.2 分层结构
    • 6.3 构建过程
  • 7. 实现
  • 8.分层
  • 9. 优缺点
    • 9.1 优点
    • 9.2 缺点
    • 9.3 适用场景

1. 什么是 HNSW? #

HNSW 的全称是 Hierarchical Navigable Small World,翻译过来就是 "可导航小世界分层图"。它是一种用于快速查找相似向量的算法,广泛应用于推荐系统、搜索引擎、图像检索等领域。

1.1 它要解决什么问题? #

想象一下,你有一个包含100万张图片的数据库,每张图片都可以用一个512维的向量来表示其特征。现在给你一张新图片,如何快速从这100万张图片中找到最相似的几张?

这就是高维向量相似度搜索问题。HNSW 就是为了解决这个问题而生的。

1.2 为什么传统方法不够好? #

  • 线性扫描太慢:如果要精确查找,需要计算新向量与所有100万个向量的距离,这可能需要几秒钟甚至更长时间。

1.3 HNSW 的优势 #

HNSW 采用近似最近邻搜索的策略,用微小的精度损失换取巨大的速度提升:

  • 速度快:可以在毫秒级时间内从百万级数据中找到相似结果
  • 精度高:虽然不保证找到绝对最近邻,但通常能找到非常接近的结果
  • 易用性好:有成熟的 Python 库(如 hnswlib)可以直接使用

2. 前置知识 #

在学习 HNSW 之前,我们需要了解一些基础概念。

2.1 什么是向量? #

向量就是一组有序的数字,比如 [1.2, 3.4, 5.6] 就是一个3维向量。在 AI 领域,文本、图片、音频等都可以转换成向量来表示。

# 导入 numpy 库用于处理向量
import numpy as np

# 创建一个3维向量,表示某个对象的特征
vector = np.array([1.2, 3.4, 5.6])
# 打印向量的维度
print(f"向量维度: {vector.shape}")
# 打印向量内容
print(f"向量内容: {vector}")

2.2 什么是向量相似度? #

两个向量越相似,它们的距离就越小。常用的距离计算方法有:

欧氏距离(L2距离):计算两个向量对应位置差的平方和的平方根。就像在坐标系中计算两点之间的直线距离。

# 导入 numpy 库用于数学计算
import numpy as np

# 定义函数,计算两个向量的欧氏距离
def euclidean_distance(vec1, vec2):
    # 计算两个向量对应位置差的平方
    diff = vec1 - vec2
    # 计算平方和
    squared_diff = np.sum(diff ** 2)
    # 返回平方根,即欧氏距离
    return np.sqrt(squared_diff)

# 定义两个示例向量
vector_a = np.array([1.0, 2.0, 3.0])
vector_b = np.array([4.0, 5.0, 6.0])

# 计算并打印两个向量之间的欧氏距离
distance = euclidean_distance(vector_a, vector_b)
print(f"向量 A: {vector_a}")
print(f"向量 B: {vector_b}")
print(f"欧氏距离: {distance:.2f}")

余弦相似度:计算两个向量夹角的余弦值,范围在 -1 到 1 之间。值越接近 1,表示两个向量方向越相似。 余弦相似度

# 导入 numpy 库用于数学计算
import numpy as np

# 定义函数,计算两个向量的余弦相似度
def cosine_similarity(vec1, vec2):
    # 计算两个向量的点积(内积)
    dot_product = np.dot(vec1, vec2)
    # 计算第一个向量的模长(长度)
    norm1 = np.linalg.norm(vec1)
    # 计算第二个向量的模长(长度)
    norm2 = np.linalg.norm(vec2)
    # 如果模长为0,返回0(避免除以0)
    if norm1 == 0 or norm2 == 0:
        return 0
    # 返回点积除以两个模长的乘积,即余弦相似度
    return dot_product / (norm1 * norm2)

# 定义两个示例向量
vector_a = np.array([1.0, 2.0, 3.0])
vector_b = np.array([2.0, 4.0, 6.0])  # 与 vector_a 方向相同,只是长度不同

# 计算并打印两个向量的余弦相似度
similarity = cosine_similarity(vector_a, vector_b)
print(f"向量 A: {vector_a}")
print(f"向量 B: {vector_b}")
print(f"余弦相似度: {similarity:.2f}")

2.3 什么是近似最近邻搜索? #

精确最近邻搜索:找到距离查询向量最近的那个向量(需要遍历所有向量,速度慢)。

近似最近邻搜索(ANN):找到距离查询向量"比较近"的向量(不需要遍历所有向量,速度快,但可能不是绝对最近的那个)。

HNSW 就是一种近似最近邻搜索算法。

3.安装和环境准备 #

在使用 HNSW 之前,我们需要安装必要的库。

3.1 安装 hnswlib #

hnswlib 是一个高效的 HNSW 实现库,支持 Python。

Windows 系统:

# 使用 pip 安装 hnswlib 库
pip install hnswlib

macOS 系统:

# 使用 pip3 安装 hnswlib 库
pip3 install hnswlib

如果安装失败,可以尝试使用国内镜像源:

Windows 系统:

# 使用清华镜像源安装 hnswlib
pip install -i https://pypi.tuna.tsinghua.edu.cn/simple hnswlib

macOS 系统:

# 使用清华镜像源安装 hnswlib
pip3 install -i https://pypi.tuna.tsinghua.edu.cn/simple hnswlib

3.2 验证安装 #

安装完成后,我们可以写一个简单的测试程序来验证安装是否成功:

# 导入 hnswlib 库
import hnswlib
# 验证安装是否成功
print("hnswlib 安装成功!")

4.核心思想:像查地图一样查数据 #

HNSW 的核心思想非常直观,就像我们在地图上找地点一样。

4.1 生活中的类比 #

想象一下你要在一个复杂的世界地图上找一个特定的地点,你会怎么做?

  1. 先看全局地图:你首先会找到世界地图或国家地图,确定大方向(比如在哪个国家)。
  2. 再看区域地图:然后你找到省份或城市的地图,范围越来越小。
  3. 最后看详细地图:最后你查看街道地图,找到具体的目标地点。

HNSW 正是模拟了这个过程!

查地图

4.2 HNSW 的分层结构 #

HNSW 构建了一个多层图结构,就像不同比例尺的地图:

  • 最高层(顶层):只包含极少数的数据点,就像世界地图只显示主要城市。这一层用于快速定位大致方向。
  • 中间层:包含更多数据点,就像省份地图显示更多城市。用于逐步缩小搜索范围。
  • 第0层(底层):包含所有数据点,就像详细的街道地图。用于精确查找目标。

多层图

关键优势:通过高层快速跳转,避免了在底层进行全局的、漫长的搜索,大大提高了查找速度。

5.HNSW基础使用 #

这个例子展示了如何创建一个 HNSW 索引,插入一些向量数据,然后进行搜索。

# 导入 hnswlib 库,用于创建 HNSW 索引
import hnswlib
# 导入 numpy 库,用于生成和处理向量数据
import numpy as np

# 设置随机种子,确保每次运行结果一致
np.random.seed(42)

# 定义向量维度为 128(可以根据实际需求修改)
dim = 128
# 定义要插入的向量数量
num_elements = 1000

# 创建一个 HNSW 索引对象
# 'l2' 表示使用欧氏距离(L2距离)作为相似度度量
p = hnswlib.Index(space='l2', dim=dim)

# 初始化索引
# max_elements 表示索引最多可以存储的向量数量
# ef_construction 是构建时的参数,影响索引质量(稍后详细说明)
# M 是每个节点的最大连接数,影响索引的稠密程度(稍后详细说明)
p.init_index(max_elements=num_elements, ef_construction=200, M=16)

# 生成随机向量数据作为示例
# 生成 1000 个 128 维的随机向量,每个元素的值在 0 到 1 之间
data = np.random.random((num_elements, dim)).astype(np.float32)

# 将向量数据添加到索引中
# 这里我们使用向量的索引作为标签(0, 1, 2, ..., 999)
p.add_items(data, np.arange(num_elements))

# 设置搜索参数
# ef_search 是搜索时的候选集大小,影响搜索精度和速度
p.set_ef(50)

# 定义查询向量(也是随机生成的,实际应用中这是你要查找的目标)
query = np.random.random((dim,)).astype(np.float32)

# 执行搜索,查找最相似的 5 个向量
# k 表示要返回的最相似向量的数量
labels, distances = p.knn_query(query, k=5)

# 打印搜索结果
print(f"查询向量维度: {query.shape}")
print(f"\n找到最相似的 5 个向量:")
# 遍历搜索结果
for i, (label, distance) in enumerate(zip(labels[0], distances[0])):
    # 打印每个结果的标签(向量在索引中的位置)和距离
    print(f"  第 {i+1} 名: 标签={label}, 距离={distance:.4f}")

6. HNSW 的工作原理 #

虽然 HNSW 的内部实现很复杂,但我们可以用简单的概念来理解它。

6.1 小世界网络 #

HNSW 基于"小世界网络"的概念。想象一个社交网络:

  • 短程连接:你的朋友(直接连接,保证你能找到附近的人)
  • 长程连接:一些"超级连接者"(可以快速跳到遥远的区域)

在 HNSW 中:

  • 每个数据点(向量)都有一些"邻居"连接
  • 这些连接包括附近的点(短程)和一些较远的点(长程)
  • 搜索时,从一个点开始,沿着连接不断"走"向更接近目标的位置

6.2 分层结构 #

HNSW 的关键创新是分层:

  • 顶层:数据点很少,但连接很"长",可以快速跨越很大距离
  • 底层:包含所有数据点,连接很"短",用于精确查找

搜索过程:

  1. 从顶层开始,快速定位大致方向
  2. 逐层下降,每层都进行局部搜索
  3. 到底层时,进行最后的精确搜索

这就像从世界地图 → 国家地图 → 城市地图 → 街道地图的查找过程。

6.3 构建过程 #

当插入一个新向量时:

  1. 随机分配层数:新向量会被随机分配到某一层(高层概率小,底层概率大)
  2. 找到入口点:从顶层开始,逐层找到距离新向量最近的节点
  3. 建立连接:在新向量所在的层及以下各层,找到一些邻居并建立连接

7. 实现 #

演示

# 导入heapq库,用于优先队列(堆实现)
import heapq
# 导入numpy库,用于数值计算
import numpy as np

# 定义HNSW类
class HNSW:
    # 文档字符串,描述类用途:核心增量建图 + 贪心/最佳优先搜索
    """核心增量建图 + 贪心/最佳优先搜索。"""

    # 初始化方法,指定空间类型和向量维度
    def __init__(self, space: str, dim: int):
        # 检查空间类型,只支持L2距离度量空间,如果不是则抛出异常
        if space != 'l2':
            raise ValueError('示例仅支持 L2 距离')
        # 保存向量维度
        self.dim = dim
        # 最大元素数量,默认0
        self.max_elements = 0
        # 节点的最大邻居数
        self.M = 0
        # 搜索宽度,默认10
        self.explorationFactor = 10  # 默认搜索宽度
        # 用字典存储所有标签到向量的映射
        self.vectors = {}  # label -> 向量
        # 用字典存储标签到邻居列表的图结构
        self.graph = {}  # label -> 邻居列表
        # 图的入口点,默认为None
        self.entry_point = None

    # 初始化索引,设置最大元素数量和最大邻居数
    def init_index(self, max_elements: int, M: int):
        # 设置最大节点数
        self.max_elements = max_elements
        # 设置每个节点的最大邻居数
        self.M = M

    # 添加新向量和标签到图结构中
    def add_items(self, data, labels):
        # 遍历所有输入的向量和对应标签
        for vector, label in zip(data, labels):
            # 如果已达到容量上限,抛出异常
            if len(self.vectors) >= self.max_elements:
                raise ValueError('已达到索引容量上限')
            # 初始化该节点的邻居列表(如果尚不存在)
            self.graph.setdefault(label, [])
            # 如果entry_point为空,设置为当前节点,并存储向量
            if self.entry_point is None:
                self.entry_point = label
                # 保存当前向量
                self.vectors[label] = vector
                # 跳过剩余流程
                continue
            # 选择当前节点距离最近的M个邻居(还未加入vectors)
            neighbors = self._select_neighbors(vector)
            # 保存该label下的向量数据
            self.vectors[label] = vector
            # 给当前节点和其邻居建立双向连接,且去重
            for nb in neighbors:
                # 跳过自己
                if nb == label:
                    continue
                # 如果nb还不在当前节点的邻居表,添加
                if nb not in self.graph[label]:
                    self.graph[label].append(nb)
                # 如果当前节点还不在nb的邻居表,添加
                if label not in self.graph.get(nb, []):
                    self.graph.setdefault(nb, []).append(label)

    # 设置搜索宽度ef参数
    def set_explorationFactor(self, explorationFactor: int):
        # 确保搜索宽度至少为1
        self.explorationFactor = max(1, explorationFactor)

    # 查询与查询向量最相似的k个向量
    def knn_query(self, query, k: int):
        # 如果不存在入口节点,抛出异常
        if self.entry_point is None:
            raise ValueError('索引为空')
        # 记录已访问过的节点
        visited = set()
        # 定义待探索队列(小顶堆),初始为空
        frontier = []
        # 计算查询向量与入口点的距离
        entry_dist = self._l2(query, self.vectors[self.entry_point])
        # 将入口点和对应距离推入待探索队列
        heapq.heappush(frontier, (entry_dist, self.entry_point))
        # 初始化最大堆用于保留最优候选
        best = []  # 最大堆,存放当前最优候选

        # 只要待探索队列非空,持续搜索
        while frontier:
            # 弹出距离最小的点
            dist, node = heapq.heappop(frontier)
            # 如果已经访问过该节点则跳过
            if node in visited:
                continue
            # 标记当前节点为已访问
            visited.add(node)
            # 若best候选数量不满explorationFactor或当前距离更优,则加入best堆
            if len(best) < self.explorationFactor or dist < -best[0][0]:
                heapq.heappush(best, (-dist, node))
                # 如果堆超出了explorationFactor大小,则弹出堆顶(最差的那个)
                if len(best) > self.explorationFactor:
                    heapq.heappop(best)
            # 遍历该节点的所有邻居,若未访问则计算距离并推入队列
            for nb in self.graph.get(node, []):
                if nb not in visited:
                    nd = self._l2(query, self.vectors[nb])
                    heapq.heappush(frontier, (nd, nb))

        # 按距离由小到大排序最优候选,只取前k个
        best_pairs = sorted(best, key=lambda x: -x[0])[:k]
        # 构造返回的标签数组(labels)
        labels = np.array([[label for _, label in best_pairs]])
        # 构造返回的距离数组(distances)
        distances = np.array([[ -score for score, _ in best_pairs ]])
        # 返回最相似的labels和对应距离
        return labels, distances

    # 内部方法,选择距离vector最近的M个邻居(基于L2距离)
    def _select_neighbors(self, vector):
        # 计算vector与已存在每个节点的距离,组装为(距离, label)对
        dists = [
            (self._l2(vector, other), label)
            for label, other in self.vectors.items()
        ]
        # 按距离升序排序
        dists.sort(key=lambda x: x[0])
        # 返回最近的M个label
        return [label for _, label in dists[: self.M]]

    # 静态方法,计算两个向量的L2距离
    @staticmethod
    def _l2(a, b):
        # 用numpy计算欧氏距离并返回float类型
        return float(np.linalg.norm(a - b))

# 设置向量维度为3
dim = 3
# 设置总向量个数为10
num_elements = 10
# 创建HNSW实例,空间类型为l2,维度为3
index = HNSW(space='l2', dim=dim)
# 初始化索引,最多10个元素,每个节点最多3个邻居
index.init_index(max_elements=num_elements, M=3)
# 构造10条均匀分布于空间的三维向量,类型为int32
data = np.array(
    [
        [0, 0, 0],
        [0, 10, 10],
        [10, 0, 10],
        [10, 10, 0],
        [5, 0, 5],
        [0, 5, 5],
        [5, 5, 0],
        [10, 5, 5],
        [5, 10, 5],
        [5, 5, 10],
    ],
    dtype=np.int32,
)
# 生成标签,范围1~10
labels = np.arange(1, num_elements + 1)
# 批量将向量数据插入到索引
index.add_items(data, labels)
# 设置搜索宽度explorationFactor为10
index.set_explorationFactor(5)
# 构造一个三维查询向量,位于空间中心,更适合均匀分布 [5,5,5]
query = np.array([5, 5, 5], dtype=np.int32)
# 查询与该向量最接近的2个向量
labels, distances = index.knn_query(query, k=2)
# 打印查询向量
print(f'查询向量: {query}')
# 打印查询结果,逐个输出最相似的2个向量
print('\n找到最相似的 2 个向量:')
for rank, (label, distance) in enumerate(zip(labels[0], distances[0]), 1):
    print(f'  第 {rank} 名: 标签={label}, 向量值={data[label-1]}, 距离={distance:.4f}')

8.分层 #

分层

# 导入heapq库,用于优先队列(堆实现)
import heapq
# 导入numpy库,用于数值计算
import numpy as np
+# 导入random库,用于生成随机层级
+import random
+# 导入math库,用于数学相关操作
+import math

+"""
+HNSW分层实现说明:
+
+1. 多层图结构:
+  - graphs[layer] = {label -> 邻居列表},每层一个独立的图
+  - 底层(layer 0)包含所有节点
+  - 上层节点逐渐减少,形成"金字塔"结构
+
+2. 节点层级分配:
+  - 使用指数衰减分布随机决定节点出现在哪些层
+  - 每个节点至少出现在layer 0
+  - 层级越高,节点越少,搜索路径越短
+
+3. 分层搜索:
+  - 从最高层开始搜索,快速定位大致区域
+  - 逐层向下细化搜索
+  - 在底层(layer 0)获取最终结果
+
+4. 分层插入:
+  - 从顶层到底层逐层插入
+  - 每层独立选择邻居并建立连接
+  - 上层连接用于快速导航,下层连接保证精度
+"""
+
+# 定义HNSW类(分层实现)
class HNSW:
+   # 文档字符串,描述类用途
+   """分层导航小世界图(Hierarchical Navigable Small World)"""

+   # 初始化函数,设置空间类型和维度
    def __init__(self, space: str, dim: int):
+       # 检查空间类型,只支持l2距离
        if space != 'l2':
            raise ValueError('示例仅支持 L2 距离')
        # 保存向量维度
        self.dim = dim
+       # 最大元素数量,默认为0
        self.max_elements = 0
+       # 节点最大邻居数
        self.M = 0
+       # 搜索宽度参数,默认10
+       self.explorationFactor = 10
+       # 所有标签到向量的映射
        self.vectors = {}  # label -> 向量
+       # 多层图结构:layer -> {label->邻居列表}
+       self.graphs = {}   # layer -> {label -> 邻居}
+       # 每个节点出现在哪些层的记录
+       self.node_layers = {}  # label -> [层]
+       # 当前最高层编号
+       self.max_layer = 0
+       # 各层的入口点
+       self.entry_points = {}  # layer -> entry_point

+   # 索引初始化,设置最大容量和最大邻居数
    def init_index(self, max_elements: int, M: int):
        # 设置最大节点数
        self.max_elements = max_elements
+       # 设置每个节点最大邻居数
        self.M = M
+       # 初始化第0层图结构
+       self.graphs[0] = {}
+       # 当前最高层为0
+       self.max_layer = 0

+   # 批量添加数据与标签
    def add_items(self, data, labels):
+       # 遍历所有数据与标签
        for vector, label in zip(data, labels):
+           # 检查是否超出最大容量
            if len(self.vectors) >= self.max_elements:
                raise ValueError('已达到索引容量上限')
+
+           # 保存向量数据
            self.vectors[label] = vector
+
+           # 如果是第一个节点,直接作为入口点
+           if len(self.vectors) == 1:
+               # 随机计算节点出现的最高层数
+               node_level = self._random_level()
+               self.node_layers[label] = list(range(node_level + 1))
+               # 每一层都初始化邻居列表
+               for layer in self.node_layers[label]:
+                   if layer not in self.graphs:
+                       self.graphs[layer] = {}
+                   self.graphs[layer][label] = []
+               # 每一层设置入口点
+               for layer in self.node_layers[label]:
+                   self.entry_points[layer] = label
+               # 更新最高层数
+               self.max_layer = max(self.max_layer, node_level)
+               continue
+
+           # 非首节点,随机决定出现层级
+           node_level = self._random_level()
+           self.node_layers[label] = list(range(node_level + 1))
+
+           # 如果新节点的层级更高,更新最高层数
+           if node_level > self.max_layer:
+               self.max_layer = node_level
+
+           # 从顶层到底层逐层插入
+           current_node = None
+           # 逐层构建连接(高层到低层)
+           for layer in range(node_level, -1, -1):
+               # 如果本层图结构不存在,则新建
+               if layer not in self.graphs:
+                   self.graphs[layer] = {}
+               # 初始化自己的邻居列表
+               if label not in self.graphs[layer]:
+                   self.graphs[layer][label] = []
+               # 顶层:入口点
+               if layer == node_level:
+                   entry = self.entry_points.get(layer, None)
+                   if entry is None:
+                       # 本层无入口点,则该节点成为入口
+                       self.entry_points[layer] = label
+                       current_node = label
+                   else:
+                       # 本层有入口点,从入口点开始层内搜索
+                       candidates = self._search_layer(entry, vector, layer, ef=1)
+                       if candidates:
+                           # 最近节点作为起始点
+                           current_node = candidates[0][0]
+                       else:
+                           current_node = entry
+               else:
+                   # 低层:用上一层结果做起点
+                   pass
+
+               # 在本层搜索候选邻居
+               candidates = self._search_layer(current_node, vector, layer, ef=self.explorationFactor)
+               # 选择本层最近的M个邻居
+               neighbors = self._select_neighbors_in_layer(vector, candidates, layer, M=self.M)
+
+               # 建立与这些邻居的双向连接
+               for nb in neighbors:
+                   # 跳过自己
+                   if nb == label:
+                       continue
+                   # 当前节点连向邻居
+                   if nb not in self.graphs[layer][label]:
+                       self.graphs[layer][label].append(nb)
+                   # 邻居连回当前节点
+                   if label not in self.graphs[layer].get(nb, []):
+                       if nb not in self.graphs[layer]:
+                           self.graphs[layer][nb] = []
+                       self.graphs[layer][nb].append(label)
+               # 更新下层入口点
+               if candidates:
+                   current_node = candidates[0]
+
+   # 设置搜索宽度参数
    def set_explorationFactor(self, explorationFactor: int):
+       # 搜索宽度不能小于1
        self.explorationFactor = max(1, explorationFactor)

+   # 分层搜索K近邻
    def knn_query(self, query, k: int):
+       # 如果没有入口点报错
+       if not self.entry_points:
            raise ValueError('索引为空')
+
+       # 从最高层向下分层查找
+       current_node = None
+       for layer in range(self.max_layer, -1, -1):
+           # 跳过没有入口点的层
+           if layer not in self.entry_points:
+               continue
+           # 最高层从入口点出发
+           if layer == self.max_layer:
+               entry = self.entry_points[layer]
+               # 在本层查找最近点
+               candidates = self._search_layer(entry, query, layer, ef=self.explorationFactor)
+               if candidates:
+                   current_node = candidates[0][0]
+               else:
+                   current_node = entry
+           else:
+               # 低层:用上一层结果做起点
+               candidates = self._search_layer(current_node, query, layer, ef=self.explorationFactor)
+               if candidates:
+                   current_node = candidates[0][0]
+
+       # 最底层做最终K近邻搜索
+       if current_node is None:
+           current_node = self.entry_points.get(0, None)
+           if current_node is None:
+               raise ValueError('索引为空')
+
+       # 以更大的ef查找更多候选
+       candidates = self._search_layer(current_node, query, 0, ef=max(self.explorationFactor, k))
+       # 按距离排序,仅保留前K个
+       candidates.sort(key=lambda x: x[1])
+       top_k = candidates[:k]
+       # 返回标签与距离的ndarray
+       labels = np.array([[label for label, _ in top_k]])
+       distances = np.array([[dist for _, dist in top_k]])
+       return labels, distances
+
+   # 决定节点最高层级(指数衰减,越高概率越低)
+   def _random_level(self):
+       """
+       使用指数衰减分布决定节点层级。层越高概率越小。
+       """
+       level = 0
+       # 每次0.5概率进入更高一层,最大到16层
+       while random.random() < 0.5 and level < 16:
+           level += 1
+       return level
+
+   # 在指定层内以贪心(近邻扩展)找ef个最近点
+   def _search_layer(self, entry_node, query, layer, ef):
+       """
+       指定入口节点,从该层用贪心搜索ef个近邻。返回按距离从近到远排序。
+       """
+       # 如果层或入口不存在直接返回空
+       if layer not in self.graphs or entry_node not in self.graphs[layer]:
+           return []
+
+       # 记录已访问节点
        visited = set()
+       # 待扩展优先队列(距离,节点)
        frontier = []
+       # 先计算入口点距离
+       entry_dist = self._l2(query, self.vectors[entry_node])
+       # 入队
+       heapq.heappush(frontier, (entry_dist, entry_node))
+       # 维护最大堆,保存最近的ef个点(距离取负以最大堆方式)
+       best = []
        while frontier:
            dist, node = heapq.heappop(frontier)
+           # 如果访问过则跳过
            if node in visited:
                continue
+           # 标记访问
            visited.add(node)
+           # 维护best堆
+           if len(best) < ef or dist < -best[0][0]:
                heapq.heappush(best, (-dist, node))
+               if len(best) > ef:
                    heapq.heappop(best)
+           # 将当前节点的所有邻居加入frontier扩展
+           for nb in self.graphs[layer].get(node, []):
                if nb not in visited:
                    nd = self._l2(query, self.vectors[nb])
                    heapq.heappush(frontier, (nd, nb))
+       # 堆弹出后变为(距离由近到远)列表
+       best_pairs = sorted(best, key=lambda x: -x[0])
+       # 返回[(label, 距离)]
+       return [(label, -score) for score, label in best_pairs]

+   # 在某层候选节点列表中选出最近M个邻居
+   def _select_neighbors_in_layer(self, vector, candidates, layer, M):
+       """
+       在候选集中选取与vector最近的M个作邻居。返回label列表。
+       """
+       # 没有候选则直接返回空
+       if not candidates:
+           return []
+       # 计算与vector的L2距离并排序
+       dists = [(self._l2(vector, self.vectors[label]), label) for label, _ in candidates]
        dists.sort(key=lambda x: x[0])
+       # 返回距离最小的M个label
+       return [label for _, label in dists[:M]]

+   # 静态方法,计算L2距离
    @staticmethod
    def _l2(a, b):
+       # 使用numpy的linalg.norm计算欧式距离
        return float(np.linalg.norm(a - b))

# 设置向量维度为3
dim = 3
+# 设置元素数量为10
num_elements = 10
+# 创建HNSW实例(l2空间,3维向量)
index = HNSW(space='l2', dim=dim)
+# 初始化索引,最多10个元素,每节点最多3个邻居
index.init_index(max_elements=num_elements, M=3)
+# 构造10个三维向量,每行为一个点(类型int32)
data = np.array(
    [
        [0, 0, 0],
        [0, 10, 10],
        [10, 0, 10],
        [10, 10, 0],
        [5, 0, 5],
        [0, 5, 5],
        [5, 5, 0],
        [10, 5, 5],
        [5, 10, 5],
        [5, 5, 10],
    ],
    dtype=np.int32,
)
+# 生成标签(1~10)
labels = np.arange(1, num_elements + 1)
+# 将所有向量和标签批量插入
index.add_items(data, labels)
+# 设置搜索宽度参数为5
index.set_explorationFactor(5)
+# 构造检索向量(空间中央,[5,5,5])
query = np.array([5, 5, 5], dtype=np.int32)
+# 查询最相似的2个点
labels, distances = index.knn_query(query, k=2)
# 打印查询向量
print(f'查询向量: {query}')
# 打印查询结果,逐个输出最相似的2个向量
print('\n找到最相似的 2 个向量:')
for rank, (label, distance) in enumerate(zip(labels[0], distances[0]), 1):
    print(f'  第 {rank} 名: 标签={label}, 向量值={data[label-1]}, 距离={distance:.4f}')

9. 优缺点 #

9.1 优点 #

  1. 极高的性能:在多个公开数据集基准测试中,HNSW 在速度和精度方面都名列前茅。可以在毫秒级时间内从百万级数据中找到相似结果。

  2. 易于使用:有成熟的 Python 库(如 hnswlib),API 简单直观,几行代码就能使用。

  3. 支持动态插入:无需重建整个索引即可插入新数据,非常适合流式数据场景。

  4. 通用性强:适用于各种距离度量(欧氏距离、余弦相似度、内积等),可以处理各种类型的向量数据。

  5. 参数直观:主要参数(M、ef_construction、ef_search)的含义清晰,容易理解和调优。

9.2 缺点 #

  1. 内存占用较高:需要存储整个图结构,对于十亿级别以上的数据,内存可能成为瓶颈。但通常百万级到千万级数据是可以接受的。

  2. 参数需要调优:性能表现依赖于参数调优,不同数据集的最优参数可能不同,需要根据实际情况测试和调整。

  3. 构建时间较长:相比于一些简单的索引(如线性索引),构建 HNSW 索引需要更多时间。但对于大多数应用场景,这个时间是可以接受的。

  4. 不保证绝对精确:HNSW 是近似算法,不保证找到绝对最近邻,但通常能找到非常接近的结果。

9.3 适用场景 #

适合使用 HNSW 的场景:

  • 需要快速相似度搜索的应用(推荐系统、搜索引擎、图像检索等)
  • 数据规模在百万级到千万级
  • 可以接受微小的精度损失以换取速度提升
  • 需要支持动态添加数据

不适合使用 HNSW 的场景:

  • 数据规模超过十亿级(内存可能成为瓶颈)
  • 必须找到绝对最近邻(需要精确算法)
  • 对构建时间要求极高(需要更简单的索引)

访问验证

请输入访问令牌

Token不正确,请重新输入