导航菜单

  • 1.vector
  • 2.milvus
  • 3.pymilvus
  • 4.rag
  • 5.rag_measure
  • 7.search
  • ragflow
  • heapq
  • HNSW
  • cosine_similarity
  • math
  • typing
  • etcd
  • minio
  • collections
  • jieba
  • random
  • beautifulsoup4
  • chromadb
  • sentence_transformers
  • numpy
  • lxml
  • openpyxl
  • PyMuPDF
  • python-docx
  • requests
  • python-pptx
  • text_splitter
  • all-MiniLM-L6-v2
  • openai
  • llm
  • BPETokenizer
  • Flask
  • RAGAS
  • BagofWords
  • langchain
  • Pydantic
  • abc
  • faiss
  • MMR
  • scikit-learn
  • 1. 什么是RAGAS?
    • 1.1 生活中的类比
    • 1.2 RAGAS的核心特点
    • 1.3 为什么需要RAGAS?
  • 2. RAGAS的核心评估指标
    • 2.1 检索器评估指标
    • 2.2 生成器评估指标
    • 2.3 指标之间的关系
  • 3. 用法示例
  • 4. 实际应用
    • 4.1.准备测试数据
    • 4.2. 准备RAG
    • 4.3. 生成答案
    • 4.4. 评测数据
  • 5. generate
    • 5.1 方法定义
      • 5.1.1 方法定义
      • 5.1.2 方法定义
      • 5.1.3 输入参数处理
      • 5.1.4 包装同步函数
      • 5.1.5 异步执行同步方法
      • 5.1.6 返回结果封装
    • 5.2 设计意图分析
      • 5.2.1. 异步兼容性
      • 5.2.2. ragas 框架兼容
      • 5.2.3. 输入灵活性
    • 5.3 使用
      • 5.3.1 基本使用
      • 5.3.2 在异步框架中使用
    • 5.4 方法特点总结
  • 6.总结
    • 6.1 核心要点
    • 6.2 适用场景

1. 什么是RAGAS? #

RAGAS(Retrieval-Augmented Generation Assessment)是一个专门用于评估RAG(检索增强生成)系统质量的Python框架。它使用大语言模型(LLM)作为"评判员",自动化地评估RAG系统的各个关键维度,无需大量人工标注。

1.1 生活中的类比 #

想象一下,你开发了一个智能客服系统,用户提问后,系统会:

  1. 从知识库中检索相关信息
  2. 根据检索到的信息生成答案

现在你想知道这个系统好不好,你会怎么评估?

传统方法:

  • 人工检查每个答案是否正确(耗时、成本高)
  • 需要准备大量"标准答案"(标注工作量大)
  • 不同评估者可能给出不同结果(主观性强)

RAGAS方法:

  • 使用AI自动评估(快速、成本低)
  • 不需要标准答案也能评估(无监督评估)
  • 评估结果一致性好(客观性强)

1.2 RAGAS的核心特点 #

RAGAS具有以下核心特点:

  1. 无参考评估:大部分指标不需要人工标注的"标准答案",可以直接评估
  2. 多维度评估:从检索、生成等多个角度全面评估RAG系统
  3. 自动化:使用LLM自动生成评估和分数,无需人工干预
  4. 易用性:简单的API,几行代码就能完成评估

1.3 为什么需要RAGAS? #

在RAG系统开发中,评估是至关重要的环节:

  • 发现问题:快速识别系统在哪些方面表现不佳
  • 优化方向:明确应该改进检索器还是生成器
  • 性能对比:比较不同配置或模型的效果
  • 持续监控:跟踪系统在生产环境中的表现

RAGAS让这些评估工作变得简单、快速、自动化。

2. RAGAS的核心评估指标 #

RAGAS将RAG评估分为两个主要层次:检索器评估和生成器评估。

2.1 检索器评估指标 #

检索器负责从知识库中查找相关信息,评估指标包括:

Context Precision(上下文精确率):

  • 作用:评估检索到的文档中,有多少是真正相关的,以及相关文档是否排在前面
  • 类比:就像搜索引擎,不仅要有相关结果,还要把最相关的结果排在前面

Context Recall(上下文召回率):

  • 作用:评估检索到的文档是否包含了回答问题所需的所有关键信息
  • 类比:就像考试复习,不仅要复习到重点,还要确保没有遗漏重要内容

2.2 生成器评估指标 #

生成器负责基于检索到的文档生成答案,评估指标包括:

Faithfulness(忠实性):

  • 作用:评估生成的答案是否忠实于检索到的文档,有没有编造内容
  • 类比:就像写论文,引用的内容必须真实,不能自己编造

Answer Relevance(答案相关性):

  • 作用:评估生成的答案是否直接回答了问题,有没有跑题
  • 类比:就像考试答题,要直接回答题目,不能答非所问

2.3 指标之间的关系 #

这四个指标从不同角度评估RAG系统:

检索器评估 → Context Precision + Context Recall
     ↓
生成器评估 → Faithfulness + Answer Relevance
     ↓
整体系统质量

理想情况:所有指标都应该接近1.0(满分)

3. 用法示例 #

# 导入os模块,用于环境变量设置
import os
# 导入HuggingFace datasets库的Dataset类
from datasets import Dataset
# 导入ragas库里的评估接口
from ragas import evaluate
# 导入ragas内置的评估指标
from ragas.metrics import (
    faithfulness,         # 忠实度指标
    answer_relevancy,     # 答案相关性指标
    context_precision,    # 上下文精准率指标
    context_recall,       # 上下文召回率指标
)
# 导入langchain_openai库,用于ChatOpenAI和OpenAIEmbeddings
from langchain_openai import ChatOpenAI, OpenAIEmbeddings

# 构建用于评估的数据集字典
dataset_dict = {
    # 输入问题
    "question": [
        "爱因斯坦的主要贡献是什么?",
    ],
    # RAG系统输出的答案
    "answer": [
        "爱因斯坦提出了狭义相对论和广义相对论,并发现了光电效应。",
    ],
    # 检索到的上下文(多条文本组成的列表)
    "contexts": [
        [
            "阿尔伯特·爱因斯坦在1905年提出狭义相对论,1915年提出广义相对论。",
            "他还因发现光电效应定律而获得1921年诺贝尔物理学奖。",
            "相对论彻底改变了人们对时空的理解。"
        ]
    ],
    # 人工制定的标准答案(用于 context recall 等)
    "ground_truth": [
        "爱因斯坦的主要贡献包括狭义相对论、广义相对论和光电效应理论。"
    ]
}

# 将数据字典转换为 HuggingFace Dataset 对象
dataset = Dataset.from_dict(dataset_dict)

# 打印数据集准备完成信息及数据条数
print(f"数据集准备完成,包含 {len(dataset)} 条记录\n")

# 打印初始化模型信息
print("初始化LLM和Embedding模型...")

# 初始化大语言模型(LLM)
llm = ChatOpenAI(model="gpt-4o")
# 初始化Embedding模型
embeddings = OpenAIEmbeddings()

# 打印开始评估信息
print("开始评估...")

# 调用ragas的evaluate接口进行评测
result = evaluate(
    dataset=dataset,
    llm=llm,
    embeddings=embeddings,
    metrics=[
        context_precision,    # 上下文精准率
        context_recall,       # 上下文召回率
        faithfulness,         # 忠实度
        answer_relevancy,     # 答案相关性
    ],
)

# 将评估结果转换为pandas DataFrame格式
df = result.to_pandas()
# 打印各评估指标的均值
print("各项指标平均分:")
print(f"Context Precision: {df['context_precision'].mean():.4f}")
print(f"Context Recall: {df['context_recall'].mean():.4f}")
print(f"Faithfulness: {df['faithfulness'].mean():.4f}")
print(f"Answer Relevancy: {df['answer_relevancy'].mean():.4f}")

4. 实际应用 #

4.1.准备测试数据 #

# 导入sentence-transformers库,用于生成文本向量
from sentence_transformers import SentenceTransformer
# 导入ChromaDB客户端
import chromadb
import sys
# 定义本地Embedding(文本向量)封装类,用于生成文本向量
class LocalEmbedding:
    # 初始化方法,加载指定的句向量模型,默认为all-MiniLM-L6-v2
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        # 打印模型加载消息
        print(f"[Embedding] 加载模型: {model_name}")
        # 加载HuggingFace SentenceTransformer模型,并强制在cpu上运行
        self.model = SentenceTransformer(model_name, device="cpu")
        # 加载完成
        print(f"[Embedding] 模型加载完成")

    # 生成一个文本的向量
    def embed_query(self, text):
        embedding = self.model.encode(text, normalize_embeddings=True)
        return embedding.tolist()

    # 批量生成多个文本的向量
    def embed_documents(self, texts):
        embeddings = self.model.encode(texts, normalize_embeddings=True)
        return embeddings.tolist()

    # 获取embedding对象本身(兼容接口设计)
    def get(self):
        return self
# 定义插入测试数据到Chroma数据库的函数
def insert_data():
    # 打印初始化日志
    print("[DB] 初始化Chroma数据库 product_db")
    try:
        # 创建ChromaDB客户端与集合
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        collection = chroma_client.get_or_create_collection(name="product_db")
        # 加载嵌入模型
        embedding_model = LocalEmbedding()

        # 定义用于插入的测试数据,每条含文本内容和元数据
        test_docs = [
            {
                "content": "如需申请手机保修服务,请携带购机发票和保修卡前往品牌授权售后服务中心,工程师检测后符合保修政策即可免费维修。",
                "meta": {"category": "手机保修"},
            },
            {
                "content": "笔记本电脑电池出现鼓包属于安全隐患,请立即停止使用并联系品牌售后服务中心进行更换,切勿自行拆卸或继续充电。",
                "meta": {"category": "电脑电池"},
            },
            {
                "content": "电子产品自购买之日起享受7天无理由退货,15天内可换货,一年内享受免费保修服务,具体以品牌政策为准。",
                "meta": {"category": "售后政策"},
            },
            {
                "content": "如遇产品无法开机、屏幕碎裂等问题,请及时联系官方售后或客服,部分问题可能不在保修范围内。",
                "meta": {"category": "常见问题"},
            },
        ]

        # 准备所有文本和相关元数据
        texts = [doc["content"] for doc in test_docs]
        metadatas = [doc["meta"] for doc in test_docs]
        ids = [f"test_doc_{i}" for i in range(len(test_docs))]
        # 生成文本向量
        embeddings = embedding_model.embed_documents(texts)
        # 遍历输出每条数据插入日志
        for i, doc in enumerate(test_docs):
            print(f"[DB] 插入测试数据 {i+1}: {doc['meta']['category']}")

        # 批量插入数据到ChromaDB
        collection.add(
            embeddings=embeddings,
            documents=texts,
            metadatas=metadatas,
            ids=ids,
        )
        # 插入成功打印信息
        print("[DB] 测试数据已插入 product_db。")
        return True
    except Exception as e:
        # 插入失败时打印错误信息
        print(f"[DB] 插入测试数据错误: {str(e)}")
        return False
# 定义用于检查数据库是否为空的方法
def check_database(db_name="product_db"):
    # 打印检查日志
    print(f"[DB] 检查数据库: {db_name}")
    try:
        # 连接ChromaDB客户端并获取集合
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        collection = chroma_client.get_or_create_collection(name=db_name)
        # 获取集合文档数量
        count = collection.count()
        # 打印数量
        print(f"[DB] 数据库中有 {count} 条文档")
        # 有文档返回True,无则False
        return count > 0
    except Exception as e:
        # 检查有误,打印错误信息并返回False
        print(f"[DB] 检查数据库错误: {str(e)}")
        return False

# 检查数据库,如果为空则插入测试数据
if not check_database():
    print("[MAIN] 数据库为空,开始插入测试数据...")
    if not insert_data():
        print("[MAIN] 插入测试数据失败,程序退出")
        sys.exit(1)
    print("[MAIN] 测试数据插入完成")

4.2. 准备RAG #

# 导入sentence-transformers库,用于生成文本向量
from sentence_transformers import SentenceTransformer
# 导入ChromaDB客户端
import chromadb
+# 导入异常处理相关模块、异步处理模块以及类型标注等
import sys
# 定义本地Embedding(文本向量)封装类,用于生成文本向量
class LocalEmbedding:
    # 初始化方法,加载指定的句向量模型,默认为all-MiniLM-L6-v2
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        # 打印模型加载消息
        print(f"[Embedding] 加载模型: {model_name}")
        # 加载HuggingFace SentenceTransformer模型,并强制在cpu上运行
        self.model = SentenceTransformer(model_name, device="cpu")
        # 加载完成
        print(f"[Embedding] 模型加载完成")

    # 生成一个文本的向量
    def embed_query(self, text):
        embedding = self.model.encode(text, normalize_embeddings=True)
        return embedding.tolist()

    # 批量生成多个文本的向量
    def embed_documents(self, texts):
        embeddings = self.model.encode(texts, normalize_embeddings=True)
        return embeddings.tolist()

    # 获取embedding对象本身(兼容接口设计)
    def get(self):
        return self
# 定义插入测试数据到Chroma数据库的函数
def insert_data():
    # 打印初始化日志
    print("[DB] 初始化Chroma数据库 product_db")
    try:
        # 创建ChromaDB客户端与集合
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        collection = chroma_client.get_or_create_collection(name="product_db")
        # 加载嵌入模型
        embedding_model = LocalEmbedding()

        # 定义用于插入的测试数据,每条含文本内容和元数据
        test_docs = [
            {
                "content": "如需申请手机保修服务,请携带购机发票和保修卡前往品牌授权售后服务中心,工程师检测后符合保修政策即可免费维修。",
                "meta": {"category": "手机保修"},
            },
            {
                "content": "笔记本电脑电池出现鼓包属于安全隐患,请立即停止使用并联系品牌售后服务中心进行更换,切勿自行拆卸或继续充电。",
                "meta": {"category": "电脑电池"},
            },
            {
                "content": "电子产品自购买之日起享受7天无理由退货,15天内可换货,一年内享受免费保修服务,具体以品牌政策为准。",
                "meta": {"category": "售后政策"},
            },
            {
                "content": "如遇产品无法开机、屏幕碎裂等问题,请及时联系官方售后或客服,部分问题可能不在保修范围内。",
                "meta": {"category": "常见问题"},
            },
        ]

        # 准备所有文本和相关元数据
        texts = [doc["content"] for doc in test_docs]
        metadatas = [doc["meta"] for doc in test_docs]
        ids = [f"test_doc_{i}" for i in range(len(test_docs))]
        # 生成文本向量
        embeddings = embedding_model.embed_documents(texts)
        # 遍历输出每条数据插入日志
        for i, doc in enumerate(test_docs):
            print(f"[DB] 插入测试数据 {i+1}: {doc['meta']['category']}")

        # 批量插入数据到ChromaDB
        collection.add(
            embeddings=embeddings,
            documents=texts,
            metadatas=metadatas,
            ids=ids,
        )
        # 插入成功打印信息
        print("[DB] 测试数据已插入 product_db。")
        return True
    except Exception as e:
        # 插入失败时打印错误信息
        print(f"[DB] 插入测试数据错误: {str(e)}")
        return False
# 定义用于检查数据库是否为空的方法
def check_database(db_name="product_db"):
    # 打印检查日志
    print(f"[DB] 检查数据库: {db_name}")
    try:
        # 连接ChromaDB客户端并获取集合
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        collection = chroma_client.get_or_create_collection(name=db_name)
        # 获取集合文档数量
        count = collection.count()
        # 打印数量
        print(f"[DB] 数据库中有 {count} 条文档")
        # 有文档返回True,无则False
        return count > 0
    except Exception as e:
        # 检查有误,打印错误信息并返回False
        print(f"[DB] 检查数据库错误: {str(e)}")
        return False

+# 定义RAG(检索增强生成)主流程封装类
+class Rag:
+   pass
+
# 检查数据库,如果为空则插入测试数据
if not check_database():
    print("[MAIN] 数据库为空,开始插入测试数据...")
    if not insert_data():
        print("[MAIN] 插入测试数据失败,程序退出")
        sys.exit(1)
    print("[MAIN] 测试数据插入完成")
+
+# 准备要测试的问题列表
+questions = [
+   "如何申请手机保修服务?",
+   "笔记本电脑电池鼓包怎么办?",
+]
+# 每个问题的标准答案
+ground_truths = [
+   "您可携带购机发票和保修卡前往品牌授权售后服务中心,工程师检测后符合保修政策即可免费维修。",
+   "如发现笔记本电脑电池鼓包,请立即停止使用并联系品牌售后服务中心进行更换,切勿自行拆卸或继续充电。",
+]    
+rag = Rag()
+# 初始化答案和上下文结果存储列表
+answers, contexts = [], []

4.3. 生成答案 #

# 导入sentence-transformers库,用于生成文本向量
from sentence_transformers import SentenceTransformer
# 导入ChromaDB客户端
import chromadb
# 导入异常处理相关模块、异步处理模块以及类型标注等
import sys
+# 使用OpenAI官方API进行大模型调用
+from openai import OpenAI
+# 导入异步处理模块
+import asyncio
+# 导入functools库中的partial函数,用于函数偏应用
+from functools import partial
+
+# 定义本地Embedding封装类(用于生成文本向量)
class LocalEmbedding:
+   # 构造函数,初始化并加载指定的句向量模型(默认为all-MiniLM-L6-v2)
    def __init__(self, model_name="all-MiniLM-L6-v2"):
+       # 打印模型加载提示信息
        print(f"[Embedding] 加载模型: {model_name}")
+       # 加载SentenceTransformer模型,强制使用CPU
        self.model = SentenceTransformer(model_name, device="cpu")
+       # 打印模型加载完成提示信息
        print(f"[Embedding] 模型加载完成")

+   # 单条文本转向量
    def embed_query(self, text):
+       # 对单条文本进行编码,生成归一化后的向量
        embedding = self.model.encode(text, normalize_embeddings=True)
+       # 将numpy向量转为普通list返回
        return embedding.tolist()

+   # 批量文本转向量
    def embed_documents(self, texts):
+       # 对多条文本进行编码,生成归一化后的向量
        embeddings = self.model.encode(texts, normalize_embeddings=True)
+       # 将numpy向量组转为普通list返回
        return embeddings.tolist()

+   # 获取本embedding对象自身
    def get(self):
+       # 返回自身
        return self
# 定义插入测试数据到Chroma数据库的函数
def insert_data():
    # 打印初始化日志
    print("[DB] 初始化Chroma数据库 product_db")
    try:
        # 创建ChromaDB客户端与集合
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        collection = chroma_client.get_or_create_collection(name="product_db")
        # 加载嵌入模型
        embedding_model = LocalEmbedding()

        # 定义用于插入的测试数据,每条含文本内容和元数据
        test_docs = [
            {
                "content": "如需申请手机保修服务,请携带购机发票和保修卡前往品牌授权售后服务中心,工程师检测后符合保修政策即可免费维修。",
                "meta": {"category": "手机保修"},
            },
            {
                "content": "笔记本电脑电池出现鼓包属于安全隐患,请立即停止使用并联系品牌售后服务中心进行更换,切勿自行拆卸或继续充电。",
                "meta": {"category": "电脑电池"},
            },
            {
                "content": "电子产品自购买之日起享受7天无理由退货,15天内可换货,一年内享受免费保修服务,具体以品牌政策为准。",
                "meta": {"category": "售后政策"},
            },
            {
                "content": "如遇产品无法开机、屏幕碎裂等问题,请及时联系官方售后或客服,部分问题可能不在保修范围内。",
                "meta": {"category": "常见问题"},
            },
        ]

        # 准备所有文本和相关元数据
        texts = [doc["content"] for doc in test_docs]
        metadatas = [doc["meta"] for doc in test_docs]
        ids = [f"test_doc_{i}" for i in range(len(test_docs))]
        # 生成文本向量
        embeddings = embedding_model.embed_documents(texts)
        # 遍历输出每条数据插入日志
        for i, doc in enumerate(test_docs):
            print(f"[DB] 插入测试数据 {i+1}: {doc['meta']['category']}")

        # 批量插入数据到ChromaDB
        collection.add(
            embeddings=embeddings,
            documents=texts,
            metadatas=metadatas,
            ids=ids,
        )
        # 插入成功打印信息
        print("[DB] 测试数据已插入 product_db。")
        return True
    except Exception as e:
        # 插入失败时打印错误信息
        print(f"[DB] 插入测试数据错误: {str(e)}")
        return False
# 定义用于检查数据库是否为空的方法
def check_database(db_name="product_db"):
    # 打印检查日志
    print(f"[DB] 检查数据库: {db_name}")
    try:
        # 连接ChromaDB客户端并获取集合
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        collection = chroma_client.get_or_create_collection(name=db_name)
        # 获取集合文档数量
        count = collection.count()
        # 打印数量
        print(f"[DB] 数据库中有 {count} 条文档")
        # 有文档返回True,无则False
        return count > 0
    except Exception as e:
        # 检查有误,打印错误信息并返回False
        print(f"[DB] 检查数据库错误: {str(e)}")
        return False
+# 定义本地LLM(大语言模型)推理类,封装与OpenAI云服务的交互逻辑
+class LLM:
+   # 初始化方法,设置所调用的OpenAI模型名,默认为gpt-4o
+   def __init__(
+       self,
+       model="gpt-4o",
+   ):
+       # 创建OpenAI API的客户端实例
+       self.client = OpenAI()
+       # 保存模型名称
+       self.model = model
+
+   # 定义同步的推理方法,向模型发送prompt并获取文本结果
+   def invoke(self, prompt, stop=None, **kwargs):
+       # 打印推理请求的前50个字符
+       print(f"[LLM] 推理请求: {prompt[:50]}...")
+       try:
+           # 使用OpenAI的ChatCompletion API发起推理请求
+           response = self.client.chat.completions.create(
+               model=self.model, # 模型名称
+               messages=[{"role": "user", "content": prompt}], # 消息列表,包含角色和内容
+               temperature=kwargs.get("temperature", 0.1), # 温度,控制生成文本的随机性
+               top_p=kwargs.get("top_p", 0.9), # 核采样,控制生成文本的多样性
+               max_tokens=kwargs.get("max_tokens", 4096), # 最大令牌数,控制生成文本的长度
+               stop=stop, # 停止条件
+               stream=kwargs.get("stream", False), # 是否流式输出
+           )
+           # 获取生成的文本内容
+           generated_text = response.choices[0].message.content
+           # 打印生成结果的前50个字符
+           print(f"[LLM] 推理结果: {generated_text[:50]}...")
+           # 返回生成文本
+           return generated_text
+       except Exception as e:
+           # 出现异常时打印错误信息并抛出异常
+           print(f"[LLM] 错误: {str(e)}")
+           raise
+
+   # 异步生成方法,用于ragas异步接口的兼容
+   async def generate(self, prompt, stop=None, **kwargs):
+       # 检查prompt参数类型,如果为列表则取第一个元素
+       if isinstance(prompt, list):
+           prompt_str = prompt[0] if prompt else ""
+       else:
+           prompt_str = str(prompt)
+       # 用partial将同步invoke函数包装,以便run_in_executor能调用
+       invoke_func = partial(self.invoke, prompt_str, stop=stop, **kwargs)
+       # 获取当前事件循环
+       loop = asyncio.get_event_loop()
+       # 在线程池中异步运行同步方法
+       result = await loop.run_in_executor(None, invoke_func)

+       # ragas需要的返回对象,具有generations属性
+       class Generation:
+           # 初始化,text为生成文本
+           def __init__(self, text):
+               self.text = str(text)
+
+       class GenerationResult:
+           # 初始化,生成格式为[[Generation对象]]
+           def __init__(self, text):
+               self.generations = [[Generation(text)]]
+
+       # 返回封装好的生成结果对象
+       return GenerationResult(result)
# 定义RAG(检索增强生成)主流程封装类
class Rag:
+   # 初始化方法,设置数据库名、嵌入模型、LLM、prompt模板及chroma客户端
+   def __init__(
+       self,
+       db_name="product_db", # 数据库名
+       embedding_model=None, # 嵌入模型    
+       llm=None, # LLM模型
+       prompt_template=None, # 提示词模板
+       chroma_client=None, # ChromaDB客户端
+   ): # 初始化ChromaDB客户端
+       # 打印初始化日志
+       print(f"[RAG] 初始化,数据库名: {db_name}")
+       # 如果未传入嵌入模型则用默认
+       self.embedding_model = embedding_model or LocalEmbedding()
+       # 如果未传入LLM则用默认
+       self.llm = llm or LLM()
+       # 如果未传入ChromaDB客户端则创建本地持久化客户端
+       self.chroma_client = chroma_client or chromadb.PersistentClient(
+           path="./chroma_db"
+       )
+       # 获取或创建ChromaDB的collection
+       self.collection = self.chroma_client.get_or_create_collection(
+           name=db_name,
+       )
+       # 设置提示词模板,默认为售后场景专用模板
+       self.prompt_template = prompt_template or (
+           "你是电子产品售后服务助手,熟悉手机、电脑等产品的保修、维修、退换货等政策。"
+           "请根据提供的上下文信息context,专业、简明地回答用户的售后相关问题。"
+           "如果上下文没有相关信息,请回答[请联系品牌官方售后或客服]。\n"
+           '问题:{question}\n上下文:"{context}"\n答复:'
+       )
+   # 构建prompt的方法,将输入问题和上下文填入模板
+   def build_prompt(self, question, context):
+       prompt = self.prompt_template.replace("{question}", question).replace(
+           "{context}", context
+       )
+       return prompt    
+   # 检索与问题相关的上下文,并返回上下文字符串及上下文列表
+   def retrieve_context(self, query, top_k=3):
+       # 打印检索日志
+       print(f"[RAG] 检索上下文: {query}")
+       try:
+           # 将查询转换为向量
+           query_embedding = self.embedding_model.embed_query(query)
+           # 使用ChromaDB的query方法查找最相似的top_k个文档
+           results = self.collection.query(
+               query_embeddings=[query_embedding],
+               n_results=top_k
+           )
+           # 提取文档内容
+           context_list = []
+           if results['documents'] and len(results['documents']) > 0:
+               context_list = results['documents'][0]
+           # 打印检索到的上下文数量
+           print(f"[RAG] 检索到{len(context_list)}条上下文")
+           # 返回拼接的上下文字符串和原始上下文列表
+           return "\n".join(context_list), context_list
+       except Exception as e:
+           # 若检索出错,返回空
+           print(f"[RAG] 检索错误: {str(e)}")
+           return "", []
+    # 生成答案的主方法
+   def answer(self, question, top_k=3):
+       # 检索相关上下文
+       context_str, context_list = self.retrieve_context(question, top_k)
+       # 未检索到上下文时返回提示
+       if not context_list:
+           return "抱歉,未找到相关信息,请联系品牌官方售后或客服。", []
+       # 构建Prompt
+       prompt = self.build_prompt(question, context_str)
+       # 打印prompt的前80字符
+       print(f"[RAG] 构建的prompt: {prompt[:80]}...")
+       try:
+           # 使用LLM生成答案
+           response = self.llm.invoke(prompt, stream=False)
+           # 打印部分生成答案
+           print(f"[RAG] 生成答案: {response[:50]}...")
+           # 返回答案和上下文
+           return response, context_list
+       except Exception as e:
+           # 若生成出错,返回默认答案
+           print(f"[RAG] 生成答案错误: {str(e)}")
+           return "抱歉,生成答案时出现错误,请联系品牌官方售后或客服。", context_list

# 检查数据库,如果为空则插入测试数据
if not check_database():
    print("[MAIN] 数据库为空,开始插入测试数据...")
    if not insert_data():
        print("[MAIN] 插入测试数据失败,程序退出")
        sys.exit(1)
    print("[MAIN] 测试数据插入完成")

# 准备要测试的问题列表
questions = [
    "如何申请手机保修服务?",
    "笔记本电脑电池鼓包怎么办?",
]
# 每个问题的标准答案
ground_truths = [
    "您可携带购机发票和保修卡前往品牌授权售后服务中心,工程师检测后符合保修政策即可免费维修。",
    "如发现笔记本电脑电池鼓包,请立即停止使用并联系品牌售后服务中心进行更换,切勿自行拆卸或继续充电。",
]    
rag = Rag()
# 初始化答案和上下文结果存储列表
answers, contexts = [], []
+
+# 对每个问题分别进行推理
+print("\n[MAIN] 开始处理问题并生成答案...")
+for i, q in enumerate(questions, 1):
+   print(f"\n[MAIN] 处理问题 {i}/{len(questions)}: {q}")
+   try:
+       # 调用Rag对象生成答案,检索最相关的3条上下文
+       answer, context_list = rag.answer(q, top_k=3)
+       # 存答案和上下文
+       answers.append(answer)
+       contexts.append(context_list)
+       print(f"[MAIN] 答案生成完成,上下文数量: {len(context_list)}")
+   except Exception as e:
+       # 遇到错误时,存预设信息
+       print(f"[MAIN] 处理问题失败: {str(e)}")
+       answers.append("处理失败")
+       contexts.append([])

4.4. 评测数据 #

# 导入sentence-transformers库,用于生成文本向量
from sentence_transformers import SentenceTransformer
# 导入ChromaDB客户端
import chromadb
# 导入异常处理相关模块、异步处理模块以及类型标注等
import sys
# 使用OpenAI官方API进行大模型调用
from openai import OpenAI
# 导入异步处理模块
import asyncio
# 导入functools库中的partial函数,用于函数偏应用
from functools import partial
+# 导入HuggingFace的Dataset类,用于数据集结构化
+from datasets import Dataset
+# 导入ragas评测主方法和运行配置
+from ragas import evaluate, RunConfig
+# 导入ragas评测指标
+from ragas.metrics import (
+   faithfulness,          # 忠实度:衡量生成答案与上下文的事实一致性
+   answer_relevancy,      # 答案相关性:衡量答案对问题的直接相关程度
+   context_recall,        # 上下文召回率:衡量检索到的上下文包含多少相关信息
+   context_precision,     # 上下文精确率:衡量检索到的相关文档是否排在前面
+)
# 定义本地Embedding封装类(用于生成文本向量)
class LocalEmbedding:
    # 构造函数,初始化并加载指定的句向量模型(默认为all-MiniLM-L6-v2)
    def __init__(self, model_name="all-MiniLM-L6-v2"):
        # 打印模型加载提示信息
        print(f"[Embedding] 加载模型: {model_name}")
        # 加载SentenceTransformer模型,强制使用CPU
        self.model = SentenceTransformer(model_name, device="cpu")
        # 打印模型加载完成提示信息
        print(f"[Embedding] 模型加载完成")

    # 单条文本转向量
    def embed_query(self, text):
        # 对单条文本进行编码,生成归一化后的向量
        embedding = self.model.encode(text, normalize_embeddings=True)
        # 将numpy向量转为普通list返回
        return embedding.tolist()

    # 批量文本转向量
    def embed_documents(self, texts):
        # 对多条文本进行编码,生成归一化后的向量
        embeddings = self.model.encode(texts, normalize_embeddings=True)
        # 将numpy向量组转为普通list返回
        return embeddings.tolist()

    # 获取本embedding对象自身
    def get(self):
        # 返回自身
        return self
# 定义插入测试数据到Chroma数据库的函数
def insert_data():
    # 打印初始化日志
    print("[DB] 初始化Chroma数据库 product_db")
    try:
        # 创建ChromaDB客户端与集合
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        collection = chroma_client.get_or_create_collection(name="product_db")
        # 加载嵌入模型
        embedding_model = LocalEmbedding()

        # 定义用于插入的测试数据,每条含文本内容和元数据
        test_docs = [
            {
                "content": "如需申请手机保修服务,请携带购机发票和保修卡前往品牌授权售后服务中心,工程师检测后符合保修政策即可免费维修。",
                "meta": {"category": "手机保修"},
            },
            {
                "content": "笔记本电脑电池出现鼓包属于安全隐患,请立即停止使用并联系品牌售后服务中心进行更换,切勿自行拆卸或继续充电。",
                "meta": {"category": "电脑电池"},
            },
            {
                "content": "电子产品自购买之日起享受7天无理由退货,15天内可换货,一年内享受免费保修服务,具体以品牌政策为准。",
                "meta": {"category": "售后政策"},
            },
            {
                "content": "如遇产品无法开机、屏幕碎裂等问题,请及时联系官方售后或客服,部分问题可能不在保修范围内。",
                "meta": {"category": "常见问题"},
            },
        ]

        # 准备所有文本和相关元数据
        texts = [doc["content"] for doc in test_docs]
        metadatas = [doc["meta"] for doc in test_docs]
        ids = [f"test_doc_{i}" for i in range(len(test_docs))]
        # 生成文本向量
        embeddings = embedding_model.embed_documents(texts)
        # 遍历输出每条数据插入日志
        for i, doc in enumerate(test_docs):
            print(f"[DB] 插入测试数据 {i+1}: {doc['meta']['category']}")

        # 批量插入数据到ChromaDB
        collection.add(
            embeddings=embeddings,
            documents=texts,
            metadatas=metadatas,
            ids=ids,
        )
        # 插入成功打印信息
        print("[DB] 测试数据已插入 product_db。")
        return True
    except Exception as e:
        # 插入失败时打印错误信息
        print(f"[DB] 插入测试数据错误: {str(e)}")
        return False
# 定义用于检查数据库是否为空的方法
def check_database(db_name="product_db"):
    # 打印检查日志
    print(f"[DB] 检查数据库: {db_name}")
    try:
        # 连接ChromaDB客户端并获取集合
        chroma_client = chromadb.PersistentClient(path="./chroma_db")
        collection = chroma_client.get_or_create_collection(name=db_name)
        # 获取集合文档数量
        count = collection.count()
        # 打印数量
        print(f"[DB] 数据库中有 {count} 条文档")
        # 有文档返回True,无则False
        return count > 0
    except Exception as e:
        # 检查有误,打印错误信息并返回False
        print(f"[DB] 检查数据库错误: {str(e)}")
        return False
# 定义本地LLM(大语言模型)推理类,封装与OpenAI云服务的交互逻辑
class LLM:
    # 初始化方法,设置所调用的OpenAI模型名,默认为gpt-4o
    def __init__(
        self,
        model="gpt-4o",
    ):
        # 创建OpenAI API的客户端实例
        self.client = OpenAI()
        # 保存模型名称
        self.model = model

    # 定义同步的推理方法,向模型发送prompt并获取文本结果
    def invoke(self, prompt, stop=None, **kwargs):
        # 打印推理请求的前50个字符
        print(f"[LLM] 推理请求: {prompt[:50]}...")
        try:
            # 使用OpenAI的ChatCompletion API发起推理请求
            response = self.client.chat.completions.create(
                model=self.model, # 模型名称
                messages=[{"role": "user", "content": prompt}], # 消息列表,包含角色和内容
                temperature=kwargs.get("temperature", 0.1), # 温度,控制生成文本的随机性
                top_p=kwargs.get("top_p", 0.9), # 核采样,控制生成文本的多样性
                max_tokens=kwargs.get("max_tokens", 4096), # 最大令牌数,控制生成文本的长度
                stop=stop, # 停止条件
                stream=kwargs.get("stream", False), # 是否流式输出
            )
            # 获取生成的文本内容
            generated_text = response.choices[0].message.content
            # 打印生成结果的前50个字符
            print(f"[LLM] 推理结果: {generated_text[:50]}...")
            # 返回生成文本
            return generated_text
        except Exception as e:
            # 出现异常时打印错误信息并抛出异常
            print(f"[LLM] 错误: {str(e)}")
            raise

    # 异步生成方法,用于ragas异步接口的兼容
    async def generate(self, prompt, stop=None, **kwargs):
        # 检查prompt参数类型,如果为列表则取第一个元素
        if isinstance(prompt, list):
            prompt_str = prompt[0] if prompt else ""
        else:
            prompt_str = str(prompt)
        # 用partial将同步invoke函数包装,以便run_in_executor能调用
        invoke_func = partial(self.invoke, prompt_str, stop=stop, **kwargs)
        # 获取当前事件循环
        loop = asyncio.get_event_loop()
        # 在线程池中异步运行同步方法
        result = await loop.run_in_executor(None, invoke_func)

        # ragas需要的返回对象,具有generations属性
        class Generation:
            # 初始化,text为生成文本
            def __init__(self, text):
                self.text = str(text)

        class GenerationResult:
            # 初始化,生成格式为[[Generation对象]]
            def __init__(self, text):
                self.generations = [[Generation(text)]]

        # 返回封装好的生成结果对象
        return GenerationResult(result)
# 定义RAG(检索增强生成)主流程封装类
class Rag:
    # 初始化方法,设置数据库名、嵌入模型、LLM、prompt模板及chroma客户端
    def __init__(
        self,
        db_name="product_db", # 数据库名
        embedding_model=None, # 嵌入模型    
        llm=None, # LLM模型
        prompt_template=None, # 提示词模板
        chroma_client=None, # ChromaDB客户端
    ): # 初始化ChromaDB客户端
        # 打印初始化日志
        print(f"[RAG] 初始化,数据库名: {db_name}")
        # 如果未传入嵌入模型则用默认
        self.embedding_model = embedding_model or LocalEmbedding()
        # 如果未传入LLM则用默认
        self.llm = llm or LLM()
        # 如果未传入ChromaDB客户端则创建本地持久化客户端
        self.chroma_client = chroma_client or chromadb.PersistentClient(
            path="./chroma_db"
        )
        # 获取或创建ChromaDB的collection
        self.collection = self.chroma_client.get_or_create_collection(
            name=db_name,
        )
        # 设置提示词模板,默认为售后场景专用模板
        self.prompt_template = prompt_template or (
            "你是电子产品售后服务助手,熟悉手机、电脑等产品的保修、维修、退换货等政策。"
            "请根据提供的上下文信息context,专业、简明地回答用户的售后相关问题。"
            "如果上下文没有相关信息,请回答[请联系品牌官方售后或客服]。\n"
            '问题:{question}\n上下文:"{context}"\n答复:'
        )
    # 构建prompt的方法,将输入问题和上下文填入模板
    def build_prompt(self, question, context):
        prompt = self.prompt_template.replace("{question}", question).replace(
            "{context}", context
        )
        return prompt    
    # 检索与问题相关的上下文,并返回上下文字符串及上下文列表
    def retrieve_context(self, query, top_k=3):
        # 打印检索日志
        print(f"[RAG] 检索上下文: {query}")
        try:
            # 将查询转换为向量
            query_embedding = self.embedding_model.embed_query(query)
            # 使用ChromaDB的query方法查找最相似的top_k个文档
            results = self.collection.query(
                query_embeddings=[query_embedding],
                n_results=top_k
            )
            # 提取文档内容
            context_list = []
            if results['documents'] and len(results['documents']) > 0:
                context_list = results['documents'][0]
            # 打印检索到的上下文数量
            print(f"[RAG] 检索到{len(context_list)}条上下文")
            # 返回拼接的上下文字符串和原始上下文列表
            return "\n".join(context_list), context_list
        except Exception as e:
            # 若检索出错,返回空
            print(f"[RAG] 检索错误: {str(e)}")
            return "", []
     # 生成答案的主方法
    def answer(self, question, top_k=3):
        # 检索相关上下文
        context_str, context_list = self.retrieve_context(question, top_k)
        # 未检索到上下文时返回提示
        if not context_list:
            return "抱歉,未找到相关信息,请联系品牌官方售后或客服。", []
        # 构建Prompt
        prompt = self.build_prompt(question, context_str)
        # 打印prompt的前80字符
        print(f"[RAG] 构建的prompt: {prompt[:80]}...")
        try:
            # 使用LLM生成答案
            response = self.llm.invoke(prompt, stream=False)
            # 打印部分生成答案
            print(f"[RAG] 生成答案: {response[:50]}...")
            # 返回答案和上下文
            return response, context_list
        except Exception as e:
            # 若生成出错,返回默认答案
            print(f"[RAG] 生成答案错误: {str(e)}")
            return "抱歉,生成答案时出现错误,请联系品牌官方售后或客服。", context_list

# 检查数据库,如果为空则插入测试数据
if not check_database():
    print("[MAIN] 数据库为空,开始插入测试数据...")
    if not insert_data():
        print("[MAIN] 插入测试数据失败,程序退出")
        sys.exit(1)
    print("[MAIN] 测试数据插入完成")

# 准备要测试的问题列表
questions = [
    "如何申请手机保修服务?",
    "笔记本电脑电池鼓包怎么办?",
]
# 每个问题的标准答案
ground_truths = [
    "您可携带购机发票和保修卡前往品牌授权售后服务中心,工程师检测后符合保修政策即可免费维修。",
    "如发现笔记本电脑电池鼓包,请立即停止使用并联系品牌售后服务中心进行更换,切勿自行拆卸或继续充电。",
]    
rag = Rag()
# 初始化答案和上下文结果存储列表
answers, contexts = [], []

# 对每个问题分别进行推理
print("\n[MAIN] 开始处理问题并生成答案...")
for i, q in enumerate(questions, 1):
    print(f"\n[MAIN] 处理问题 {i}/{len(questions)}: {q}")
    try:
        # 调用Rag对象生成答案,检索最相关的3条上下文
        answer, context_list = rag.answer(q, top_k=3)
        # 存答案和上下文
        answers.append(answer)
        contexts.append(context_list)
        print(f"[MAIN] 答案生成完成,上下文数量: {len(context_list)}")
    except Exception as e:
        # 遇到错误时,存预设信息
        print(f"[MAIN] 处理问题失败: {str(e)}")
        answers.append("处理失败")
        contexts.append([])
+
+# 打印提示信息,准备构建评测数据集
+print("\n[MAIN] 构建评测数据集...")
+# 组织评测用的数据,包括问题、答案、上下文和标准答案
+data = {
+   "question": questions, # 问题列表
+   "answer": answers, # 答案列表
+   "contexts": contexts, # 上下文列表
+   "ground_truth": ground_truths, # 标准答案列表
+}
+# 利用HuggingFace的Dataset.from_dict构建数据集对象
+dataset = Dataset.from_dict(data)
+# 打印数据集中包含的数据条数
+print(f"[MAIN] 数据集构建完成,包含 {len(dataset)} 条记录")
+
+# 打印提示信息,开始RAGAS自动评测
+print("[MAIN] 开始RAGAS评测...")
+try:
+   # 创建RAGAS评测所需的LLM对象(需要已实现LLM类)
+   eval_llm = LLM()
+   # 创建本地embedding对象
+   eval_embedding = LocalEmbedding()
+   # 配置ragas评估的运行参数,如超时时间和日志设置
+   config = RunConfig(timeout=1200, log_tenacity=True)
+   # 调用ragas的evaluate方法进行自动评测
+   result = evaluate(
+       dataset=dataset,                  # 输入数据集
+       llm=eval_llm,                     # 评测用大模型
+       embeddings=eval_embedding,        # 评测用embedding模型
+       metrics=[                         # 选择四项评测指标
+           context_precision, # 上下文精确率
+           context_recall, # 上下文召回率
+           faithfulness, # 忠实度
+           answer_relevancy, # 答案相关性
+       ],
+       raise_exceptions=True,            # 出现异常时直接抛出
+       run_config=config,                # 评测运行参数配置
+   )
+   # 将评测结果转为pandas DataFrame格式,便于展示
+   df = result.to_pandas()
+   # 打印整体RAGAS评测结果
+   print("[MAIN] RAGAS评测结果:")
+   # 打印每一条测试用例的详细评测信息
+   print(df)
+   # 打印各项指标的平均分
+   print("\n[MAIN] 各项指标平均分:")
+   for metric in ["context_precision", "context_recall", "faithfulness", "answer_relevancy"]:
+       # 仅在DataFrame中存在该列时计算平均值
+       if metric in df.columns:
+           avg_score = df[metric].mean() # 计算平均值
+           print(f"  {metric}: {avg_score:.4f}") # 打印平均值
+   # 评测流程结束
+   print("\n[MAIN] 评估完成!")
+except Exception as e:
+   # 如果评测过程中出现异常,则输出错误信息
+   print(f"\n[MAIN] RAGAS评测失败: {str(e)}")
+   # 程序异常退出
+   sys.exit(1)

5. generate #

5.1 方法定义 #

5.1.1 方法定义 #

generate 方法是一个异步方法,主要目的是:

  1. 将同步的 invoke 方法包装成异步接口
  2. 兼容 ragas 框架的异步调用规范
  3. 处理不同格式的输入参数

5.1.2 方法定义 #

async def generate(self, prompt, stop=None, **kwargs):
  • async:声明这是一个异步方法
  • prompt:输入提示,可以是字符串或列表
  • stop:停止条件,用于控制生成何时停止
  • **kwargs:其他可选参数,如温度、最大token数等

5.1.3 输入参数处理 #

if isinstance(prompt, list):
    prompt_str = prompt[0] if prompt else ""
else:
    prompt_str = str(prompt)

作用:处理不同格式的输入

  • 如果 prompt 是列表,取第一个元素
  • 如果是其他类型,转换为字符串
  • 兼容性考虑:某些框架可能传递列表格式的输入

5.1.4 包装同步函数 #

invoke_func = partial(self.invoke, prompt_str, stop=stop, **kwargs)

partial 函数的作用:

  • 创建一个新的函数,预先填充了参数
  • 等价于:lambda: self.invoke(prompt_str, stop=stop, **kwargs)
  • 这样可以在线程池中直接调用,无需再传递参数

5.1.5 异步执行同步方法 #

loop = asyncio.get_event_loop()
result = await loop.run_in_executor(None, invoke_func)

关键技巧:在异步环境中运行同步代码

  • asyncio.get_event_loop():获取当前事件循环
  • run_in_executor(None, invoke_func):在线程池中运行同步函数
  • await:等待异步操作完成
  • 优势:避免阻塞事件循环,提高并发性能

5.1.6 返回结果封装 #

class Generation:
    def __init__(self, text):
        self.text = str(text)

class GenerationResult:
    def __init__(self, text):
        self.generations = [[Generation(text)]]

return GenerationResult(result)

封装结构:

GenerationResult
└── generations (列表)
    └── [0] (列表)
        └── Generation 对象
            └── text (生成的文本)

5.2 设计意图分析 #

5.2.1. 异步兼容性 #

# 同步调用
result = llm.invoke("你好")

# 异步调用  
result = await llm.generate("你好")

5.2.2. ragas 框架兼容 #

ragas 需要特定的返回格式:

# ragas 期望的格式
result.generations[0][0].text

5.2.3. 输入灵活性 #

支持多种输入格式:

# 字符串输入
await llm.generate("单提示词")

# 列表输入(兼容性)
await llm.generate(["提示词列表"])

5.3 使用 #

5.3.1 基本使用 #

# 导入异常处理相关模块、异步处理模块以及类型标注等
import sys
# 使用OpenAI官方API进行大模型调用
from openai import OpenAI
# 导入异步处理模块
import asyncio
# 导入functools库中的partial函数,用于函数偏应用
from functools import partial
# 定义本地LLM(大语言模型)推理类,封装与OpenAI云服务的交互逻辑
class LLM:
    # 初始化方法,设置所调用的OpenAI模型名,默认为gpt-4o
    def __init__(
        self,
        model="gpt-4o",
    ):
        # 创建OpenAI API的客户端实例
        self.client = OpenAI()
        # 保存模型名称
        self.model = model

    # 定义同步的推理方法,向模型发送prompt并获取文本结果
    def invoke(self, prompt, stop=None, **kwargs):
        # 打印推理请求的前50个字符
        print(f"[LLM] 推理请求: {prompt[:50]}...")
        try:
            # 使用OpenAI的ChatCompletion API发起推理请求
            response = self.client.chat.completions.create(
                model=self.model, # 模型名称
                messages=[{"role": "user", "content": prompt}], # 消息列表,包含角色和内容
                temperature=kwargs.get("temperature", 0.1), # 温度,控制生成文本的随机性
                top_p=kwargs.get("top_p", 0.9), # 核采样,控制生成文本的多样性
                max_tokens=kwargs.get("max_tokens", 4096), # 最大令牌数,控制生成文本的长度
                stop=stop, # 停止条件
                stream=kwargs.get("stream", False), # 是否流式输出
            )
            # 获取生成的文本内容
            generated_text = response.choices[0].message.content
            # 打印生成结果的前50个字符
            print(f"[LLM] 推理结果: {generated_text[:50]}...")
            # 返回生成文本
            return generated_text
        except Exception as e:
            # 出现异常时打印错误信息并抛出异常
            print(f"[LLM] 错误: {str(e)}")
            raise

    # 异步生成方法,用于ragas异步接口的兼容
    async def generate(self, prompt, stop=None, **kwargs):
        # 检查prompt参数类型,如果为列表则取第一个元素
        if isinstance(prompt, list):
            prompt_str = prompt[0] if prompt else ""
        else:
            prompt_str = str(prompt)
        # 用partial将同步invoke函数包装,以便run_in_executor能调用
        invoke_func = partial(self.invoke, prompt_str, stop=stop, **kwargs)
        # 获取当前事件循环
        loop = asyncio.get_event_loop()
        # 在线程池中异步运行同步方法
        result = await loop.run_in_executor(None, invoke_func)

        # ragas需要的返回对象,具有generations属性
        class Generation:
            # 初始化,text为生成文本
            def __init__(self, text):
                self.text = str(text)

        class GenerationResult:
            # 初始化,生成格式为[[Generation对象]]
            def __init__(self, text):
                self.generations = [[Generation(text)]]

        # 返回封装好的生成结果对象
        return GenerationResult(result)
llm = LLM()

# 同步调用
sync_result = llm.invoke("解释一下人工智能")

# 异步调用
async def main():
    async_result = await llm.generate("解释一下机器学习")
    print(async_result.generations[0][0].text)

5.3.2 在异步框架中使用 #

# 导入sys模块,主要用于异常处理和与Python解释器交互
import sys
# 导入OpenAI官方SDK,用于连接云端大语言模型API
from openai import OpenAI
# 导入asyncio异步模块,用于支持异步编程
import asyncio
# 导入functools中的partial函数,用于对函数做参数预填充
from functools import partial

# 定义本地LLM(大语言模型)推理类,封装OpenAI云服务推理能力
class LLM:
    # 初始化方法,设置模型名称,默认使用gpt-4o并初始化OpenAI客户端
    def __init__(
        self,
        model="gpt-4o",
    ):
        # 创建OpenAI API客户端实例
        self.client = OpenAI()
        # 保存模型名称
        self.model = model

    # 定义同步推理方法,向大语言模型发送prompt并获取结果
    def invoke(self, prompt, stop=None, **kwargs):
        # 打印推理请求的前50个字符
        print(f"[LLM] 推理请求: {prompt[:50]}...")
        try:
            # 通过ChatCompletion接口发送对话请求
            response = self.client.chat.completions.create(
                model=self.model,                    # 指定所用大模型
                messages=[{"role": "user", "content": prompt}],  # 构造对话序列
                temperature=kwargs.get("temperature", 0.1),      # 控制生成的创意程度
                top_p=kwargs.get("top_p", 0.9),                 # 核采样,影响多样性
                max_tokens=kwargs.get("max_tokens", 4096),       # 最大输出token数
                stop=stop,                                       # 指定停止tokens
                stream=kwargs.get("stream", False),               # 是否流式返回
            )
            # 获取生成的answer文本内容
            generated_text = response.choices[0].message.content
            # 打印推理结果前50字符
            print(f"[LLM] 推理结果: {generated_text[:50]}...")
            # 返回完整生成结果
            return generated_text
        except Exception as e:
            # 推理出错时打印错误信息,并继续抛出异常
            print(f"[LLM] 错误: {str(e)}")
            raise

    # 定义异步生成方法,兼容ragas的async接口
    async def generate(self, prompt, stop=None, **kwargs):
        # 如果prompt为列表,则取第一个元素,否则转为str类型
        if isinstance(prompt, list):
            prompt_str = prompt[0] if prompt else ""
        else:
            prompt_str = str(prompt)
        # 用partial封装invoke方法,作为线程池的可调用目标
        invoke_func = partial(self.invoke, prompt_str, stop=stop, **kwargs)
        # 获取当前事件循环
        loop = asyncio.get_event_loop()
        # 在线程池中运行同步方法
        result = await loop.run_in_executor(None, invoke_func)

        # 内部定义Generation结果类,模拟ragas所需格式
        class Generation:
            # 构造方法,保存生成结果文本
            def __init__(self, text):
                self.text = str(text)

        # 封装一层GenerationResult
        class GenerationResult:
            # 构造方法,generations为格式要求的二维列表
            def __init__(self, text):
                self.generations = [[Generation(text)]]
            # 定义字符串表示,便于调试打印
            def __repr__(self):
                if self.generations and self.generations[0]:
                    return f"GenerationResult(text='{self.generations[0][0].text[:50]}...')"
                return "GenerationResult(empty)"

        # 返回GenerationResult对象
        return GenerationResult(result)

# 定义主异步流程
async def main():
    # 批量生成封装,用于并发推理多个prompt
    async def batch_generate(prompts):
        # 实例化LLM类
        llm = LLM()
        # 为每个prompt创建任务
        tasks = [llm.generate(prompt) for prompt in prompts]
        # 并发执行所有任务并收集结果
        results = await asyncio.gather(*tasks)
        # 返回全部结果
        return results

    # 设置待推理的prompt批量列表
    prompts = ["解释一下人工智能", "解释一下机器学习"]
    # 并发执行推理
    results = await batch_generate(prompts)
    # 提取每个推理结果中的文本内容
    texts = [result.generations[0][0].text for result in results]
    # 打印推理结果
    print("\n生成结果:")
    for i, (prompt, text) in enumerate(zip(prompts, texts), 1):
        print(f"\n[{i}] 问题: {prompt}")
        print(f"回答: {text}")

# 入口检测,确保仅作为主程序时才执行
if __name__ == "__main__":
    asyncio.run(main())

5.4 方法特点总结 #

  1. 桥接模式:连接同步的 invoke 方法和异步的调用需求
  2. 适配器模式:适配 ragas 框架的接口要求
  3. 异步优化:使用线程池避免阻塞主事件循环
  4. 类型安全:处理不同类型的输入参数
  5. 错误处理:继承 invoke 方法的异常处理机制

6.总结 #

6.1 核心要点 #

  1. RAGAS是什么:一个自动化评估RAG系统的Python框架
  2. 核心指标:Faithfulness、Answer Relevance、Context Precision、Context Recall
  3. 使用流程:准备数据 → 执行评估 → 查看结果
  4. 优势:自动化、多维度、无需大量人工标注

6.2 适用场景 #

适合使用RAGAS的场景:

  • 开发RAG系统时需要评估系统质量
  • 对比不同配置或模型的效果
  • 持续监控生产环境中的系统表现
  • 快速发现系统问题并优化

不适合使用RAGAS的场景:

  • 需要非常精确的人工评估(RAGAS是近似评估)
  • 对API成本非常敏感的场景
  • 需要评估特定领域专业知识的情况

访问验证

请输入访问令牌

Token不正确,请重新输入