导航菜单

  • 1.vector
  • 2.milvus
  • 3.pymilvus
  • 4.rag
  • 5.rag_measure
  • 7.search
  • ragflow
  • heapq
  • HNSW
  • cosine_similarity
  • math
  • typing
  • etcd
  • minio
  • collections
  • jieba
  • random
  • beautifulsoup4
  • chromadb
  • sentence_transformers
  • numpy
  • lxml
  • openpyxl
  • PyMuPDF
  • python-docx
  • requests
  • python-pptx
  • text_splitter
  • all-MiniLM-L6-v2
  • openai
  • llm
  • BPETokenizer
  • Flask
  • RAGAS
  • BagofWords
  • langchain
  • Pydantic
  • abc
  • faiss
  • MMR
  • scikit-learn
  • 1. 基本概念
  • 2. 安装 PyMilvus
  • 3. 基本使用
    • 3.1 连接 Milvus
    • 3.2 创建集合(Collection)
    • 3.3 创建索引
    • 3.4 加载集合到内存
    • 3.5 插入数据
      • 3.5.1 insert
    • 3.6 向量搜索
    • 3.7 删除集合
  • 4. 高级特性
    • 4.1 分区管理
    • 4.2 混合搜索(向量+标量过滤)
  • 5. 最佳实践
  • 6. IVF_FLAT
  • 7.参考

1. 基本概念 #

PyMilvus 是 Milvus 向量数据库的 Python SDK,它允许开发者通过 Python 接口与 Milvus 进行交互,实现高效的向量相似度搜索和管理。 PyMilvus 提供了以下核心功能:

  • 集合(Collection)管理
  • 向量插入和删除
  • 向量相似度搜索
  • 分区(Partition)管理
  • 索引(Index)管理

2. 安装 PyMilvus #

建议使用 Python 3.7 及以上版本。

uv add pymilvus

3. 基本使用 #

3.1 连接 Milvus #

连接前请确保 Milvus 服务端已启动,默认端口为 19530。

# 导入 pymilvus 连接模块
from pymilvus import connections

# 连接到 Milvus 服务器(本地或远程)
# 使用默认连接别名,连接到本地主机的19530端口,数据库名为rensheng
connections.connect("default", host="localhost", port="19530", db_name="rensheng")

# 检查连接状态
# 验证名为"default"的连接是否存在,返回True表示连接成功
print(connections.has_connection("default"))  # True 表示连接成功

常见问题:

  • 端口不通或服务未启动会连接失败。
  • host/port 填写需与 Milvus 服务端一致。

3.2 创建集合(Collection) #

集合类似于数据库中的"表",需先定义字段(Field)和集合模式(Schema)。

# 导入 pymilvus 相关模块:集合模式、字段模式、数据类型、集合、连接
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection, connections

# 连接到 Milvus 服务器,使用默认连接别名,连接到本地主机的19530端口,数据库名为rensheng
connections.connect("default", host="localhost", port="19530", db_name="rensheng")

# 定义集合的字段结构,包含主键ID字段和向量字段
fields = [
    # 定义主键字段:64位整数类型,自动生成ID
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    # 定义向量字段:128维浮点向量
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
]

# 创建集合模式对象,包含字段定义和描述信息
schema = CollectionSchema(fields, description="example")

# 检查集合是否存在,不存在才创建
collection_name = "example"
try:
    # 尝试获取已存在的集合
    collection = Collection(collection_name)
    print(f"集合 {collection_name} 已存在")
except Exception:
    # 集合不存在,创建新集合
    collection = Collection(collection_name, schema)
    print(f"集合 {collection_name} 创建成功")

注意事项:

  • 向量字段需指定 dim 维度。
  • 主键字段建议用 INT64。
  • 集合名需唯一。

3.3 创建索引 #

向量字段需创建索引以加速检索。常用类型有 IVF_FLAT、HNSW、IVF_PQ 等。

nlist nprobe

# 导入 pymilvus 集合和连接模块
from pymilvus import Collection, connections

# 连接到 Milvus 数据库,指定主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象
collection = Collection("example")

# 定义索引参数配置
index_params = {
    "metric_type": "L2",  # 距离度量方式:L2、IP、COSINE
    "index_type": "IVF_FLAT",  # 索引类型 Inverted File with Flat(倒排文件+暴力遍历)
    "params": {"nlist": 128},  # nlist 越大召回率越高,速度越慢
}

# 为向量字段创建索引
collection.create_index("embedding", index_params)
# 打印索引创建成功信息
print("索引创建成功")

常见问题:

  • 未建索引时检索效率低。
  • nlist 参数需根据数据量调整。

3.4 加载集合到内存 #

检索前必须先将集合 load 到内存。

# 导入 pymilvus 集合和连接模块
from pymilvus import Collection, connections

# 连接到 Milvus 数据库,指定主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象
collection = Collection("example")

# 将集合加载到内存中以支持搜索操作
collection.load()
# 打印集合加载成功信息
print("集合已加载到内存")

注意事项:

  • 未 load 时无法检索。
  • 数据量大时 load 可能较慢。

3.5 插入数据 #

插入前需准备好与 schema 对应的数据。

# 导入 Milvus 数据库连接和集合操作所需的模块
from pymilvus import Collection, connections

# 导入随机数生成模块
import random

# 连接到 Milvus 数据库,指定主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象
collection = Collection("example")

# 生成随机向量数据
# 创建 1000 个向量,每个向量包含 128 个随机浮点数
vectors = [[random.random() for _ in range(128)] for _ in range(1000)]

# 只插入向量字段(主键 auto_id=True 可省略)
# 将向量数据组织成插入格式
data = [vectors]

# 插入数据
# 将数据插入到集合中,并获取插入结果
mr = collection.insert(data)
# 打印实际插入的向量数量
print("插入向量数量:", mr.insert_count)

注意事项:

  • 数据格式为 List[List[float]],每个子列表为一个向量。
  • 插入后建议 collection.flush() 保证数据落盘。

3.5.1 insert #

在 Milvus 的 pymilvus 客户端中,collection.insert(data) 的数据格式有两种常见写法:

列表格式(List of Columns)

data = [vectors, categories]
collection.insert(data)
  • data 是一个列表,列表中的每一项对应一个字段(列),顺序要和 schema 中字段定义的顺序一致(不包括 auto_id 的主键字段)。
  • 每个字段的数据都是一个长度相同的列表(即“列式”存储)。

在这个的 schema 里,字段顺序是:

  1. id(auto_id=True,插入时不用提供)
  2. embedding
  3. category

所以只需要提供 embedding 和 category 两列的数据

字典格式(List of Dicts)

data = [
    {"embedding": [0.1, 0.2, ...], "category": 2},
    {"embedding": [0.3, 0.4, ...], "category": 3},
    # ...
]
collection.insert(data)

这种写法每一行为一个字典,字段名和 schema 对应,适合“行式”插入。

注意事项:

  • 列表格式不需要指定字段名,但顺序必须和 schema 一致(auto_id 字段不用管)。
  • 字典格式需要指定字段名,顺序无关。

3.6 向量搜索 #

支持多种距离度量(L2、IP、COSINE),可指定 topK、nprobe 等参数。

# 导入 pymilvus 集合和连接模块
from pymilvus import Collection, connections

# 导入随机数生成模块
import random

# 连接到 Milvus 数据库,指定主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象
collection = Collection("example")

# 定义搜索参数配置
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}

# 生成一个查询向量
query_vector = [random.random() for _ in range(128)]

# 执行向量搜索操作
results = collection.search(
    data=[query_vector], anns_field="embedding", param=search_params, limit=5
)

# 遍历并打印搜索结果
for hits in results:
    for hit in hits:
        print(f"id: {hit.id}, distance: {hit.distance}")

参数说明:

  • anns_field:向量字段名
  • param:索引参数(如 nprobe)
  • limit:返回 topK 个结果

常见问题:

  • 未 load 集合会报错。
  • 查询向量维度需与 schema 一致。

3.7 删除集合 #

删除集合会清空所有数据,谨慎操作。

# 导入 pymilvus 集合和连接模块
from pymilvus import Collection, connections

# 连接到 Milvus 数据库,指定主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象
collection = Collection("example")

# 删除集合
collection.drop()
# 打印集合删除成功信息
print("集合已删除")

4. 高级特性 #

4.1 分区管理 #

分区可用于数据分组、分层管理。

# 导入 pymilvus 模块中的 Collection 类和 connections 连接模块
from pymilvus import Collection, connections
# 导入随机数生成模块,用于生成向量数据
import random

# 连接到 Milvus 数据库,指定别名、主机地址、端口号和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象,后续操作都基于此集合
collection = Collection("example")

# 定义要创建的分区名称
partition_name = "partition_2"
# 判断集合中是否已经存在该分区,如果不存在则创建,否则提示已存在
if partition_name not in [p.name for p in collection.partitions]:
    collection.create_partition(partition_name)
    print(f"分区 '{partition_name}' 创建成功")
else:
    print(f"分区 '{partition_name}' 已存在")

# 生成 10 个随机的 128 维向量,假设集合的 schema 中 embedding 字段为 128 维
vectors = [[random.random() for _ in range(128)] for _ in range(10)]
# 组织插入数据的格式,按 schema 字段顺序构造 data 列表
data = [vectors]

# 向指定分区插入数据,返回插入结果
insert_result = collection.insert(data, partition_name=partition_name)
# 打印插入数据的条数
print(f"已向分区 '{partition_name}' 插入 {insert_result.insert_count} 条数据")
# 将内存中的数据写入磁盘,保证持久化
collection.flush()
# 加载指定分区的数据到内存,提升后续搜索效率
collection.load(partition_names=[partition_name])
# 打印分区已加载到内存的提示
print(f"分区 '{partition_name}' 已加载到内存")
# 设置搜索参数:距离类型为 L2(欧式距离),nprobe 用于控制扫描的范围
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}

# 生成一个随机的 128 维查询向量
query_vector = [random.random() for _ in range(128)]

# 在指定的分区中执行向量搜索,搜寻最相近的前5个向量
results = collection.search(
    data=[query_vector],
    anns_field="embedding",
    param=search_params,
    limit=5,
    partition_names=[partition_name],  # 指定搜索的范围仅限该分区
)
# 打印原始的搜索结果
print(results)
# 遍历每条搜索结果,逐条打印其 id 和 距离
for hits in results:
    for hit in hits:
        print(f"id: {hit.id}, distance: {hit.distance}")

# 打印当前集合中所有分区的名称
print("当前集合所有分区:", [p.name for p in collection.partitions])
# 先从内存释放
collection.release(partition_names=[partition_name])  
# 如果需要删除分区(危险操作,一般不用),可以取消下面的注释执行删除
collection.drop_partition(partition_name)
print(f"分区 '{partition_name}' 已删除")
print("当前集合所有分区:", [p.name for p in collection.partitions])

注意事项:

  • 分区名需唯一。
  • 插入/检索时指定 partition_name/partition_names。

4.2 混合搜索(向量+标量过滤) #

支持带标量字段的混合检索,可用 expr 过滤。

# 导入 pymilvus 的集合结构描述、字段描述、数据类型、集合和连接相关模块
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection, connections

# 导入 random 模块用于生成随机数据
import random

# 建立与 Milvus 数据库的连接,指定别名、主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")

# 定义集合的字段列表,包括主键字段、向量字段和分类字段
fields = [
    # 创建主键字段,类型为 64 位整型,并且设置为自动生成ID
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
    # 创建向量字段,类型为 128 维的浮点向量
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
    # 创建分类字段,类型为 64 位整型
    FieldSchema(name="category", dtype=DataType.INT64),
]

# 创建集合的 Schema(结构)对象
schema = CollectionSchema(fields)

# 基于 schema 创建名为 "mixed_collection" 的集合
collection = Collection("mixed_collection", schema)

# 生成 100 个 128 维的随机向量
vectors = [[random.random() for _ in range(128)] for _ in range(100)]

# 生成 100 个范围在 1~5 之间的随机分类标签
categories = [random.randint(1, 5) for _ in range(100)]

# 组织插入数据,包含向量数据和对应的分类标签
data = [vectors, categories]

# 向集合中插入数据
collection.insert(data)

# 构建索引参数,设置距离类型为 L2,索引类型为 IVF_FLAT,聚类数 nlist 为 64
index_params = {"metric_type": "L2", "index_type": "IVF_FLAT", "params": {"nlist": 64}}

# 为 'embedding' 字段创建索引
collection.create_index("embedding", index_params)

# 将集合数据加载到内存,准备进行检索
collection.load()

# 生成一个 128 维的随机向量用作查询
query_vector = [random.random() for _ in range(128)]

# 设置搜索参数,距离类型为 L2,nprobe 设置为 10
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}

# 在集合内进行带有过滤条件的向量检索
results = collection.search(
    # 检索的数据为 query_vector 组成的列表
    data=[query_vector],
    # 检索的向量字段名为 'embedding'
    anns_field="embedding",
    # 指定检索参数
    param=search_params,
    # 最多返回 5 条相似度最高的结果
    limit=5,
    # 只检索分类字段 category 为 2、3、4 的数据
    expr="category in [2, 3, 4]",
)

# 遍历检索到的所有结果
for hits in results:
    # 遍历每一条命中结果
    for hit in hits:
        # 打印命中结果的 ID 和距离
        print(f"id: {hit.id}, distance: {hit.distance}")

5. 最佳实践 #

  1. 批量插入:尽量批量插入数据而不是单条插入,以提高性能。
  2. 合理设置索引参数:根据数据量和查询需求调整索引参数(如 nlist、nprobe)。
  3. 预加载集合:在查询前加载集合到内存。
  4. 使用连接池:在高并发场景下使用连接池管理连接。
  5. 监控性能:定期监控查询性能和资源使用情况。
  6. 异常处理:所有操作建议加 try/except 捕获异常,便于排查问题。

6. IVF_FLAT #

  • 索引类型
  • 倒排索引
  • KMeans
  • 轮盘赌
  • nlistnprobe
  • point
# 导入默认字典 defaultdict,便于自动初始化桶结构
from collections import defaultdict

# 导入数学库,提供 sqrt、pow 等基础数学函数
import math

# 导入随机库,支持随机抽样与随机数生成
import random


# 计算两个向量之间的欧氏距离(L2 距离)
def l2_distance(a, b):
    # zip 将两个向量按维度配对,平方差求和后开平方
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


# K-Means++ 初始化:挑选 k 个初始中心,并做一次分配+更新
def kmeans(vectors, k):
    # 第一个中心直接从所有样本中随机挑选
    centers = [random.choice(vectors)]

    # 解释:下面这段 while 是 K-Means++ 的“轮盘赌”抽样逻辑
    # 步骤:计算每个点到最近中心的距离平方 -> 得到概率权重 -> 依概率抽样新的中心

    # 当中心数量不足 k 个时,持续抽样新的中心
    while len(centers) < k:
        # 第一步:每个数据点距离最近中心的平方,作为该点的权重
        dists = [min(l2_distance(v, c) ** 2 for c in centers) for v in vectors]

        # 第二步:对所有权重求和,得到轮盘赌的总长度
        total = sum(dists)

        # 第三步:在 [0, total) 区间上随机抽一个阈值 r,相当于轮盘指针
        r = random.random() * total

        # prefix 累加当前遍历到的权重,用于定位 r 落在哪个区间
        prefix = 0.0

        # 遍历所有点和对应权重,寻找累积首次超过 r 的那个点
        for v, d in zip(vectors, dists):
            # 不断累加权重,等价于在“条带”上向前走
            prefix += d
            # 一旦累计值覆盖了随机阈值 r,就把该点选为新中心
            if prefix >= r:
                centers.append(v)
                break

    # 初始化一个“簇字典”,key 是中心索引,value 是该中心下的样本列表
    clusters = {i: [] for i in range(k)}

    # 遍历所有向量,找到距离最近的中心,将该向量归入对应簇
    for v in vectors:
        idx = min(range(k), key=lambda i: l2_distance(v, centers[i]))
        clusters[idx].append(v)

    # 对每个非空簇计算均值,作为中心的最新位置
    for i, points in clusters.items():
        if points:
            # zip(*points) 将点按维度拆开,sum(dim)/len(points) 是维度均值
            centers[i] = [sum(dim) / len(points) for dim in zip(*points)]

    # 返回完成一次“初始化+更新”的中心列表
    return centers


# 构建 IVF-Flat 索引:输出中心列表和倒排桶
def build_ivf_flat(vectors, nlist):
    # 先通过 K-Means++ 得到 nlist 个中心
    centers = kmeans(vectors, nlist)

    # defaultdict(list) 便于向桶里 append,无需检查 key 是否存在
    buckets = defaultdict(list)

    # 遍历所有向量,依据最近中心编号将其放入对应桶
    for vec in vectors:
        idx = min(range(nlist), key=lambda i: l2_distance(vec, centers[i]))
        buckets[idx].append(vec)

    # 返回构建好的中心列表与倒排结构
    return centers, buckets


# 在 IVF-Flat 索引上执行查询,近似返回 topk 个最近向量
def search_ivf_flat(query, centers, buckets, nprobe, topk):
    # 先按查询向量到各中心的距离排序,得到“探查顺序”
    probe_order = sorted(range(len(centers)), key=lambda i: l2_distance(query, centers[i]))

    # 用列表保存候选 (距离, 向量) 对
    candidate_pairs = []

    # 只深入距离最近的 nprobe 个中心对应的桶
    for idx in probe_order[:nprobe]:
        for vec in buckets[idx]:
            candidate_pairs.append((l2_distance(query, vec), vec))

    # 候选列表按距离升序排列,便于截取前 topk 个
    candidate_pairs.sort(key=lambda x: x[0])

    # 返回最接近查询的 topk 个候选
    return candidate_pairs[:topk]


# 主程序:演示 IVF-Flat 的构建与查询流程
def main():
    # 设定随机种子,保证示例结果可复现
    random.seed(0)

    # 构造 8 个二维向量作为“数据库”
    database = [
        [0.1, 0.2],
        [0.15, 0.18],
        [0.8, 0.85],
        [0.82, 0.79],
        [0.4, 0.4],
        [0.42, 0.45],
        [0.9, 0.1],
        [0.88, 0.15],
    ]

    # 查询向量:我们要找与它相似的数据库向量
    query = [0.12, 0.22]

    # nlist 表示建索引时划分的中心(簇)个数
    nlist = 4

    # nprobe 表示查询时会探查多少个最近的中心
    nprobe = 2

    # 构建 IVF-Flat 索引,得到中心与倒排桶
    centers, buckets = build_ivf_flat(database, nlist=nlist)

    # 执行检索,返回距离最近的 top3 个结果
    results = search_ivf_flat(query, centers, buckets, nprobe=nprobe, topk=3)

    # 打印本次检索的参数设置
    print(f"nlist={nlist}, nprobe={nprobe}")

    # 打印检索到的候选向量及其距离
    print("最近向量:")
    for dist, vec in results:
        print(f"向量={vec}, 距离={dist:.4f}")


# Python 入口:若当前脚本以 main 方式执行,则运行示例
if __name__ == "__main__":
    main()

7.参考 #

  • 向量嵌入
  • 向量数据库
  • 分词器
  • 启动匹配选项
  • Milvus分区
  • 分区键
  • 数据段
  • Nullable选项
  • 一致性
  • 动态Schema
  • 集合状态
  • 副本
  • radius
  • range filter
  • topK
  • 分组
  • 过滤表达式
  • 探索
  • 属性
  • 索引类型
  • 倒排索引
  • KMeans
  • 轮盘赌
  • nlistnprobe
  • point

访问验证

请输入访问令牌

Token不正确,请重新输入