导航菜单

  • 1.vector
  • 2.milvus
  • 3.pymilvus
  • 4.rag
  • 5.rag_measure
  • 7.search
  • ragflow
  • heapq
  • HNSW
  • cosine_similarity
  • math
  • typing
  • etcd
  • minio
  • collections
  • jieba
  • random
  • beautifulsoup4
  • chromadb
  • sentence_transformers
  • numpy
  • lxml
  • openpyxl
  • PyMuPDF
  • python-docx
  • requests
  • python-pptx
  • text_splitter
  • all-MiniLM-L6-v2
  • openai
  • llm
  • BPETokenizer
  • Flask
  • RAGAS
  • BagofWords
  • langchain
  • Pydantic
  • abc
  • faiss
  • MMR
  • scikit-learn
  • 1. Milvus简介
    • 1.1 Milvus是什么?
    • 1.2 为什么Milvus这么流行?
    • 1.3 Milvus能做什么?
    • 1.4 Milvus是如何工作的?
  • 2. Milvus的安装
    • 2.1 下载docker-compose.yml文件
      • 2.1.1 Windows
      • 2.1.2 MAC 或 Linux
    • 2.2 docker-compose.yml文件
    • 2.3 配置docker镜像
  • 3. Milvus的图形界面
    • 3.1 Attu能做什么?
    • 3.2 如何使用Attu?
  • 4. 使用Attu连接Milvus
  • 5. 创建数据库
    • 5.1 创建集合
    • 5.2 分词匹配
      • 5.2.1 分词器 (Standard)
      • 5.2.2 启用匹配
      • 5.2.3 分区键
      • 5.2.4.代码演示
    • 5.3 Nullable
    • 5.4 一致性 (Bounded)
      • 5.4.1 选项
        • 5.4.1.1 Strong(强一致性)
        • 5.4.1.2 Session(会话一致性)
        • 5.4.1.3 Bounded(有界一致性)
        • 5.4.1.4 Eventually(最终一致性)
      • 5.4.2 代码演示
      • 5.4.5 如何选择?
  • 6. schema
    • 6.1 界面
    • 6.2 副本
    • 6.3 状态
      • 6.3.1 为什么需要加载集合?
      • 6.3.2 Milvus集合生命周期管理
  • 7. 创建索引
    • 7.1 什么是索引?
    • 7.2 索引创建界面
    • 7.3 度量类型
    • 7.4 归一化
    • 7.5 加载集合
  • 8. 加载数据
  • 9. 向量搜索
    • 9.1 什么是向量搜索?
    • 9.2 界面左侧(搜索条件)
    • 9.3 界面右侧(搜索结果)
    • 9.4 搜索参数
      • 9.4.1 Level(搜索精度级别)
      • 9.4.2 Radius(搜索半径)
      • 9.4.3 Range Filter(范围过滤器)
      • 9.4.4 代码演示
      • 9.4.4 分区过滤
  • 10. 数据段
    • 10.1 什么是数据段?
    • 10.2 什么是查询节点ID
      • 10.2.1 查询节点
      • 10.2.2 含义
      • 10.2.3 类比
  • 11. 属性
    • 11.1 什么是集合属性?
    • 11.2 界面中的属性列表
  • 12. 操作milvus数据库
    • 12.1 连接数据库
    • 12.2 Schema管理
    • 12.4 索引管理
      • 12.5 集合管理
    • 12.6 数据管理
    • 12.7 搜索
    • 12.8 获取文本的向量嵌入
    • 12.9 保存数据到milvus
    • 12.10 图片向量
  • 13.APIKEY
    • 13.1 配置官方API Key
    • 13.2 配置本地部署的DeepSeek R1
    • 13.3 配置硅基流动的API Key
    • 13.4 配置派欧算力云的API Key
    • 13.5 配置腾讯云的API Key
    • 13.6 配置百度云的API Key
    • 13.7 配置阿里云的API Key
    • 13.8 配置字节跳动火山引擎的API Key
    • 13.9 配置科大讯飞的API Key
    • 13.10 配置openrouter的API Key
  • 14.参考
    • 15.云服务器安装

1. Milvus简介 #

  • milvus
  • milvus文档

1.1 Milvus是什么? #

Milvus是一种向量数据库,它是专门设计来存储和搜索大量向量数据的。

1.2 为什么Milvus这么流行? #

  1. 超级快:Milvus可以在几毫秒内从数十亿个向量中找到最相似的向量
  2. 开源免费:任何人都可以使用它,不需要付费
  3. 容易使用:即使不是专业程序员也能学会使用它

1.3 Milvus能做什么? #

  1. 图片搜索:上传一张照片,Milvus能找到相似的图片
  2. 语音识别:说一句话,Milvus能理解你的意思
  3. 推荐系统:根据你喜欢的电影,推荐你可能喜欢的其他电影
  4. 智能问答:问一个问题,找到最相关的答案

1.4 Milvus是如何工作的? #

  1. 收集数据:把图片、文字、声音等转换成向量(数字列表)
  2. 存储向量:把这些向量存储在数据库中
  3. 快速搜索:当你需要查找相似内容时,Milvus能快速找到最相似的向量

2. Milvus的安装 #

  • milvus安装文档

2.1 下载docker-compose.yml文件 #

2.1.1 Windows #

C:\>Invoke-WebRequest https://github.com/milvus-io/milvus/releases/download/v2.4.15/milvus-standalone-docker-compose.yml -OutFile docker-compose.yml

如果访问不了可以使用 
C:\>Invoke-WebRequest https://gitee.com/milvus-io/milvus/raw/v2.6.4/deployments/docker/standalone/docker-compose.yml -OutFile docker-compose.yml


docker compose up -d
docker compose ps
NAME                IMAGE                                      COMMAND                   SERVICE      CREATED          STATUS                             PORTS
milvus-etcd         quay.io/coreos/etcd:v3.5.5                 "etcd -advertise-cli…"   etcd         15 seconds ago   Up 10 seconds (health: starting)   2379-2380/tcp
milvus-minio        minio/minio:RELEASE.2023-03-20T20-16-18Z   "/usr/bin/docker-ent…"   minio        15 seconds ago   Up 9 seconds (health: starting)    0.0.0.0:9000-9001->9000-9001/tcp
milvus-standalone   milvusdb/milvus:v2.4.15                    "/tini -- milvus run…"   standalone   14 seconds ago   Up 8 seconds (health: starting)    0.0.0.0:9091->9091/tcp, 0.0.0.0:19530->19530/tcp

2.1.2 MAC 或 Linux #

wget https://gitee.com/milvus-io/milvus/raw/v2.6.4/deployments/docker/standalone/docker-compose.yml -O docker-compose.yml
docker compose up -d

2.2 docker-compose.yml文件 #

# 指定Docker Compose文件格式版本为3.5
version: '3.5'

# 定义服务组
services:
  # etcd服务配置,用于Milvus的元数据存储
  etcd:
    # 容器名称设置为milvus-etcd
    container_name: milvus-etcd
    # 使用coreos的etcd镜像,版本为v3.5.5
    image: quay.io/coreos/etcd:v3.5.5
    # 设置etcd的环境变量
    environment:
      # 设置自动压缩模式为revision
      - ETCD_AUTO_COMPACTION_MODE=revision
      # 设置自动压缩保留1000个版本
      - ETCD_AUTO_COMPACTION_RETENTION=1000
      # 设置后端存储配额为4GB
      - ETCD_QUOTA_BACKEND_BYTES=4294967296
      # 设置快照计数为50000
      - ETCD_SNAPSHOT_COUNT=50000
    # 挂载卷,将本地目录映射到容器内的/etcd目录
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/etcd:/etcd
    # 启动命令,配置etcd服务器
    command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
    # 健康检查配置
    healthcheck:
      # 使用etcdctl检查端点健康状态
      test: ["CMD", "etcdctl", "endpoint", "health"]
      # 每30秒检查一次
      interval: 30s
      # 超时时间为20秒
      timeout: 20s
      # 重试3次
      retries: 3

  # MinIO服务配置,用于Milvus的对象存储
  minio:
    # 容器名称设置为milvus-minio
    container_name: milvus-minio
    # 使用MinIO镜像,指定版本
    image: minio/minio:RELEASE.2023-03-20T20-16-18Z
    # 设置MinIO的环境变量
    environment:
      # 设置访问密钥
      MINIO_ACCESS_KEY: minioadmin
      # 设置密钥
      MINIO_SECRET_KEY: minioadmin
    # 端口映射,将容器的9001和9000端口映射到主机
    ports:
      - "9001:9001"
      - "9000:9000"
    # 挂载卷,将本地目录映射到容器内的/minio_data目录
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/minio:/minio_data
    # 启动命令,配置MinIO服务器和控制台地址
    command: minio server /minio_data --console-address ":9001"
    # 健康检查配置
    healthcheck:
      # 使用curl检查MinIO健康状态
      test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
      # 每30秒检查一次
      interval: 30s
      # 超时时间为20秒
      timeout: 20s
      # 重试3次
      retries: 3

  # Milvus独立服务配置
  standalone:
    # 容器名称设置为milvus-standalone
    container_name: milvus-standalone
    # 使用Milvus镜像,版本为v2.4.15
    image: milvusdb/milvus:v2.4.15
    # 启动命令,以独立模式运行Milvus
    command: ["milvus", "run", "standalone"]
    # 安全选项配置
    security_opt:
    # 禁用seccomp配置文件
    - seccomp:unconfined
    # 设置Milvus的环境变量
    environment:
      # 设置etcd端点地址
      ETCD_ENDPOINTS: etcd:2379
      # 设置MinIO地址
      MINIO_ADDRESS: minio:9000
    # 挂载卷,将本地目录映射到容器内的/var/lib/milvus目录
    volumes:
      - ${DOCKER_VOLUME_DIRECTORY:-.}/volumes/milvus:/var/lib/milvus
    # 健康检查配置
    healthcheck:
      # 使用curl检查Milvus健康状态
      test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
      # 每30秒检查一次
      interval: 30s
      # 启动后等待90秒开始检查
      start_period: 90s
      # 超时时间为20秒
      timeout: 20s
      # 重试3次
      retries: 3
    # 端口映射,将容器的19530和9091端口映射到主机
    ports:
      - "19530:19530"
      - "9091:9091"
    # 依赖关系,确保etcd和minio服务先启动
    depends_on:
      - "etcd"
      - "minio"

# 网络配置
networks:
  # 默认网络配置
  default:
    # 网络名称设置为milvus
    name: milvus

2.3 配置docker镜像 #

{
  "builder": {
    "gc": {
      "defaultKeepStorage": "20GB",
      "enabled": true
    }
  },
  "experimental": false,
  "registry-mirrors": [
    "https://fwvjnv59.mirror.aliyuncs.com",
    "https://docker.m.daocloud.io",
    "https://huecker.io",
    "https://dockerhub.timeweb.cloud",
    "https://noohub.ru"
  ]
}

3. Milvus的图形界面 #

Attu是一个帮助我们使用Milvus向量数据库的图形界面工具

3.1 Attu能做什么? #

  1. 查看数据:看到存储在Milvus中的所有集合(可以理解为表格)和向量数据
  2. 创建集合:建立新的数据存储空间
  3. 导入数据:把数据添加到Milvus中
  4. 搜索相似内容:找到与某个向量最相似的其他向量
  5. 管理数据库:监控数据库的运行状态和性能

3.2 如何使用Attu? #

  1. 首先确保Milvus已经运行起来了
  2. 打开网页浏览器,访问Attu的地址(通常是http://localhost:8000)
  3. 连接到你的Milvus服务器
  4. 然后你就可以开始浏览和管理你的向量数据了!

4. 使用Attu连接Milvus #

这条命令用于启动Attu,它是Milvus向量数据库的图形用户界面工具

docker run -p 8000:3000 -e MILVUS_URL=192.168.2.106:19530 zilliz/attu:v2.5
http://192.168.2.106:8000/#/

一定要注意MILVUS_URL后面跟着的IP地址要改成自己的IP地址,这个IP地址如果是Windows可以使用ipconfig命令获取,如果是Linux和MAC可以使用ifconfig命令获取

1. docker run

  • 这是基本的Docker命令,用于创建并启动一个新的容器

2. -p 8000:3000

  • 这是端口映射参数
  • 将主机的8000端口映射到容器内的3000端口
  • 这意味着当你访问主机的8000端口时,请求会被转发到容器内的3000端口
  • Attu的Web界面在容器内运行在3000端口

3. -e MILVUS_URL=192.168.2.106:19530

  • -e 表示设置环境变量
  • MILVUS_URL=192.168.2.106:19530 指定了Milvus服务器的地址和端口
  • 192.168.2.106 是Milvus服务器的IP地址
  • 19530 是Milvus服务的默认端口

4. zilliz/attu:v2.5

  • 这是Docker镜像的名称和标签
  • zilliz/attu 是镜像名称,由Zilliz公司(Milvus的开发者)提供
  • v2.5 是版本标签,表示使用Attu的2.5版本

5. 访问地址

http://192.168.2.106:8000/#/

这是启动Attu后的访问地址:

  • http:// 是协议
  • 192.168.2.106 是主机IP地址
  • 8000 是我们映射的端口
  • /#/ 是Attu应用的路径

这个命令做了以下事情:

  1. 从Docker Hub下载并运行Attu工具(如果本地没有)
  2. 将Attu连接到指定IP地址的Milvus数据库
  3. 设置端口映射,让你可以通过浏览器访问Attu的界面
  4. 提供一个网址,通过这个网址你可以在浏览器中打开Attu管理界面

通过Attu界面,你可以直观地管理Milvus数据库中的向量数据,包括创建集合、导入数据、执行向量搜索等操作。

5. 创建数据库 #

5.1 创建集合 #

基本信息

  • 集合名称:user
  • 集合描述:用户

字段配置 界面显示了三种主要字段类型的配置:

ID字段(主键)

  • 字段名称:id
  • 数据类型:INT64
  • 描述:主键
  • 特性:自动ID(已启用)
  • 说明:ID

向量字段

  • 字段名称:name_vector
  • 数据类型:Float Vector
  • 维度:128
  • 描述:用户名向量
  • 功能:用于存储向量数据,支持向量相似度搜索

标量字段

  • 字段名称:name
  • 数据类型:VarChar
  • 最大长度:32
  • 分词器:Standard(标准分词器)
  • 可选配置:
    • 启用匹配(未选中)
    • 分区键(未选中)
    • Nullable(未选中)

集合属性设置

  • 一致性模式:Bounded(有界一致性)
  • 其他选项:
    • 启用动态Schema(未选中)
    • 创建后加载collection(未选中)
类型类别 类型名称 说明/用途
主键字段 INT64 64位整数,常用于自增ID、编号等唯一标识。
VARCHAR 可变长度字符串,适合用作唯一标识的文本(如用户名、邮箱等)。
向量字段 Binary Vector 二进制向量,常用于图像、音频等二进制特征的检索。
Float Vector 浮点型向量,最常见的AI特征向量类型(如文本、图片的嵌入向量)。
Float16 Vector 16位浮点向量,节省存储空间,适合大规模数据。
BFloat16 Vector 另一种16位浮点格式,兼容AI硬件加速。
Sparse Vector 稀疏向量,适合大部分元素为0的高维特征(如文本one-hot编码)。
BM25(Varchar) 文本字段,支持BM25等关键词检索算法。
标量字段 Int8 8位整数,节省空间,适合小范围数字。
Int16 16位整数。
Int32 32位整数。
Int64 64位整数。
Float 单精度浮点数。
Double 双精度浮点数,精度更高。
Boolean 布尔值,只有true/false。
VarChar 可变长度字符串,适合存储文本(如名字、描述等)。
JSON 存储结构化的JSON数据,适合灵活扩展字段。
Array 数组类型,可以存储一组同类型的值。
  • 主键字段:每个集合必须有一个主键,保证每条数据唯一。可以用数字(INT64)或字符串(VARCHAR)。
  • 向量字段:用于存储AI模型生成的特征向量,是Milvus支持“相似度检索”的核心。
    • 例如:图片特征用Float Vector,文本检索用BM25(Varchar)。
  • 标量字段:存储普通属性,如年龄(Int32)、价格(Float)、标签(VarChar)、扩展信息(JSON)等。

5.2 分词匹配 #

  • 分词器

    5.2.1 分词器 (Standard) #

    分词器是用来将文本分解成单个词语的工具。比如"我喜欢编程"可以被分解为"我"、"喜欢"、"编程"。

Standard分词器是最基本的分词方式,它按照一定规则把文本切分成词语,这样在搜索时就能更准确地找到包含特定词语的内容。

5.2.2 启用匹配 #

让集合支持基于这些词条的匹配与相似度检索,例如你输入“电脑”,系统会在已经分词的词条里查找“计算机”“笔记本”等相关词。 这就像是你在搜索引擎中输入关键词,即使不是完全一样的词,也能找到相关内容。

5.2.3 分区键 #

分区键用来将大量数据分散存储在不同的服务器上。

比如我们班里想把同学们进行分组,分组的依据是同学们的姓氏,那么姓氏就是分区键

5.2.4.代码演示 #

# 导入默认字典,用于分区聚合
from collections import defaultdict

# 导入结巴分词工具
import jieba

# 定义示例文档数据,每个文档包含“id”、“分区键”、“内容”
documents = [
    {"id": 1, "partition_key": "张", "content": "张三喜欢编程和电脑游戏"},
    {"id": 2, "partition_key": "李", "content": "李四热爱计算机科学"},
    {"id": 3, "partition_key": "王", "content": "王五喜欢阅读技术书籍"},
    {"id": 4, "partition_key": "张", "content": "张杰钟爱笔记本电脑"},
]

# 定义同义词映射,“键”为代表词,“值”为同义词集合(用于扩展匹配)
synonym_map = {
    "电脑": {"电脑", "计算机", "PC"},
    "编程": {"编程", "程序设计"},
}

# 对输入文本进行分词(搜索模式),返回词元列表
def simple_tokenize(text: str):
    """说明:使用 jieba 搜索模式,得到更细粒度的词元"""
    return [token for token in jieba.lcut_for_search(text) if token.strip()]

# 根据“分区键”对文档进行聚合,返回分区字典
def build_partitions(docs):
    """说明:按照分区键将文档聚合"""
    partitions = defaultdict(list)
    for doc in docs:
        partitions[doc["partition_key"]].append(doc)
    return partitions

# 在所有分区内查找包含指定关键词(或同义词)的文档
def match_query(partitions, keyword: str, use_synonym=True):
    """说明:在指定分区内执行匹配,支持同义词扩展"""
    # 构建本次要匹配的词集合,默认为仅包含keyword
    tokens_to_match = {keyword}
    # 如果启用同义词且keyword在同义词表中,则扩展为所有同义词
    if use_synonym and keyword in synonym_map:
        tokens_to_match = synonym_map[keyword]

    # 结果列表,用于收集匹配文档
    results = []
    # 遍历每个分区和分区下的文档
    for partition_key, docs in partitions.items():
        for doc in docs:
            # 对文档内容进行分词
            tokens = simple_tokenize(doc["content"])
            # 判断有无交集(有则说明匹配)
            if tokens_to_match.intersection(tokens):
                results.append((partition_key, doc["id"], doc["content"]))
    # 返回所有匹配结果
    return results

# 程序主入口
if __name__ == "__main__":
    # 按分区键聚合文档
    partitions = build_partitions(documents)

    # 示例1:只匹配“电脑”本词
    hits = match_query(partitions, keyword="电脑", use_synonym=False)
    print("仅匹配“电脑”的结果:")
    for row in hits:
        print(row)

    # 示例2:启用同义词匹配(“电脑”≈“计算机”)
    hits_with_synonym = match_query(partitions, keyword="电脑", use_synonym=True)
    print("\n启用同义词匹配后的结果:")
    for row in hits_with_synonym:
        print(row)

5.3 Nullable #

Nullable表示这个字段可以为空。就像填表格时,有些项目是必填的(不能为空),有些是选填的(可以为空)。如果勾选了Nullable,就意味着这个字段可以不填写任何值。

5.4 一致性 (Bounded) #

在数据库或分布式系统中,“一致性”表示多个副本或节点上的数据保持同步、相同的程度。 Milvus支持的“Bounded(一致性有界)”是一种典型的弱一致性模型,它介于强一致性和最终一致性之间。通俗讲:不是所有副本立刻同步,但保证在“某个可控的时限”内,所有节点的数据最终都能达到一致。

5.4.1 选项 #

在Milvus数据库创建集合时,有四种一致性选项

5.4.1.1 Strong(强一致性) #

比如在班级里,当老师发布一条通知后,所有同学必须立即收到并确认,老师才会继续下一步。这种方式最可靠,但速度较慢,因为要等待每个人都确认。

5.4.1.2 Session(会话一致性) #

这像是你和好友的私聊。你发的消息,你的好友一定能按顺序收到,但其他同学可能暂时看不到或顺序不同。

5.4.1.3 Bounded(有界一致性) #

这像是黑板上写的通知。老师写完通知后,同学们会在一定时间内(比如10分钟)都能看到,但不要求立即。

5.4.1.4 Eventually(最终一致性) #

这像是口口相传的小道八卦消息。信息最终会传到每个人那里,但可能需要较长时间,且不保证具体何时。

5.4.2 代码演示 #

import random
import time
from typing import List, Dict

STUDENTS = ["小张", "小李", "小王", "小刘"]


def strong_consistency(message: str) -> None:
    """说明:强一致性——必须等所有人确认收到后,老师才算广播成功。"""
    print("\n[Strong] 老师开始广播:", message)
    for name in STUDENTS:
        print(f"  → {name} 收到并确认:{message}")
        time.sleep(0.3)  # 模拟等待确认
    print("[Strong] 所有人都确认,老师继续下一步\n")


def session_consistency(message: str, session_owner: str) -> None:
    """说明:会话一致性——只保证当前会话(某个学生)看到自己的操作,其他同学稍后再同步。"""
    print(f"\n[Session] {session_owner} 在私聊中提交:{message}")
    print(f"  → {session_owner} 立即看到自己发的内容")
    print("  → 其他同学稍后通过公共同步获得,不保证时间顺序\n")


def bounded_consistency(message: str, bound_seconds: int = 5) -> None:
    """说明:有界一致性——在指定时间窗口内同步即可,不要求立刻。"""
    print(f"\n[Bounded] 老师写在黑板上:{message}(需要在 {bound_seconds} 秒内传达到)")
    reached: Dict[str, float] = {}
    for name in STUDENTS:
        delay = random.uniform(0, bound_seconds)
        time.sleep(delay / 10)  # 缩短真实等待,方便演示
        reached[name] = delay
        print(f"  → {name} 在 {delay:.2f} 秒后看到黑板信息")
    print("[Bounded] 所有人都在规定时间内看到了通知\n")


def eventual_consistency(message: str) -> None:
    """说明:最终一致性——通知顺序与到达时间都不确定,但最终所有人会知道。"""
    print("\n[Eventually] 消息通过口口相传传播:", message)
    remaining = STUDENTS.copy()
    random.shuffle(remaining)
    total_delay = 0.0
    while remaining:
        name = remaining.pop()
        delay = random.uniform(0.5, 2.0)
        total_delay += delay
        time.sleep(delay / 10)  # 缩短等待
        print(f"  → {name} 在约 {total_delay:.1f} 秒后才听到消息")
    print("[Eventually] 虽然有延迟,但最终每个人都掌握了消息\n")


if __name__ == "__main__":
    strong_consistency("今晚 8 点线上作业答疑")
    session_consistency("我提交了一次向量插入操作", session_owner="小李")
    bounded_consistency("10 分钟内完成实验截图上交", bound_seconds=10)
    eventual_consistency("下周可能有一次临时测验,注意关注群通知")
  • Strong:每次广播都等待所有人确认,最可靠但最慢;
  • Session:保证某个“会话拥有者”能立即看到自己提交的内容,其他人稍后同步;
  • Bounded:只需在规定时间窗口内达到即可;
  • Eventually:到达顺序和时间不可控,但最终所有人都会收到。

5.4.5 如何选择? #

  • 需要数据绝对准确时(如银行交易)→ 选Strong
  • 需要用户看到自己操作的结果时 → 选Session
  • 需要在一定时间内保证一致性 → 选Bounded
  • 对时间要求不高,但希望系统运行更快 → 选Eventually

6. schema #

6.1 界面 #

集合基本信息

  • 名称:user(用户)
  • 描述:用户
  • 创建时间:2023年3月15日 18:18:20
  • 状态:未加载(表示数据还没有加载到内存中)
  • 副本:1
  • 大约的Entity数量:0(表示集合中还没有数据)

集合特性

  • 自动ID:已启用(系统会自动生成ID)
  • 一致性:Bounded(有界一致性,这是数据一致性的一种模式)

字段信息 集合包含三个字段:

  1. id

    • 类型:Int64(64位整数)
    • 不可为空(Nullable: ×)
    • 是主键(ID)
    • 可以创建标量索引
  2. name_vector

    • 类型:FloatVector(128)(128维浮点向量)
    • 不可为空
    • 可以创建向量索引
    • 描述:用户名向量
  3. name

    • 类型:VarChar(32)(最多32个字符的可变长度字符串)
    • 不可为空
    • 使用分词器
    • 可以创建标量索引

6.2 副本 #

副本(Replica)就像是你重要笔记的复印件。想象一下,如果你有一本非常重要的笔记本:

  • 你可能会担心笔记本丢失
  • 你可能希望多个同学同时查看笔记内容
  • 你可能想防止笔记本被意外损坏

所以,你决定复印几份一模一样的副本。在Milvus中,副本就是数据的完整拷贝。

副本的作用

  1. 提高可靠性:如果一个数据副本损坏或丢失,系统可以使用其他副本继续工作
  2. 提高查询性能:多个副本可以同时处理不同的查询请求,就像多个同学可以同时看不同的笔记副本
  3. 负载均衡:查询请求可以分散到不同的副本上,避免单个服务器压力过大

6.3 状态 #

在Milvus中,必须先加载(load)集合到内存,才能创建和使用索引

6.3.1 为什么需要加载集合? #

Milvus采用了内存计算的架构:

  • 数据存储在磁盘上
  • 但搜索、索引等操作需要在内存中进行
  • 加载操作就是将集合数据从磁盘读入内存

6.3.2 Milvus集合生命周期管理 #

在Milvus中,集合有几种状态:

  1. 已创建但未加载:

    • 集合存在于磁盘上
    • 不能进行搜索或索引操作
    • 占用最少资源
  2. 已加载:

    • 集合数据在内存中
    • 可以进行所有操作(搜索、索引等)
    • 占用内存资源
  3. 已释放:

    • 集合数据从内存中释放
    • 回到"未加载"状态
    • 释放内存资源
  • 加载前请保证所有向量列都有索引

7. 创建索引 #

7.1 什么是索引? #

想象一下,索引就像是一本书的目录。如果没有目录,你想找某个内容就必须从头到尾翻阅整本书。 有了目录,你可以快速找到你想要的内容在哪一页。

在数据库中,索引也是这样工作的,它帮助计算机快速找到你要的数据。

7.2 索引创建界面 #

这个弹窗显示我们正在为"name_vector"字段(也就是用户名的向量表示)创建索引:

  1. 索引类型:AUTOINDEX

    • 这是一种自动选择最适合的索引类型的方式
    • Milvus会根据你的数据特点自动选择最合适的索引结构
  2. 索引名称:

    • 这里可以输入索引的名字
  3. 参数设置:

    • 度量类型:COSINE(余弦相似度)
    • 这决定了如何计算两个向量之间的"距离"或"相似度"

7.3 度量类型 #

  • 度量类型
  • 余弦相似度
# 导入math模块用于数学运算
import math
# 从typing模块导入List和Sequence用于类型注解
from typing import List, Sequence

# 定义函数,计算两个向量的欧几里得距离
def l2_distance(vec_a: Sequence[float], vec_b: Sequence[float]) -> float:
    # 说明:计算欧几里得距离
    # 判断两个向量的长度是否相等
    if len(vec_a) != len(vec_b):
        # 如果长度不等,抛出异常
        raise ValueError("向量维度必须一致")
    # 计算每个对应元素之差的平方和,再开平方根
    return math.sqrt(sum((a - b) ** 2 for a, b in zip(vec_a, vec_b)))

# 定义函数,计算两个向量的内积
def inner_product(vec_a: Sequence[float], vec_b: Sequence[float]) -> float:
    # 说明:计算向量内积
    # 判断两个向量的长度是否相等
    if len(vec_a) != len(vec_b):
        # 如果长度不等,抛出异常
        raise ValueError("向量维度必须一致")
    # 返回对应位置元素相乘后的和
    return sum(a * b for a, b in zip(vec_a, vec_b))

# 定义函数,计算两个向量的余弦相似度
def cosine_similarity(vec_a: Sequence[float], vec_b: Sequence[float]) -> float:
    # 说明:计算余弦相似度
    # 计算内积
    dot = inner_product(vec_a, vec_b)
    # 计算第一个向量的模长
    norm_a = math.sqrt(sum(a * a for a in vec_a))
    # 计算第二个向量的模长
    norm_b = math.sqrt(sum(b * b for b in vec_b))
    # 判断模长是否为0,避免除以0
    if norm_a == 0 or norm_b == 0:
        # 如果模长为0,抛出异常
        raise ValueError("向量模长不能为 0")
    # 返回内积除以模长乘积的结果
    return dot / (norm_a * norm_b)

# 定义函数,计算两个向量的汉明距离(适用于二进制或等长序列)
def hamming_distance(vec_a: Sequence[int], vec_b: Sequence[int]) -> int:
    # 说明:计算汉明距离(适用于二进制或等长序列)
    # 判断两个向量长度是否一致
    if len(vec_a) != len(vec_b):
        # 如果长度不等,抛出异常
        raise ValueError("向量长度必须一致")
    # 统计对应位置元素不相等的数量
    return sum(1 for a, b in zip(vec_a, vec_b) if a != b)

# 程序主入口
if __name__ == "__main__":
    # 定义两个float类型的向量
    vec1: List[float] = [1, 2, 3]
    vec2: List[float] = [4, 5, 6]
    # 定义两个int类型的向量,用于汉明距离
    bin1: List[int] = [1, 0, 1, 0, 1]
    bin2: List[int] = [1, 1, 0, 0, 0]

    # 打印L2距离,保留两位小数
    print("L2 距离:", format(l2_distance(vec1, vec2), ".2f"))
    # 打印向量内积,保留两位小数
    print("内积:", format(inner_product(vec1, vec2), ".2f"))
    # 打印余弦相似度,保留两位小数
    print("余弦相似度:", format(cosine_similarity(vec1, vec2), ".2f"))
    # 打印汉明距离
    print("汉明距离:", hamming_distance(bin1, bin2))

7.4 归一化 #

  • 归一化
# 导入math模块用于数学计算
import math
# 从typing模块导入List和Sequence类型用于类型标注
from typing import List, Sequence

# 定义归一化函数,将向量的长度缩放为1,方向保持不变
def normalize(vector: Sequence[float]) -> List[float]:
    # 计算向量模长(欧氏范数)
    norm = math.sqrt(sum(component * component for component in vector))
    # 如果模长为0,抛出异常,无法归一化零向量
    if norm == 0:
        raise ValueError("零向量无法归一化,因为长度为 0")
    # 返回归一化后的向量(每个分量除以向量模长)
    return [component / norm for component in vector]

# 主程序入口
if __name__ == "__main__":
    # 定义一个二维向量
    vec = [3.0, 4.0]
    # 调用normalize函数对向量归一化
    normalized_vec = normalize(vec)
    # 输出原始向量
    print(f"原向量:{vec}")
    # 输出归一化后的向量
    print(f"归一化结果:{normalized_vec}")
    # 计算归一化后向量的长度(再次验证是否为1)
    length = math.sqrt(sum(x * x for x in normalized_vec))
    # 输出归一化后向量的长度,保留两位小数
    print(f"归一化后长度:{length:.2f}")

7.5 加载集合 #

创建完索引后,需要加载集合才能使用索引进行搜索。

8. 加载数据 #

  • 导入文件:从文件中导入数据
  • 插入样本数据:添加示例数据
  • 清空数据:删除所有数据

9. 向量搜索 #

9.1 什么是向量搜索? #

向量搜索就像是在说:"给我找一些和这个例子很像的东西!" 在这个例子中,我们是在寻找与某个输入向量相似的用户名。

9.2 界面左侧(搜索条件) #

  1. 搜索向量:

    • 顶部显示了一个向量:[0.9990859031677246, 0.1116979345679283, ...]
    • 这是我们要查找相似内容的"样本"
  2. 搜索参数:

    • TopK:15(表示要返回最相似的15条结果)
    • 一致性:Bounded(搜索的一致性模式)
    • 输出字段:2个字段(选择显示哪些字段)
    • 分组:(未选择)
    • 搜索参数:
      • level:1(搜索精度级别)
      • radius:(未设置)
      • range filter:(未设置)
    • 分区过滤:(未设置)

9.3 界面右侧(搜索结果) #

搜索结果显示了与输入向量最相似的用户名,按相似度(score)从高到低排序:

  1. 第一行:

    • id: 456663312363910318
    • score: 0.9999999403953552(几乎完全匹配!)
    • name: over grace quick adventure fox
  2. 第二行:

    • id: 456663312363910402
    • score: 0.2162195146083818
    • name: fox fox over speed fox sky brown
  3. 第三行:

    • id: 456663312363910377
    • score: 0.1823001354932785
    • name: lazy grace jumps jumps brown

...以此类推,显示了更多结果。

9.4 搜索参数 #

9.4.1 Level(搜索精度级别) #

  • Level

Level参数是控制搜索精度和速度平衡的关键:

  • 什么是Level?

    • Level是一个整数值,通常从1到100
    • 现在设置为1
  • Level的作用:

    • 低Level值(如1-2):搜索速度快,但可能会漏掉一些相似结果
    • 高Level值(如50-100):搜索更精确,找到更多相似结果,但速度较慢
  • 生活中的例子: 想象你在找一本书。Level=1就像快速扫一眼书架,只看书脊;Level=100就像仔细检查每本书的内容。

9.4.2 Radius(搜索半径) #

  • Radius

Radius参数定义了在向量空间中搜索的范围:

  • 什么是Radius?

    • 它是一个浮点数,表示向量空间中的距离
  • Radius的作用:

    • 只返回与查询向量距离小于radius的结果
    • 即使设置了TopK=15,如果只有3个结果在radius范围内,也只会返回这3个结果
  • 生活中的例子: 想象你站在一个点,说"我只想知道5公里范围内的所有餐厅",这里的"5公里"就是radius。

9.4.3 Range Filter(范围过滤器) #

  • range_filter

Range Filter参数允许我们根据向量距离进行更精细的过滤:

  • 什么是Range Filter?

    • 它是一个表达式,定义了接受结果的距离范围
  • Range Filter的作用:

    • 可以设置最小和最大距离
    • 例如:"distance >= 0.5 && distance <= 0.9"
    • 这会只返回距离在0.5到0.9之间的结果
  • 生活中的例子: 想象你在找房子,说"我想要离学校不少于500米(避免噪音),但不超过2公里(方便接送)的房子"。

Range Search 新增了 2 个参数,分别是:

  • radius(半径) - 指相似性的外边界
  • range_filter(范围过滤器) - 指相似性的内边界

9.4.4 代码演示 #

# 导入math模块用于数学运算
import math
# 导入类型注解List, Tuple
from typing import List, Tuple

# 生成20个“数据库向量”,每个向量包含id和二维向量tuple
DATABASE = [
    {"id": i, "vector": (i * 0.1, i * 0.1 + 0.3)}
    for i in range(20)
]

# 定义欧几里得距离函数,计算两个二维向量之间的距离
def l2_distance(a: Tuple[float, float], b: Tuple[float, float]) -> float:
    # 对输入的两个向量a和b,依次计算同一维度差的平方,并求和后开方
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

# 根据level参数粗略或深入地搜索向量,level越高候选越多
def search_with_level(query: Tuple[float, float], level: int) -> List[int]:
    """
    说明:
    - Level 越低,只粗略检查少量候选;
    - Level 越高,深入检查更多候选,结果更全。
    """
    # 按与query向量的距离升序排序所有数据库向量
    sorted_vectors = sorted(DATABASE, key=lambda item: l2_distance(query, item["vector"]))
    # 根据level确定需要检查的比例,level范围映射到0.1~1.0
    ratio = max(0.1, min(level / 100, 1.0))
    # 计算需要返回的候选数量,至少为1个
    top_k = max(1, int(len(sorted_vectors) * ratio))
    # 返回前top_k个向量的id
    return [item["id"] for item in sorted_vectors[:top_k]]

# 根据半径参数radius查找与query距离不超过radius的向量
def search_with_radius(query: Tuple[float, float], radius: float) -> List[int]:
    """
    说明:只返回与 query 距离 <= radius 的向量
    """
    # 保存符合条件的向量id
    result = []
    # 遍历数据库中的所有向量
    for item in DATABASE:
        # 判断与query的距离是否小于等于radius
        if l2_distance(query, item["vector"]) <= radius:
            # 满足条件则添加id到结果中
            result.append(item["id"])
    # 返回所有符合条件的id
    return result

# 根据指定的距离区间筛选与query之间距离在范围内的向量
def search_with_range_filter(
    query: Tuple[float, float],
    min_distance: float,
    max_distance: float
) -> List[int]:
    """
    说明:仅保留 min_distance <= distance <= max_distance 的向量
    """
    # 保存查询结果
    result = []
    # 遍历所有数据库向量
    for item in DATABASE:
        # 计算query与当前向量的距离
        d = l2_distance(query, item["vector"])
        # 如果距离在指定区间则将其id加入结果
        if min_distance <= d <= max_distance:
            result.append(item["id"])
    # 返回所有符合区间条件的id
    return result

# 主程序入口
if __name__ == "__main__":
    # 定义查询向量
    query_vector = (0.5, 0.8)

    # 输出level为10(粗略)的搜索结果
    print("Level=10(粗略):", search_with_level(query_vector, level=10))
    # 输出level为70(深入)的搜索结果
    print("Level=70(深入):", search_with_level(query_vector, level=70))

    # 输出radius为0.5的搜索结果
    print("Radius=0.5:", search_with_radius(query_vector, radius=0.5))
    # 输出radius为1.2的搜索结果
    print("Radius=1.2:", search_with_radius(query_vector, radius=1.2))

    # 输出距离在0.4到0.9之间的向量id
    print("Range Filter: 0.4 <= d <= 0.9 ->",
          search_with_range_filter(query_vector, min_distance=0.4, max_distance=0.9))

9.4.4 分区过滤 #

  • 分区

Milvus中的"分区过滤"功能是向量搜索中另一个非常重要的参数,它可以帮助我们更高效地组织和搜索数据。

10. 数据段 #

数据段

10.1 什么是数据段? #

数据段就像是数据库中存储数据的小容器。想象一下,如果数据库是一个图书馆,分区就是不同的书架区域,而数据段则是书架上的具体书籍盒子,里面装着实际的数据。

  • 数据段
  1. 功能按钮:

    • "压缩(Compact)":可以压缩数据段,让它占用更少空间
    • "落盘(Flush)":把内存中的数据写入到硬盘上保存
    • "刷新":更新显示最新的数据段信息
  2. 数据段信息:

    • ID:456663312364110319(这是数据段的唯一标识符)
    • Level:L1(这表示数据段的层级)
    • 分区ID:456663312362905661(这个数据段属于哪个分区)
    • 持久数据段状态:Flushed(表示数据已经保存到硬盘上)
    • 行数:100(这个数据段包含100行数据)
    • 查询节点IDs:1(表示哪些节点可以查询这个数据段)
    • 查询数据段状态:Sealed(表示这个数据段已经封闭,不能再添加新数据)

10.2 什么是查询节点ID #

查询节点ID是指在分布式数据库系统中,负责处理查询请求的服务器或计算单元的唯一标识符。

  • 查询节点ID

10.2.1 查询节点 #

想象一个大型图书馆,有很多工作人员(节点)分布在不同的楼层和区域:

  • 每个工作人员都有自己的工号(节点ID)
  • 每个工作人员负责管理特定书架上的图书(数据段)
  • 当你需要查找一本书时,你的请求会被分配给负责那个区域的工作人员

10.2.2 含义 #

"查询节点IDs: 1" 表示:

  • 这个数据段(ID为456663312364110319)目前被分配给了ID为1的查询节点
  • 只有这一个节点(节点1)负责处理对这个数据段的查询请求
  • 如果有人想查询这个数据段中的信息,请求会被路由到节点1处理

10.2.3 类比 #

这就像学校的分组学习:

  • 每个小组有一个编号(节点ID)
  • 每个小组负责研究特定的知识点(数据段)
  • 当老师提问某个知识点时,相应的小组就负责回答(处理查询)

11. 属性 #

11.1 什么是集合属性? #

集合属性就像是对数据库的"规则设定",它们控制着数据库如何处理数据、多快处理数据,以及使用多少资源。

11.2 界面中的属性列表 #

属性都以"collection."开头,表示这些是针对当前集合的设置:

  1. collection.ttl.seconds

    • 这是"生存时间"设置,决定数据在集合中保存多长时间
    • 如果设置了这个值,超过这个时间的数据会被自动删除
  2. collection.autocompaction.enabled

    • 控制是否启用自动压缩功能
    • 压缩可以减少存储空间并提高查询效率
  3. 数据操作速率限制:

    • collection.insertRate.max.mb 和 min.mb:插入数据的最大和最小速率
    • collection.upsertRate.max.mb 和 min.mb:更新插入的最大和最小速率
    • collection.deleteRate.max.mb 和 min.mb:删除数据的最大和最小速率
    • collection.bulkLoadRate.max.mb 和 min.mb:批量加载的最大和最小速率
  4. 查询速率限制:

    • collection.queryRate.max.qps 和 min.qps:普通查询的最大和最小每秒查询数
    • collection.searchRate.max.qps 和 min.qps:向量搜索的最大和最小每秒查询数
  5. 资源管理:

    • collection.diskProtection.diskQuota.mb:磁盘配额,限制集合可以使用的磁盘空间
    • collection.replica.number:副本数量,用于数据备份和负载均衡
    • collection.resource_groups:资源组设置,控制集合可以使用的计算资源

12. 操作milvus数据库 #

  • milvus安装
  • milvus使用
  • milvus管理数据库
  • nprobe
# Python 环境中安装 pymilvus
python3 -m pip install --upgrade pymilvus==2.6.3

12.1 连接数据库 #

# 导入pymilvus中的连接和工具包
from pymilvus import connections, utility

# 定义测试数据库连接的函数,包含主机host、端口port、数据库名db_name,设置默认值
def test_connection(
    host: str = "127.0.0.1", port: str = "19530", db_name: str = "default"
) -> bool:
    try:
        # 建立到Milvus的连接
        connections.connect("default", host=host, port=port, timeout=10, db_name=db_name)

        # 打印当前连接到的数据库名称
        print(f"当前连接到的数据库为: {db_name}")

        # 获取所有集合名并打印
        collections = utility.list_collections()
        print(f"数据库 '{db_name}' 下的集合有:", collections)

        # 连接成功提示
        print("连接成功:已成功连接到 Milvus 服务器")
        return True
    except Exception as exc:
        # 捕获连接失败异常并打印
        print(f"连接失败:无法连接到 Milvus 服务器的数据库 '{db_name}' ->", exc)
        return False
    finally:
        # 断开连接
        connections.disconnect("default")
        print("已关闭与 Milvus 服务器的连接")

# 主程序入口:调用连接测试
if __name__ == "__main__":
    if not test_connection():
        # 如果连接失败则程序退出
        raise SystemExit(1)

12.2 Schema管理 #

# 导入pymilvus相关类和函数
from pymilvus import (
    connections,
    db,
    FieldSchema,
    CollectionSchema,
    DataType,
    Collection,
    utility,
)

# 创建一个新的articles集合(表)
def create_articles_collection(
    db_name: str = "blog",
    collection_name: str = "articles",
) -> None:
    # 建立连接
    connections.connect("default", host="127.0.0.1", port="19530")
    try:
        # 切换数据库
        db.using_database(db_name)
        # 检查集合是否已存在
        if utility.has_collection(collection_name):
            print(f"集合 {collection_name} 已存在")
            return

        # 定义id字段,类型为长整型,主键且非自增
        id_field = FieldSchema(
            name="id",
            dtype=DataType.INT64,
            is_primary=True,
            auto_id=False,
            description="文章 ID",
        )
        # 定义title字段,类型为变长字符串
        title_field = FieldSchema(
            name="title",
            dtype=DataType.VARCHAR,
            max_length=512,
            description="文章标题",
        )
        # 定义title_vector字段,类型为3维浮点向量
        vector_field = FieldSchema(
            name="title_vector",
            dtype=DataType.FLOAT_VECTOR,
            dim=3,
            description="标题向量(示例为 3 维)",
        )

        # 创建集合schema
        schema = CollectionSchema(
            fields=[id_field, title_field, vector_field],
            description="文章信息集合",
        )

        # 创建集合
        Collection(name=collection_name, schema=schema, using="default")
        print(f"集合 {collection_name} 创建成功")
        # 打印当前所有集合名
        print("当前集合列表:", utility.list_collections())
    except Exception as exc:
        # 捕获异常并打印
        print("创建集合失败:", exc)
        raise
    finally:
        # 断开连接
        connections.disconnect("default")

# 主程序入口,创建articles集合
if __name__ == "__main__":
    create_articles_collection()

12.4 索引管理 #

# 导入pymilvus所需模块
from pymilvus import connections, db, utility, Collection

# 确保集合索引存在,如不存在则创建索引
def ensure_index(
    db_name: str = "blog",
    collection_name: str = "articles",
    field_name: str = "title_vector",
) -> None:
    # 建立连接
    connections.connect("default", host="127.0.0.1", port="19530")
    try:
        # 切换数据库
        db.using_database(db_name)
        # 检查集合是否存在
        if not utility.has_collection(collection_name):
            raise ValueError(f"集合 {collection_name} 不存在,无法创建索引")

        # 获取集合对象
        collection = Collection(collection_name)
        # 判断是否已有索引
        has_index = bool(collection.indexes)
        if has_index:
            print("索引已存在,跳过创建")
        else:
            # 定义索引参数(IVF_FLAT算法,L2距离,nlist=128)
            index_params = {
                "index_type": "IVF_FLAT",
                "metric_type": "L2",
                "params": {"nlist": 128},
            }
            # 创建索引
            collection.create_index(field_name=field_name, index_params=index_params)
            print("索引创建成功")

        # 加载集合到内存
        collection.load()
        print("集合已加载到内存,可供查询")
    except Exception as exc:
        # 打印异常
        print("索引管理失败:", exc)
        raise
    finally:
        # 断开连接
        connections.disconnect("default")

# 主程序入口,确保索引
if __name__ == "__main__":
    ensure_index()

12.5 集合管理 #

# 导入pymilvus相关模块
from pymilvus import connections, db, Collection, utility

# 查看集合信息,并根据需要在查看后删除集合
def inspect_collection(
    db_name: str = "blog",
    collection_name: str = "articles",
    drop_after_inspect: bool = False,
) -> None:
    # 建立连接
    connections.connect("default", host="127.0.0.1", port="19530")
    try:
        # 切换数据库
        db.using_database(db_name)
        # 检查集合是否存在
        if not utility.has_collection(collection_name):
            print(f"集合 {collection_name} 不存在,无法执行后续操作")
            return

        # 获取集合对象
        collection = Collection(collection_name)
        # 输出集合schema信息
        print("集合信息:\n", collection.schema)
        # 输出集合中的entity总数
        print("集合统计:", collection.num_entities)
        # 输出所有集合名称
        print("所有集合:", utility.list_collections())

        # 释放集合内存
        collection.release()
        print("集合已从内存释放")

        # 判断是否需要删除集合
        if drop_after_inspect:
            utility.drop_collection(collection_name)
            print("集合已删除")
    except Exception as exc:
        # 捕获异常并打印
        print("操作失败:", exc)
        raise
    finally:
        # 断开连接
        connections.disconnect("default")
        print("已关闭与 Milvus 服务器的连接")

# 主程序入口,查看并可选择删除集合
if __name__ == "__main__":
    inspect_collection()

12.6 数据管理 #

# 导入pymilvus相关内容
from pymilvus import connections, db, Collection, utility

# 数据增、改、删操作示例
def manage_data(
    db_name: str = "blog",
    collection_name: str = "articles",
) -> None:
    # 建立连接
    connections.connect("default", host="127.0.0.1", port="19530")
    try:
        # 切换数据库
        db.using_database(db_name)
        # 检查集合是否存在
        if not utility.has_collection(collection_name):
            print("集合不存在,无法执行数据操作")
            return

        # 获取集合对象
        collection = Collection(collection_name)

        # 插入数据,分三列:id, title, title_vector
        insert_result = collection.insert(
            [
                [1, 2],
                ["中国", "伟大"],
                [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6]],
            ]
        )
        print("插入数据成功:", insert_result.insert_count)

        # 以upsert形式插入新数据、更新已存在数据
        upsert_result = collection.upsert(
            [
                [1, 2, 3],
                ["中国2", "伟大2", "祖国"],
                [[0.2, 0.4, 0.6], [0.8, 1.0, 1.2], [0.7, 0.8, 0.9]],
            ]
        )
        print("Upsert 数据成功:", upsert_result.insert_count)

        # 删除数据,按表达式条件
        delete_result = collection.delete(expr="id in [1,2]")
        print("删除数据成功:", delete_result.delete_count)
    except Exception as exc:
        # 打印异常
        print("操作失败:", exc)
        raise
    finally:
        # 断开连接
        connections.disconnect("default")
        print("已关闭与 Milvus 服务器的连接")

# 主程序入口,进行数据管理操作
if __name__ == "__main__":
    manage_data()

12.7 搜索 #

# 导入pymilvus相关模块
from pymilvus import connections, db, Collection, utility

# 简单的向量搜索函数
def simple_search(
    db_name: str = "blog",
    collection_name: str = "articles",
    field_name: str = "title_vector",
) -> None:
    # 建立连接
    connections.connect("default", host="127.0.0.1", port="19530")
    try:
        # 切换数据库
        db.using_database(db_name)
        # 检查集合是否存在
        if not utility.has_collection(collection_name):
            print("集合不存在,无法执行搜索")
            return

        # 获取集合对象
        collection = Collection(collection_name)
        # 加载集合到内存
        collection.load()
        # 执行向量检索,输入向量为[0.7, 0.8, 0.9]
        search_result = collection.search(
            data=[[0.7, 0.8, 0.9]],
            anns_field=field_name,
            param={"metric_type": "L2", "params": {"nprobe": 10}},
            limit=10,
            output_fields=["title"],
        )
        # 遍历结果,逐条打印id、距离和title
        for hits in search_result:
            for hit in hits:
                print(f"id={hit.id}, distance={hit.distance:.4f}, title={hit.entity.get('title')}")
    except Exception as exc:
        # 捕获异常并打印
        print("搜索失败:", exc)
        raise
    finally:
        # 断开连接
        connections.disconnect("default")
        print("已关闭与 Milvus 服务器的连接")

# 主程序入口,执行简单搜索
if __name__ == "__main__":
    simple_search()

12.8 获取文本的向量嵌入 #

  • 向量模型
  • 创建嵌入请求
# 导入所需模块
import json
import os
import requests

# 获取API Token(从环境变量或默认字符串)
API_TOKEN = os.getenv("SILICONFLOW_TOKEN", "YOUR_TOKEN_HERE")
# 向量API地址
EMBED_URL = "https://api.siliconflow.cn/v1/embeddings"
# 向量模型名称
MODEL_NAME = "BAAI/bge-large-zh-v1.5"

# 获取文本对应的向量嵌入
def get_text_embedding(text: str) -> list[float]:
    # 组装请求体
    payload = {
        "model": MODEL_NAME,
        "input": text,
        "encoding_format": "float",
    }
    # 组装Header(携带API Token)
    headers = {
        "Authorization": f"Bearer {API_TOKEN}",
        "Content-Type": "application/json",
    }
    # 发送POST请求获取嵌入
    resp = requests.post(EMBED_URL, json=payload, headers=headers, timeout=30)
    # 检查响应状态
    resp.raise_for_status()
    # 返回第一个文本的embedding向量
    return resp.json()["data"][0]["embedding"]

# 主程序入口:获取"张三"的向量并保存
if __name__ == "__main__":
    embedding = get_text_embedding("张三")
    # 保存到embedding.json
    with open("embedding.json", "w", encoding="utf-8") as f:
        json.dump(embedding, f)
    print("向量嵌入已保存至 embedding.json")

12.9 保存数据到milvus #

# 导入必要库
import os
import requests
from pymilvus import connections, db, Collection, utility

# 获取API Token(从环境变量)
API_TOKEN = os.getenv("SILICONFLOW_TOKEN", "sk-wtplxfsozuyrjsmlclpxivoogwwlkfyzrgcgsrfqfpymwnwb")
# 嵌入API地址
EMBED_URL = "https://api.siliconflow.cn/v1/embeddings"
# 嵌入模型名称
MODEL_NAME = "BAAI/bge-large-zh-v1.5"

# 获取单条文本的向量嵌入
def get_text_embedding(text: str) -> list[float]:
    payload = {"model": MODEL_NAME, "input": text, "encoding_format": "float"}
    headers = {"Authorization": f"Bearer {API_TOKEN}", "Content-Type": "application/json"}
    resp = requests.post(EMBED_URL, json=payload, headers=headers, timeout=30)
    resp.raise_for_status()
    return resp.json()["data"][0]["embedding"]

# 示例:保存名字和对应向量到Milvus,并进行检索
def save_and_search_names(
    db_name: str = "blog",
    collection_name: str = "user",
    field_name: str = "name_vector",
) -> None:
    # 建立连接
    connections.connect("default", host="127.0.0.1", port="19530")
    try:
        # 切换数据库
        db.using_database(db_name)
        # 检查集合是否存在
        if not utility.has_collection(collection_name):
            print("集合不存在,无法执行数据操作")
            return

        # 获取集合对象
        collection = Collection(collection_name)

        # 查找集合里指定的向量字段
        vector_field = next(
            (field for field in collection.schema.fields if field.name == field_name),
            None,
        )
        if vector_field is None:
            raise ValueError(f"集合中不存在字段 {field_name}")

        # 获取字段定义的维度(如果有)
        expected_dim = vector_field.params.get("dim") if hasattr(vector_field, "params") else None

        # 示例名称
        names = ["中国", "伟大"]
        # 获取每个名字的嵌入向量
        embeddings = [get_text_embedding(name) for name in names]

        # 检查维度是否一致
        if expected_dim and len(embeddings[0]) != expected_dim:
            raise ValueError(
                f"集合 {collection_name} 的向量维度为 {expected_dim},"
                f"但当前模型输出维度为 {len(embeddings[0])},请保持一致。"
            )

        # 插入向量和名称
        insert_result = collection.insert([embeddings, names])
        # 将数据写入持久化
        collection.flush()
        # 加载集合到内存
        collection.load()
        print("数据插入成功:", insert_result.insert_count)

        # 查询"华夏"的向量嵌入
        query_vector = get_text_embedding("华夏")
        # 执行向量检索
        search_result = collection.search(
            data=[query_vector],
            anns_field=field_name,
            param={"metric_type": "COSINE", "params": {"nprobe": 8}},
            limit=5,
            output_fields=["name"],
        )
        # 打印搜索结果
        for hits in search_result:
            for hit in hits:
                print(f"匹配结果 name={hit.entity.get('name')}, score={hit.distance:.4f}")
    except Exception as exc:
        print("操作失败:", exc)
        raise
    finally:
        # 断开连接
        connections.disconnect("default")
        print("已关闭与 Milvus 服务器的连接")

# 主程序入口:保存并检索名字
if __name__ == "__main__":
    save_and_search_names()

12.10 图片向量 #

  • 多模态模型
  • apikey
# 导入所需模块
import base64
import requests

# 定义图片文件路径
image_path = "deepseek.png"
# 读取图片内容并进行base64编码
with open(image_path, "rb") as f:
    base64_bytes = base64.b64encode(f.read())
    base64_image = base64_bytes.decode('utf-8')

# 定义图片格式
image_format = "png"
# 拼接base64图片数据并加上MIME类型
image_data = f"data:image/{image_format};base64,{base64_image}"

# 定义API相关信息
api_url = 'https://dashscope.aliyuncs.com/api/v1/services/embeddings/multimodal-embedding/multimodal-embedding'
api_key = 'sk-dfa3b183ad004246947735edbfba7f2d'

# 调用多模态模型获取图片向量嵌入
def call_multi_modal_embedding():
    # 设置HTTP请求头
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json"
    }
    # 构建请求体,包含模型名称和图片内容
    payload = {
        "model": "multimodal-embedding-v1",
        "input": {
            "contents": [
                {
                    "image": image_data
                }
            ]
        }
    }
    try:
        # 发送POST请求
        response = requests.post(api_url, headers=headers, json=payload, timeout=30)
        if response.status_code == 200:
            # 解析并打印输出(格式化为JSON字符串)
            output = response.json().get("output", {})
            import json
            print(json.dumps(output, ensure_ascii=False, indent=4))
        else:
            # 若请求失败打印状态码和错误内容
            print("请求失败,状态码:", response.status_code)
            print("响应内容:", response.text)
    except Exception as e:
        # 捕获接口调用异常
        print("调用模型接口失败:", e)

# 主程序入口,调用多模态嵌入
if __name__ == "__main__":
    call_multi_modal_embedding()

13.APIKEY #

13.1 配置官方API Key #

  • 官方API Key申请
  • 官方可用模型
配置项 值
DeepSeek:baseUrl https://api.deepseek.com
Deepseek:Apikey sk-c50d5356a45c4c7988b3df82fb1295e3
DeepseekModel deepseek-chat 或者 deepseek-reasoner

13.2 配置本地部署的DeepSeek R1 #

  • DeepSeek R1 本地部署教程
  • Ollama 官网
  • Ollama 模型列表
配置项 值
DeepSeek:baseUrl http://localhost:11434
DeepseekModel deepseek-r1:1.5b

13.3 配置硅基流动的API Key #

  • 硅基流动的API Key申请
  • 硅基流动可用模型
配置项 值
DeepSeek:baseUrl https://api.siliconflow.cn
Deepseek:Apikey sk-kgahvlalrbfjyftxrcoiiliopeblhxsgrxtrrwgiqwwxwxth
DeepseekModel deepseek-ai/DeepSeek-V3或者deepseek-ai/DeepSeek-R1

13.4 配置派欧算力云的API Key #

  • 派欧算力云的API Key申请
  • 派欧算力云可用模型
配置项 值
DeepSeek:baseUrl https://api.ppinfra.com/v3/openai
Deepseek:Apikey sk_ASPYijn-VdsIbkEXtjCFNn0dEkvs6Sko6ZEA4XbWc
DeepseekModel deepseek/deepseek-v3/community或者deepseek/deepseek-r1/community

13.5 配置腾讯云的API Key #

  • 接口文档
  • 腾讯云的API Key申请
  • 创建兼容OpenAI的APIKEY
  • 腾讯云可用模型
配置项 值
DeepSeek:baseUrl https://api.lkeap.cloud.tencent.com/v1
Deepseek:Apikey sk-EsdQJe0M4IsohNh4k0zji0bq9fsDDMQ77dmtn2hOMocTJCfj
DeepseekModel deepseek-v3或者deepseek-r1

13.6 配置百度云的API Key #

  • 接口文档
  • 创建兼容OpenAI的APIKEY
配置项 值
DeepSeek:baseUrl https://qianfan.baidubce.com/v2
Deepseek:Apikey bce-v3/ALTAK-VrJzzPHo9HRCH6Lq88QK8/6e9b1f33909b5a43dac2be242351006d72b
DeepseekModel deepseek-v3或者deepseek-r1

13.7 配置阿里云的API Key #

  • 接口文档
  • 创建兼容OpenAI的APIKEY
  • 阿里云可用模型
配置项 值
DeepSeek:baseUrl https://dashscope.aliyuncs.com/compatible-mode/v1
Deepseek:Apikey sk-cc2054c29cf54fec92503bf7044cf383
DeepseekModel deepseek-v3或者deepseek-r1

13.8 配置字节跳动火山引擎的API Key #

  • 接口文档
  • 火山引擎文档
  • 创建兼容OpenAI的APIKEY
  • 火山引擎可用模型
配置项 值
DeepSeek:baseUrl https://ark.cn-beijing.volces.com/api/v3
Deepseek:Apikey d52e49a1-36ea-55bb-bc6e-65ce789a72f6
DeepseekModel deepseek-v3-241226或者deepseek-r1

13.9 配置科大讯飞的API Key #

  • 科大讯飞接口文档
  • 模型服务列表
配置项 值
DeepSeek:baseUrl https://maas-api.cn-huabei-1.xf-yun.com/v1
Deepseek:Apikey sk-U9gPYdkeXmPoJWM33A168717c07544fEc983fF83b884e3785
DeepseekModel xdeepseekv3

13.10 配置openrouter的API Key #

  • openrouter接口文档
  • 创建APIKEY
  • openrouter模型列表
配置项 值
DeepSeek:baseUrl https://openrouter.ai/api/v1
Deepseek:Apikey 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
DeepseekModel deepseek/deepseek-chat:free

14.参考 #

  • milvus
  • milvus文档
  • 度量类型
  • 余弦相似度
  • 归一化
  • milvus安装
  • milvus使用
  • milvus管理数据库
  • Level
  • Radius
  • range_filter
  • 分区
  • 查询节点ID
  • 数据段
  • 向量模型
  • 创建嵌入请求
  • 多模态模型
  • 多模态模型apikey
  • milvus安装文档

15.云服务器安装 #

wget https://gitee.com/milvus-io/milvus/raw/v2.6.4/deployments/docker/standalone/docker-compose.yml -O docker-compose.yml
docker compose up -d
docker run -p 8000:3000 -e MILVUS_URL=192.168.2.106:19530 zilliz/attu:v2.5

访问验证

请输入访问令牌

Token不正确,请重新输入