1. 基本概念 #
PyMilvus 是 Milvus 向量数据库的 Python SDK,它允许开发者通过 Python 接口与 Milvus 进行交互,实现高效的向量相似度搜索和管理。 PyMilvus 提供了以下核心功能:
- 集合(Collection)管理
- 向量插入和删除
- 向量相似度搜索
- 分区(Partition)管理
- 索引(Index)管理
2. 安装 PyMilvus #
建议使用 Python 3.7 及以上版本。
uv add pymilvus3. 基本使用 #
3.1 连接 Milvus #
连接前请确保 Milvus 服务端已启动,默认端口为 19530。
# 导入 pymilvus 连接模块
from pymilvus import connections
# 连接到 Milvus 服务器(本地或远程)
# 使用默认连接别名,连接到本地主机的19530端口,数据库名为rensheng
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 检查连接状态
# 验证名为"default"的连接是否存在,返回True表示连接成功
print(connections.has_connection("default")) # True 表示连接成功
常见问题:
- 端口不通或服务未启动会连接失败。
- host/port 填写需与 Milvus 服务端一致。
3.2 创建集合(Collection) #
集合类似于数据库中的"表",需先定义字段(Field)和集合模式(Schema)。
# 导入 pymilvus 相关模块:集合模式、字段模式、数据类型、集合、连接
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection, connections
# 连接到 Milvus 服务器,使用默认连接别名,连接到本地主机的19530端口,数据库名为rensheng
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 定义集合的字段结构,包含主键ID字段和向量字段
fields = [
# 定义主键字段:64位整数类型,自动生成ID
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
# 定义向量字段:128维浮点向量
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
]
# 创建集合模式对象,包含字段定义和描述信息
schema = CollectionSchema(fields, description="example")
# 检查集合是否存在,不存在才创建
collection_name = "example"
try:
# 尝试获取已存在的集合
collection = Collection(collection_name)
print(f"集合 {collection_name} 已存在")
except Exception:
# 集合不存在,创建新集合
collection = Collection(collection_name, schema)
print(f"集合 {collection_name} 创建成功")
注意事项:
- 向量字段需指定
dim维度。 - 主键字段建议用 INT64。
- 集合名需唯一。
3.3 创建索引 #
向量字段需创建索引以加速检索。常用类型有 IVF_FLAT、HNSW、IVF_PQ 等。
# 导入 pymilvus 集合和连接模块
from pymilvus import Collection, connections
# 连接到 Milvus 数据库,指定主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象
collection = Collection("example")
# 定义索引参数配置
index_params = {
"metric_type": "L2", # 距离度量方式:L2、IP、COSINE
"index_type": "IVF_FLAT", # 索引类型 Inverted File with Flat(倒排文件+暴力遍历)
"params": {"nlist": 128}, # nlist 越大召回率越高,速度越慢
}
# 为向量字段创建索引
collection.create_index("embedding", index_params)
# 打印索引创建成功信息
print("索引创建成功")
常见问题:
- 未建索引时检索效率低。
- nlist 参数需根据数据量调整。
3.4 加载集合到内存 #
检索前必须先将集合 load 到内存。
# 导入 pymilvus 集合和连接模块
from pymilvus import Collection, connections
# 连接到 Milvus 数据库,指定主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象
collection = Collection("example")
# 将集合加载到内存中以支持搜索操作
collection.load()
# 打印集合加载成功信息
print("集合已加载到内存")
注意事项:
- 未 load 时无法检索。
- 数据量大时 load 可能较慢。
3.5 插入数据 #
插入前需准备好与 schema 对应的数据。
# 导入 Milvus 数据库连接和集合操作所需的模块
from pymilvus import Collection, connections
# 导入随机数生成模块
import random
# 连接到 Milvus 数据库,指定主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象
collection = Collection("example")
# 生成随机向量数据
# 创建 1000 个向量,每个向量包含 128 个随机浮点数
vectors = [[random.random() for _ in range(128)] for _ in range(1000)]
# 只插入向量字段(主键 auto_id=True 可省略)
# 将向量数据组织成插入格式
data = [vectors]
# 插入数据
# 将数据插入到集合中,并获取插入结果
mr = collection.insert(data)
# 打印实际插入的向量数量
print("插入向量数量:", mr.insert_count)
注意事项:
- 数据格式为 List[List[float]],每个子列表为一个向量。
- 插入后建议
collection.flush()保证数据落盘。
3.5.1 insert #
在 Milvus 的 pymilvus 客户端中,collection.insert(data) 的数据格式有两种常见写法:
列表格式(List of Columns)
data = [vectors, categories]
collection.insert(data)data是一个列表,列表中的每一项对应一个字段(列),顺序要和 schema 中字段定义的顺序一致(不包括 auto_id 的主键字段)。- 每个字段的数据都是一个长度相同的列表(即“列式”存储)。
在这个的 schema 里,字段顺序是:
- id(auto_id=True,插入时不用提供)
- embedding
- category
所以只需要提供 embedding 和 category 两列的数据
字典格式(List of Dicts)
data = [
{"embedding": [0.1, 0.2, ...], "category": 2},
{"embedding": [0.3, 0.4, ...], "category": 3},
# ...
]
collection.insert(data)这种写法每一行为一个字典,字段名和 schema 对应,适合“行式”插入。
注意事项:
- 列表格式不需要指定字段名,但顺序必须和 schema 一致(auto_id 字段不用管)。
- 字典格式需要指定字段名,顺序无关。
3.6 向量搜索 #
支持多种距离度量(L2、IP、COSINE),可指定 topK、nprobe 等参数。
# 导入 pymilvus 集合和连接模块
from pymilvus import Collection, connections
# 导入随机数生成模块
import random
# 连接到 Milvus 数据库,指定主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象
collection = Collection("example")
# 定义搜索参数配置
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
# 生成一个查询向量
query_vector = [random.random() for _ in range(128)]
# 执行向量搜索操作
results = collection.search(
data=[query_vector], anns_field="embedding", param=search_params, limit=5
)
# 遍历并打印搜索结果
for hits in results:
for hit in hits:
print(f"id: {hit.id}, distance: {hit.distance}")参数说明:
anns_field:向量字段名param:索引参数(如 nprobe)limit:返回 topK 个结果
常见问题:
- 未 load 集合会报错。
- 查询向量维度需与 schema 一致。
3.7 删除集合 #
删除集合会清空所有数据,谨慎操作。
# 导入 pymilvus 集合和连接模块
from pymilvus import Collection, connections
# 连接到 Milvus 数据库,指定主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象
collection = Collection("example")
# 删除集合
collection.drop()
# 打印集合删除成功信息
print("集合已删除")
4. 高级特性 #
4.1 分区管理 #
分区可用于数据分组、分层管理。
# 导入 pymilvus 模块中的 Collection 类和 connections 连接模块
from pymilvus import Collection, connections
# 导入随机数生成模块,用于生成向量数据
import random
# 连接到 Milvus 数据库,指定别名、主机地址、端口号和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 获取名为 "example" 的集合对象,后续操作都基于此集合
collection = Collection("example")
# 定义要创建的分区名称
partition_name = "partition_2"
# 判断集合中是否已经存在该分区,如果不存在则创建,否则提示已存在
if partition_name not in [p.name for p in collection.partitions]:
collection.create_partition(partition_name)
print(f"分区 '{partition_name}' 创建成功")
else:
print(f"分区 '{partition_name}' 已存在")
# 生成 10 个随机的 128 维向量,假设集合的 schema 中 embedding 字段为 128 维
vectors = [[random.random() for _ in range(128)] for _ in range(10)]
# 组织插入数据的格式,按 schema 字段顺序构造 data 列表
data = [vectors]
# 向指定分区插入数据,返回插入结果
insert_result = collection.insert(data, partition_name=partition_name)
# 打印插入数据的条数
print(f"已向分区 '{partition_name}' 插入 {insert_result.insert_count} 条数据")
# 将内存中的数据写入磁盘,保证持久化
collection.flush()
# 加载指定分区的数据到内存,提升后续搜索效率
collection.load(partition_names=[partition_name])
# 打印分区已加载到内存的提示
print(f"分区 '{partition_name}' 已加载到内存")
# 设置搜索参数:距离类型为 L2(欧式距离),nprobe 用于控制扫描的范围
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
# 生成一个随机的 128 维查询向量
query_vector = [random.random() for _ in range(128)]
# 在指定的分区中执行向量搜索,搜寻最相近的前5个向量
results = collection.search(
data=[query_vector],
anns_field="embedding",
param=search_params,
limit=5,
partition_names=[partition_name], # 指定搜索的范围仅限该分区
)
# 打印原始的搜索结果
print(results)
# 遍历每条搜索结果,逐条打印其 id 和 距离
for hits in results:
for hit in hits:
print(f"id: {hit.id}, distance: {hit.distance}")
# 打印当前集合中所有分区的名称
print("当前集合所有分区:", [p.name for p in collection.partitions])
# 先从内存释放
collection.release(partition_names=[partition_name])
# 如果需要删除分区(危险操作,一般不用),可以取消下面的注释执行删除
collection.drop_partition(partition_name)
print(f"分区 '{partition_name}' 已删除")
print("当前集合所有分区:", [p.name for p in collection.partitions])
注意事项:
- 分区名需唯一。
- 插入/检索时指定 partition_name/partition_names。
4.2 混合搜索(向量+标量过滤) #
支持带标量字段的混合检索,可用 expr 过滤。
# 导入 pymilvus 的集合结构描述、字段描述、数据类型、集合和连接相关模块
from pymilvus import CollectionSchema, FieldSchema, DataType, Collection, connections
# 导入 random 模块用于生成随机数据
import random
# 建立与 Milvus 数据库的连接,指定别名、主机、端口和数据库名称
connections.connect("default", host="localhost", port="19530", db_name="rensheng")
# 定义集合的字段列表,包括主键字段、向量字段和分类字段
fields = [
# 创建主键字段,类型为 64 位整型,并且设置为自动生成ID
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
# 创建向量字段,类型为 128 维的浮点向量
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=128),
# 创建分类字段,类型为 64 位整型
FieldSchema(name="category", dtype=DataType.INT64),
]
# 创建集合的 Schema(结构)对象
schema = CollectionSchema(fields)
# 基于 schema 创建名为 "mixed_collection" 的集合
collection = Collection("mixed_collection", schema)
# 生成 100 个 128 维的随机向量
vectors = [[random.random() for _ in range(128)] for _ in range(100)]
# 生成 100 个范围在 1~5 之间的随机分类标签
categories = [random.randint(1, 5) for _ in range(100)]
# 组织插入数据,包含向量数据和对应的分类标签
data = [vectors, categories]
# 向集合中插入数据
collection.insert(data)
# 构建索引参数,设置距离类型为 L2,索引类型为 IVF_FLAT,聚类数 nlist 为 64
index_params = {"metric_type": "L2", "index_type": "IVF_FLAT", "params": {"nlist": 64}}
# 为 'embedding' 字段创建索引
collection.create_index("embedding", index_params)
# 将集合数据加载到内存,准备进行检索
collection.load()
# 生成一个 128 维的随机向量用作查询
query_vector = [random.random() for _ in range(128)]
# 设置搜索参数,距离类型为 L2,nprobe 设置为 10
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
# 在集合内进行带有过滤条件的向量检索
results = collection.search(
# 检索的数据为 query_vector 组成的列表
data=[query_vector],
# 检索的向量字段名为 'embedding'
anns_field="embedding",
# 指定检索参数
param=search_params,
# 最多返回 5 条相似度最高的结果
limit=5,
# 只检索分类字段 category 为 2、3、4 的数据
expr="category in [2, 3, 4]",
)
# 遍历检索到的所有结果
for hits in results:
# 遍历每一条命中结果
for hit in hits:
# 打印命中结果的 ID 和距离
print(f"id: {hit.id}, distance: {hit.distance}")5. 最佳实践 #
- 批量插入:尽量批量插入数据而不是单条插入,以提高性能。
- 合理设置索引参数:根据数据量和查询需求调整索引参数(如 nlist、nprobe)。
- 预加载集合:在查询前加载集合到内存。
- 使用连接池:在高并发场景下使用连接池管理连接。
- 监控性能:定期监控查询性能和资源使用情况。
- 异常处理:所有操作建议加 try/except 捕获异常,便于排查问题。
6. IVF_FLAT #
# 导入默认字典 defaultdict,便于自动初始化桶结构
from collections import defaultdict
# 导入数学库,提供 sqrt、pow 等基础数学函数
import math
# 导入随机库,支持随机抽样与随机数生成
import random
# 计算两个向量之间的欧氏距离(L2 距离)
def l2_distance(a, b):
# zip 将两个向量按维度配对,平方差求和后开平方
return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))
# K-Means++ 初始化:挑选 k 个初始中心,并做一次分配+更新
def kmeans(vectors, k):
# 第一个中心直接从所有样本中随机挑选
centers = [random.choice(vectors)]
# 解释:下面这段 while 是 K-Means++ 的“轮盘赌”抽样逻辑
# 步骤:计算每个点到最近中心的距离平方 -> 得到概率权重 -> 依概率抽样新的中心
# 当中心数量不足 k 个时,持续抽样新的中心
while len(centers) < k:
# 第一步:每个数据点距离最近中心的平方,作为该点的权重
dists = [min(l2_distance(v, c) ** 2 for c in centers) for v in vectors]
# 第二步:对所有权重求和,得到轮盘赌的总长度
total = sum(dists)
# 第三步:在 [0, total) 区间上随机抽一个阈值 r,相当于轮盘指针
r = random.random() * total
# prefix 累加当前遍历到的权重,用于定位 r 落在哪个区间
prefix = 0.0
# 遍历所有点和对应权重,寻找累积首次超过 r 的那个点
for v, d in zip(vectors, dists):
# 不断累加权重,等价于在“条带”上向前走
prefix += d
# 一旦累计值覆盖了随机阈值 r,就把该点选为新中心
if prefix >= r:
centers.append(v)
break
# 初始化一个“簇字典”,key 是中心索引,value 是该中心下的样本列表
clusters = {i: [] for i in range(k)}
# 遍历所有向量,找到距离最近的中心,将该向量归入对应簇
for v in vectors:
idx = min(range(k), key=lambda i: l2_distance(v, centers[i]))
clusters[idx].append(v)
# 对每个非空簇计算均值,作为中心的最新位置
for i, points in clusters.items():
if points:
# zip(*points) 将点按维度拆开,sum(dim)/len(points) 是维度均值
centers[i] = [sum(dim) / len(points) for dim in zip(*points)]
# 返回完成一次“初始化+更新”的中心列表
return centers
# 构建 IVF-Flat 索引:输出中心列表和倒排桶
def build_ivf_flat(vectors, nlist):
# 先通过 K-Means++ 得到 nlist 个中心
centers = kmeans(vectors, nlist)
# defaultdict(list) 便于向桶里 append,无需检查 key 是否存在
buckets = defaultdict(list)
# 遍历所有向量,依据最近中心编号将其放入对应桶
for vec in vectors:
idx = min(range(nlist), key=lambda i: l2_distance(vec, centers[i]))
buckets[idx].append(vec)
# 返回构建好的中心列表与倒排结构
return centers, buckets
# 在 IVF-Flat 索引上执行查询,近似返回 topk 个最近向量
def search_ivf_flat(query, centers, buckets, nprobe, topk):
# 先按查询向量到各中心的距离排序,得到“探查顺序”
probe_order = sorted(range(len(centers)), key=lambda i: l2_distance(query, centers[i]))
# 用列表保存候选 (距离, 向量) 对
candidate_pairs = []
# 只深入距离最近的 nprobe 个中心对应的桶
for idx in probe_order[:nprobe]:
for vec in buckets[idx]:
candidate_pairs.append((l2_distance(query, vec), vec))
# 候选列表按距离升序排列,便于截取前 topk 个
candidate_pairs.sort(key=lambda x: x[0])
# 返回最接近查询的 topk 个候选
return candidate_pairs[:topk]
# 主程序:演示 IVF-Flat 的构建与查询流程
def main():
# 设定随机种子,保证示例结果可复现
random.seed(0)
# 构造 8 个二维向量作为“数据库”
database = [
[0.1, 0.2],
[0.15, 0.18],
[0.8, 0.85],
[0.82, 0.79],
[0.4, 0.4],
[0.42, 0.45],
[0.9, 0.1],
[0.88, 0.15],
]
# 查询向量:我们要找与它相似的数据库向量
query = [0.12, 0.22]
# nlist 表示建索引时划分的中心(簇)个数
nlist = 4
# nprobe 表示查询时会探查多少个最近的中心
nprobe = 2
# 构建 IVF-Flat 索引,得到中心与倒排桶
centers, buckets = build_ivf_flat(database, nlist=nlist)
# 执行检索,返回距离最近的 top3 个结果
results = search_ivf_flat(query, centers, buckets, nprobe=nprobe, topk=3)
# 打印本次检索的参数设置
print(f"nlist={nlist}, nprobe={nprobe}")
# 打印检索到的候选向量及其距离
print("最近向量:")
for dist, vec in results:
print(f"向量={vec}, 距离={dist:.4f}")
# Python 入口:若当前脚本以 main 方式执行,则运行示例
if __name__ == "__main__":
main()