Deeptoai RAG系列教程

RAG重排序与查询集成:提升检索精度的关键技术

深入探讨RAG系统中的重排序技术和查询集成方法,包括交叉编码器重排序、倒数排序融合和多查询检索

RAG重排序与查询集成:提升检索精度的关键技术

在前面的章节中,我们学习了如何优化文档索引和检索策略。但是,初次检索的结果往往不够精确。本章将深入探讨如何通过重排序和查询集成技术进一步提升检索质量。

为什么需要重排序?

向量检索的局限性

# 问题:单一向量相似度不够精确

场景:
用户查询: "Python中如何处理异常?"

向量检索结果(按相似度排序):
1. 文档A: "Python异常处理机制...try-except..." (相似度: 0.85)
2. 文档B: "Python中的错误类型...Exception类..." (相似度: 0.83)
3. 文档C: "Python编程基础...变量、函数..." (相似度: 0.82)

# 问题分析:
→ 文档A最相关,排序正确 ✅
→ 但相似度差异很小(0.85 vs 0.83 vs 0.82
→ 向量相似度不能完全反映真实相关性
→ 文档C不太相关但相似度也不低

# 解决方案:重排序 🎯
→ 使用更强大的模型重新评估文档相关性
→ 考虑查询和文档的精确匹配度
→ 调整排序,确保最相关的文档排在前面

本章技术概览

技术核心功能优势复杂度
交叉编码器重排序精确评估查询-文档相关性准确度最高⭐⭐⭐
倒数排序融合(RRF)融合多个排序结果鲁棒性强⭐⭐
多查询检索生成多个查询并融合结果召回率高⭐⭐
查询扩展扩充查询语义覆盖面广⭐⭐
混合检索结合向量+关键词检索全面性好⭐⭐⭐

Part 1: 交叉编码器重排序 - Cross-Encoder Reranking

核心概念

双编码器(Bi-Encoder) vs 交叉编码器(Cross-Encoder):

# 双编码器(用于初始检索)
查询 → 编码器 → 查询向量 ──┐
                            ├→ 余弦相似度
文档 → 编码器 → 文档向量 ──┘

优点:快速,可预先计算文档向量
缺点:查询和文档独立编码,无法捕捉细粒度交互

# 交叉编码器(用于重排序)
查询 + 文档 → 编码器 → 相关性分数

优点:查询和文档联合编码,更精确
缺点:慢,无法预先计算

工作流程

# RAG + 重排序流程

1. 初始检索(双编码器)
   → 从大量文档中快速检索top-k个候选
   → 例如:从10000个文档中检索top-50

2. 重排序(交叉编码器)
   → 对top-k个候选重新评分
   → 更精确地排序

3. 返回最终结果
   → 返回重排序后的top-n个文档
   → 例如:返回最相关的top-5

实现:使用Cohere Rerank API

import cohere
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.schema import Document

class CohereReranker:
    """使用Cohere API进行重排序"""
    
    def __init__(self, cohere_api_key: str, embeddings):
        self.cohere_client = cohere.Client(cohere_api_key)
        self.embeddings = embeddings
        
        # 向量存储
        self.vectorstore = Chroma(
            collection_name="rerank_demo",
            embedding_function=embeddings
        )
    
    def add_documents(self, documents: List[Document]):
        """添加文档到向量存储"""
        self.vectorstore.add_documents(documents)
        print(f"✅ 已添加 {len(documents)} 个文档")
    
    def retrieve_and_rerank(
        self,
        query: str,
        initial_k: int = 20,
        final_k: int = 5
    ):
        """检索并重排序"""
        # 1. 初始向量检索
        print(f"🔍 初始检索 top-{initial_k} 文档...")
        initial_docs = self.vectorstore.similarity_search(query, k=initial_k)
        
        print("\n初始检索结果(按向量相似度):")
        for i, doc in enumerate(initial_docs[:3]):
            print(f"{i+1}. {doc.page_content[:100]}...")
        
        # 2. 使用Cohere重排序
        print(f"\n🎯 使用Cohere重排序...")
        
        # 准备文档内容
        doc_texts = [doc.page_content for doc in initial_docs]
        
        # 调用Cohere Rerank API
        rerank_results = self.cohere_client.rerank(
            query=query,
            documents=doc_texts,
            top_n=final_k,
            model="rerank-english-v2.0"  # 或 rerank-multilingual-v2.0
        )
        
        # 3. 构建重排序后的文档列表
        reranked_docs = []
        for result in rerank_results.results:
            original_doc = initial_docs[result.index]
            reranked_docs.append({
                'document': original_doc,
                'relevance_score': result.relevance_score
            })
        
        print("\n重排序结果(按相关性得分):")
        for i, item in enumerate(reranked_docs):
            print(f"{i+1}. [得分: {item['relevance_score']:.4f}] {item['document'].page_content[:100]}...")
        
        return reranked_docs

# 使用示例
embeddings = OpenAIEmbeddings()
reranker = CohereReranker(
    cohere_api_key="your-cohere-api-key",
    embeddings=embeddings
)

# 准备测试文档
documents = [
    Document(
        page_content="""
        Python异常处理完整指南
        
        使用try-except捕获异常:
        try:
            risky_operation()
        except Exception as e:
            print(f"发生错误: {e}")
        
        可以捕获特定异常类型,也可以使用finally子句。
        """,
        metadata={"source": "python_exceptions.md"}
    ),
    Document(
        page_content="""
        Python错误和异常类型
        
        Python有多种内置异常类型:
        - ValueError: 值错误
        - TypeError: 类型错误
        - KeyError: 键错误
        - IndexError: 索引错误
        
        所有异常都继承自Exception类。
        """,
        metadata={"source": "python_error_types.md"}
    ),
    Document(
        page_content="""
        Python编程基础教程
        
        本教程涵盖:
        - 变量和数据类型
        - 控制流语句
        - 函数定义
        - 模块导入
        """,
        metadata={"source": "python_basics.md"}
    ),
    Document(
        page_content="""
        如何在Python中优雅地处理错误
        
        最佳实践:
        1. 只捕获你能处理的异常
        2. 使用具体的异常类型而不是Exception
        3. 提供有用的错误信息
        4. 适当时使用finally清理资源
        """,
        metadata={"source": "python_error_best_practices.md"}
    ),
]

# 添加文档
reranker.add_documents(documents)

# 检索并重排序
query = "Python中如何处理异常?"
results = reranker.retrieve_and_rerank(query, initial_k=10, final_k=3)

实现:使用本地交叉编码器模型

from sentence_transformers import CrossEncoder
import numpy as np

class LocalCrossEncoderReranker:
    """使用本地交叉编码器模型重排序"""
    
    def __init__(self, embeddings, model_name: str = "cross-encoder/ms-marco-MiniLM-L-6-v2"):
        self.embeddings = embeddings
        
        # 加载交叉编码器模型
        print(f"📥 加载交叉编码器模型: {model_name}")
        self.cross_encoder = CrossEncoder(model_name)
        
        # 向量存储
        self.vectorstore = Chroma(
            collection_name="local_rerank",
            embedding_function=embeddings
        )
    
    def add_documents(self, documents: List[Document]):
        """添加文档"""
        self.vectorstore.add_documents(documents)
    
    def retrieve_and_rerank(
        self,
        query: str,
        initial_k: int = 20,
        final_k: int = 5
    ):
        """检索并重排序"""
        # 1. 初始检索
        initial_docs = self.vectorstore.similarity_search(query, k=initial_k)
        
        # 2. 准备查询-文档对
        query_doc_pairs = [
            [query, doc.page_content] for doc in initial_docs
        ]
        
        # 3. 使用交叉编码器计算相关性分数
        print(f"🎯 使用交叉编码器重新评分...")
        scores = self.cross_encoder.predict(query_doc_pairs)
        
        # 4. 根据分数排序
        scored_docs = [
            {'document': doc, 'score': score}
            for doc, score in zip(initial_docs, scores)
        ]
        
        # 按分数降序排序
        scored_docs.sort(key=lambda x: x['score'], reverse=True)
        
        # 5. 返回top-k
        reranked_docs = scored_docs[:final_k]
        
        print("\n重排序结果:")
        for i, item in enumerate(reranked_docs):
            print(f"{i+1}. [得分: {item['score']:.4f}] {item['document'].page_content[:100]}...")
        
        return reranked_docs

# 使用示例
local_reranker = LocalCrossEncoderReranker(embeddings)
local_reranker.add_documents(documents)

results = local_reranker.retrieve_and_rerank(query, initial_k=10, final_k=3)

常用交叉编码器模型

# 推荐的交叉编码器模型

RERANKER_MODELS = {
    "small_fast": {
        "name": "cross-encoder/ms-marco-TinyBERT-L-2-v2",
        "params": "~4M",
        "speed": "very fast",
        "quality": "good"
    },
    "balanced": {
        "name": "cross-encoder/ms-marco-MiniLM-L-6-v2",
        "params": "~22M",
        "speed": "fast",
        "quality": "very good"
    },
    "high_quality": {
        "name": "cross-encoder/ms-marco-MiniLM-L-12-v2",
        "params": "~33M",
        "speed": "medium",
        "quality": "excellent"
    },
    "multilingual": {
        "name": "cross-encoder/mmarco-mMiniLMv2-L12-H384-v1",
        "params": "~118M",
        "speed": "medium",
        "quality": "excellent (多语言)"
    }
}

def choose_reranker(priority: str = "balanced"):
    """选择合适的重排序模型"""
    model_info = RERANKER_MODELS.get(priority, RERANKER_MODELS["balanced"])
    
    print(f"📊 选择模型: {model_info['name']}")
    print(f"   参数量: {model_info['params']}")
    print(f"   速度: {model_info['speed']}")
    print(f"   质量: {model_info['quality']}")
    
    return CrossEncoder(model_info['name'])

Part 2: 倒数排序融合 - Reciprocal Rank Fusion (RRF)

核心概念

倒数排序融合(RRF)是一种融合多个排序列表的算法,它不需要知道具体的分数,只需要排名。

RRF算法原理

# RRF公式

对于文档d,其RRF分数为:
RRF(d) = Σ (1 / (k + rank_i(d)))

其中:
- rank_i(d): 文档d在第i个排序列表中的排名
- k: 常数,通常取60
- Σ: 对所有排序列表求和

# 示例
假设有2个排序列表,k=60:

列表1: [DocA, DocB, DocC]  (DocA排名1, DocB排名2, DocC排名3)
列表2: [DocC, DocA, DocD]  (DocC排名1, DocA排名2, DocD排名3)

RRF分数:
DocA: 1/(60+1) + 1/(60+2) = 0.0164 + 0.0161 = 0.0325
DocB: 1/(60+2) + 0        = 0.0161
DocC: 1/(60+3) + 1/(60+1) = 0.0159 + 0.0164 = 0.0323
DocD: 0        + 1/(60+3) = 0.0159

最终排序: DocA > DocC > DocB > DocD

实现:RRF融合器

from typing import List, Dict
from collections import defaultdict

class ReciprocalRankFusion:
    """倒数排序融合"""
    
    def __init__(self, k: int = 60):
        """
        Args:
            k: RRF常数,通常取60
        """
        self.k = k
    
    def fuse(self, ranked_lists: List[List[Document]]) -> List[Dict]:
        """
        融合多个排序列表
        
        Args:
            ranked_lists: 多个文档排序列表
            
        Returns:
            融合后的文档列表(包含RRF分数)
        """
        # 存储每个文档的RRF分数
        doc_scores = defaultdict(float)
        doc_map = {}  # 文档ID到文档对象的映射
        
        # 遍历每个排序列表
        for ranked_list in ranked_lists:
            for rank, doc in enumerate(ranked_list, start=1):
                # 使用page_content作为文档唯一标识
                doc_id = id(doc)
                
                # 计算RRF分数
                rrf_score = 1.0 / (self.k + rank)
                doc_scores[doc_id] += rrf_score
                
                # 保存文档对象
                if doc_id not in doc_map:
                    doc_map[doc_id] = doc
        
        # 按RRF分数排序
        sorted_docs = sorted(
            doc_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )
        
        # 构建结果
        fused_results = [
            {
                'document': doc_map[doc_id],
                'rrf_score': score
            }
            for doc_id, score in sorted_docs
        ]
        
        return fused_results

# 使用示例
rrf = ReciprocalRankFusion(k=60)

# 模拟多个检索器的结果
# 检索器1: 向量相似度检索
retriever1_results = [
    Document(page_content="文档A: Python异常处理..."),
    Document(page_content="文档B: 错误类型..."),
    Document(page_content="文档C: 编程基础...")
]

# 检索器2: BM25关键词检索
retriever2_results = [
    Document(page_content="文档C: 编程基础..."),
    Document(page_content="文档A: Python异常处理..."),
    Document(page_content="文档D: 最佳实践...")
]

# 融合结果
fused = rrf.fuse([retriever1_results, retriever2_results])

print("RRF融合结果:")
for i, item in enumerate(fused):
    print(f"{i+1}. [RRF分数: {item['rrf_score']:.4f}] {item['document'].page_content[:50]}...")

实现:完整的RRF检索器

from langchain.retrievers import BM25Retriever, EnsembleRetriever

class RRFEnsembleRetriever:
    """使用RRF的集成检索器"""
    
    def __init__(self, embeddings, documents: List[Document]):
        self.embeddings = embeddings
        
        # 1. 向量检索器
        self.vectorstore = Chroma(
            collection_name="rrf_ensemble",
            embedding_function=embeddings
        )
        self.vectorstore.add_documents(documents)
        self.vector_retriever = self.vectorstore.as_retriever(search_kwargs={"k": 10})
        
        # 2. BM25关键词检索器
        self.bm25_retriever = BM25Retriever.from_documents(documents)
        self.bm25_retriever.k = 10
        
        # 3. RRF融合器
        self.rrf = ReciprocalRankFusion(k=60)
    
    def retrieve(self, query: str, k: int = 5) -> List[Dict]:
        """使用RRF融合多个检索器的结果"""
        print(f"🔍 查询: {query}\n")
        
        # 1. 向量检索
        print("📊 向量检索...")
        vector_results = self.vector_retriever.get_relevant_documents(query)
        print(f"  → 检索到 {len(vector_results)} 个文档")
        
        # 2. BM25检索
        print("🔤 BM25关键词检索...")
        bm25_results = self.bm25_retriever.get_relevant_documents(query)
        print(f"  → 检索到 {len(bm25_results)} 个文档")
        
        # 3. RRF融合
        print("\n🔀 RRF融合...")
        fused_results = self.rrf.fuse([vector_results, bm25_results])
        
        # 返回top-k
        return fused_results[:k]

# 使用示例
rrf_retriever = RRFEnsembleRetriever(embeddings, documents)

results = rrf_retriever.retrieve("Python异常处理", k=3)

print("\n最终结果:")
for i, item in enumerate(results):
    print(f"\n{i+1}. [RRF分数: {item['rrf_score']:.4f}]")
    print(f"   {item['document'].page_content[:150]}...")

Part 3: 多查询检索 - Multi-Query Retrieval

核心概念

多查询检索的思路是:

  1. 从单个用户查询生成多个相似但不同的查询
  2. 对每个查询分别检索
  3. 融合所有检索结果

为什么需要多查询?

# 问题:单一查询可能不够全面

用户查询: "Python如何读取文件?"

# 可能的相关文档:
- "Python文件读取open()函数"  ✅ 匹配
- "读写文件的最佳实践"        ❌ 可能不匹配(没有"Python"
- "使用pathlib处理文件路径"  ❌ 可能不匹配(没有"读取"

# 解决方案:生成多个查询变体 ✅
原始查询: "Python如何读取文件?"

生成的查询变体:
1. "在Python中打开和读取文件"
2. "Python file I/O操作"
3. "使用open()函数读取文件内容"
4. "Python文件处理方法"

→ 不同变体可能匹配不同的相关文档
→ 融合结果,提高召回率

实现:使用LLM生成查询变体

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

class MultiQueryRetriever:
    """多查询检索器"""
    
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
        self.rrf = ReciprocalRankFusion(k=60)
        
        # 生成查询变体的提示
        self.query_generation_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个帮助生成多个搜索查询的AI助手。
            用户会给你一个问题,你需要生成3个不同的搜索查询变体,这些变体表达相同的意图但用词不同。
            每个查询一行,不要编号。"""),
            ("human", "{question}")
        ])
    
    def generate_queries(self, question: str) -> List[str]:
        """生成查询变体"""
        chain = self.query_generation_prompt | self.llm | StrOutputParser()
        
        response = chain.invoke({"question": question})
        
        # 解析生成的查询(每行一个)
        queries = [q.strip() for q in response.split('\n') if q.strip()]
        
        # 加入原始查询
        all_queries = [question] + queries
        
        print(f"📝 生成了 {len(all_queries)} 个查询变体:")
        for i, q in enumerate(all_queries):
            print(f"   {i+1}. {q}")
        
        return all_queries
    
    def retrieve(self, question: str, k: int = 5) -> List[Dict]:
        """多查询检索"""
        # 1. 生成查询变体
        queries = self.generate_queries(question)
        
        # 2. 对每个查询进行检索
        print(f"\n🔍 对 {len(queries)} 个查询分别检索...")
        all_results = []
        for query in queries:
            results = self.vectorstore.similarity_search(query, k=10)
            all_results.append(results)
            print(f"   查询: '{query[:50]}...' → {len(results)} 个文档")
        
        # 3. 使用RRF融合结果
        print("\n🔀 融合所有检索结果...")
        fused = self.rrf.fuse(all_results)
        
        return fused[:k]

# 使用示例
vectorstore = Chroma(
    collection_name="multi_query",
    embedding_function=embeddings
)
vectorstore.add_documents(documents)

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
multi_query_retriever = MultiQueryRetriever(vectorstore, llm)

# 检索
question = "Python如何读取文件?"
results = multi_query_retriever.retrieve(question, k=3)

print("\n最终检索结果:")
for i, item in enumerate(results):
    print(f"\n{i+1}. [RRF分数: {item['rrf_score']:.4f}]")
    print(f"   {item['document'].page_content[:150]}...")

实现:使用LangChain的MultiQueryRetriever

from langchain.retrievers.multi_query import MultiQueryRetriever as LangChainMultiQueryRetriever
from langchain_core.prompts import PromptTemplate

# 使用LangChain内置的多查询检索器
base_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})

multi_query_retriever = LangChainMultiQueryRetriever.from_llm(
    retriever=base_retriever,
    llm=llm,
    prompt=PromptTemplate(
        input_variables=["question"],
        template="""你是一个AI助手,帮助生成多个搜索查询。
        
        用户问题: {question}
        
        生成3个不同的搜索查询,这些查询表达相同的意图但用词不同。
        每个查询一行。"""
    )
)

# 检索
results = multi_query_retriever.get_relevant_documents("Python如何读取文件?")

print(f"检索到 {len(results)} 个文档")
for i, doc in enumerate(results[:3]):
    print(f"\n{i+1}. {doc.page_content[:150]}...")

Part 4: 查询扩展 - Query Expansion

核心概念

查询扩展通过添加相关术语来增强原始查询,提高检索的覆盖面。

实现:基于LLM的查询扩展

class QueryExpander:
    """查询扩展器"""
    
    def __init__(self, llm):
        self.llm = llm
        
        self.expansion_prompt = ChatPromptTemplate.from_messages([
            ("system", """你是一个查询扩展专家。
            用户给你一个查询,你需要:
            1. 识别查询中的关键概念
            2. 为每个概念添加同义词、相关术语
            3. 返回扩展后的查询
            
            格式: 原始术语 OR 同义词1 OR 同义词2"""),
            ("human", "查询: {query}")
        ])
    
    def expand(self, query: str) -> str:
        """扩展查询"""
        chain = self.expansion_prompt | self.llm | StrOutputParser()
        expanded = chain.invoke({"query": query})
        
        print(f"原始查询: {query}")
        print(f"扩展查询: {expanded}")
        
        return expanded

# 使用示例
expander = QueryExpander(llm)

original_query = "Python机器学习"
expanded_query = expander.expand(original_query)

# 输出示例:
# 原始查询: Python机器学习
# 扩展查询: (Python OR py) AND (机器学习 OR machine learning OR ML OR 人工智能 OR AI)

实现:伪相关反馈(Pseudo Relevance Feedback)

class PseudoRelevanceFeedback:
    """伪相关反馈查询扩展"""
    
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
    
    def expand_query(self, query: str, top_k: int = 3) -> str:
        """使用伪相关反馈扩展查询"""
        # 1. 初始检索
        initial_docs = self.vectorstore.similarity_search(query, k=top_k)
        
        # 2. 从top文档提取关键词
        context = "\n\n".join([doc.page_content for doc in initial_docs])
        
        # 3. 使用LLM生成扩展查询
        expansion_prompt = ChatPromptTemplate.from_messages([
            ("system", "基于以下相关文档,扩展原始查询,添加重要的相关术语。"),
            ("human", """原始查询: {query}
            
相关文档:
{context}

扩展查询(保持简洁,只添加最重要的术语):""")
        ])
        
        chain = expansion_prompt | self.llm | StrOutputParser()
        expanded = chain.invoke({
            "query": query,
            "context": context
        })
        
        print(f"📝 原始查询: {query}")
        print(f"✨ 扩展查询: {expanded}")
        
        return expanded
    
    def retrieve_with_expansion(self, query: str, k: int = 5):
        """使用扩展查询检索"""
        # 1. 扩展查询
        expanded_query = self.expand_query(query)
        
        # 2. 使用扩展查询检索
        results = self.vectorstore.similarity_search(expanded_query, k=k)
        
        return results

# 使用
prf = PseudoRelevanceFeedback(vectorstore, llm)
results = prf.retrieve_with_expansion("Python异常", k=5)

核心概念

混合检索结合向量检索(语义搜索)和关键词检索(如BM25),利用两者的优势。

向量检索 vs 关键词检索

# 向量检索(语义搜索)
优点:
✅ 理解语义,能匹配同义词
✅ 能处理模糊查询
✅ 跨语言能力(多语言模型)

缺点:
❌ 对精确术语匹配不敏感
❌ 对罕见词或专有名词效果差
❌ 计算成本高

# 关键词检索(BM25)
优点:
✅ 精确匹配关键词
✅ 对专有名词、代码等效果好
✅ 计算快速

缺点:
❌ 不理解语义
❌ 无法匹配同义词
❌ 对查询措辞敏感

# 混合检索 = 向量检索 + 关键词检索 🎯
→ 结合两者优势
→ 适用于大多数场景

实现:向量 + BM25混合检索

from rank_bm25 import BM25Okapi
import jieba  # 中文分词

class HybridRetriever:
    """混合检索器(向量 + BM25)"""
    
    def __init__(self, embeddings, documents: List[Document], weights: tuple = (0.5, 0.5)):
        """
        Args:
            weights: (向量权重, BM25权重),两者之和应为1.0
        """
        self.embeddings = embeddings
        self.documents = documents
        self.vector_weight, self.bm25_weight = weights
        
        # 1. 向量存储
        self.vectorstore = Chroma(
            collection_name="hybrid_search",
            embedding_function=embeddings
        )
        self.vectorstore.add_documents(documents)
        
        # 2. BM25索引
        self._build_bm25_index()
    
    def _build_bm25_index(self):
        """构建BM25索引"""
        # 分词(中文使用jieba,英文可以使用split)
        tokenized_docs = [
            list(jieba.cut(doc.page_content)) for doc in self.documents
        ]
        
        self.bm25 = BM25Okapi(tokenized_docs)
        print(f"✅ BM25索引构建完成")
    
    def _vector_search(self, query: str, k: int) -> List[tuple]:
        """向量检索,返回 (doc, score)"""
        results = self.vectorstore.similarity_search_with_score(query, k=k)
        
        # 归一化分数到[0, 1]
        if results:
            max_score = max(score for _, score in results)
            min_score = min(score for _, score in results)
            score_range = max_score - min_score if max_score != min_score else 1
            
            normalized = [
                (doc, 1 - (score - min_score) / score_range)  # 距离转相似度
                for doc, score in results
            ]
        else:
            normalized = []
        
        return normalized
    
    def _bm25_search(self, query: str, k: int) -> List[tuple]:
        """BM25检索,返回 (doc, score)"""
        # 查询分词
        tokenized_query = list(jieba.cut(query))
        
        # BM25评分
        scores = self.bm25.get_scores(tokenized_query)
        
        # 获取top-k
        top_indices = np.argsort(scores)[::-1][:k]
        
        # 归一化分数
        max_score = scores[top_indices[0]] if len(top_indices) > 0 else 1
        
        results = [
            (self.documents[i], scores[i] / max_score if max_score > 0 else 0)
            for i in top_indices
        ]
        
        return results
    
    def hybrid_search(self, query: str, k: int = 5) -> List[Dict]:
        """混合检索"""
        print(f"🔍 混合检索: {query}")
        print(f"   权重: 向量={self.vector_weight}, BM25={self.bm25_weight}\n")
        
        # 1. 向量检索
        print("📊 向量检索...")
        vector_results = self._vector_search(query, k=k*2)
        
        # 2. BM25检索
        print("🔤 BM25检索...")
        bm25_results = self._bm25_search(query, k=k*2)
        
        # 3. 合并分数
        print("\n🔀 合并结果...")
        combined_scores = defaultdict(float)
        doc_map = {}
        
        for doc, score in vector_results:
            doc_id = id(doc)
            combined_scores[doc_id] += self.vector_weight * score
            doc_map[doc_id] = doc
        
        for doc, score in bm25_results:
            doc_id = id(doc)
            combined_scores[doc_id] += self.bm25_weight * score
            doc_map[doc_id] = doc
        
        # 4. 排序
        sorted_results = sorted(
            combined_scores.items(),
            key=lambda x: x[1],
            reverse=True
        )[:k]
        
        # 5. 构建结果
        final_results = [
            {
                'document': doc_map[doc_id],
                'hybrid_score': score
            }
            for doc_id, score in sorted_results
        ]
        
        return final_results

# 使用示例
hybrid_retriever = HybridRetriever(
    embeddings=embeddings,
    documents=documents,
    weights=(0.6, 0.4)  # 60%向量,40% BM25
)

results = hybrid_retriever.hybrid_search("Python异常处理", k=3)

print("混合检索结果:")
for i, item in enumerate(results):
    print(f"\n{i+1}. [混合分数: {item['hybrid_score']:.4f}]")
    print(f"   {item['document'].page_content[:150]}...")

实现:使用LangChain的EnsembleRetriever

from langchain.retrievers import EnsembleRetriever

# 创建向量检索器
vector_retriever = vectorstore.as_retriever(search_kwargs={"k": 10})

# 创建BM25检索器
bm25_retriever = BM25Retriever.from_documents(documents)
bm25_retriever.k = 10

# 创建集成检索器
ensemble_retriever = EnsembleRetriever(
    retrievers=[vector_retriever, bm25_retriever],
    weights=[0.6, 0.4]  # 向量权重0.6,BM25权重0.4
)

# 检索
results = ensemble_retriever.get_relevant_documents("Python异常处理")

print(f"检索到 {len(results)} 个文档")

综合实战:完整的高级检索系统

让我们将所有技术整合到一个完整系统:

class AdvancedRetrievalSystem:
    """高级检索系统(集成所有技术)"""
    
    def __init__(self, embeddings, llm, documents: List[Document]):
        self.embeddings = embeddings
        self.llm = llm
        self.documents = documents
        
        # 向量存储
        self.vectorstore = Chroma(
            collection_name="advanced_retrieval",
            embedding_function=embeddings
        )
        self.vectorstore.add_documents(documents)
        
        # 组件
        self.multi_query = MultiQueryRetriever(self.vectorstore, llm)
        self.reranker = LocalCrossEncoderReranker(embeddings)
        self.reranker.add_documents(documents)
        self.hybrid = HybridRetriever(embeddings, documents, weights=(0.6, 0.4))
        self.rrf = ReciprocalRankFusion(k=60)
    
    def retrieve(
        self,
        query: str,
        mode: str = "hybrid_multiquery_rerank",
        k: int = 5
    ):
        """
        高级检索
        
        Args:
            mode: 检索模式
                - "simple": 简单向量检索
                - "hybrid": 混合检索
                - "multiquery": 多查询检索
                - "hybrid_multiquery": 混合+多查询
                - "hybrid_multiquery_rerank": 混合+多查询+重排序(最强)
        """
        print(f"🎯 检索模式: {mode}")
        print(f"❓ 查询: {query}\n")
        
        if mode == "simple":
            # 简单向量检索
            results = self.vectorstore.similarity_search(query, k=k)
            results = [{'document': doc, 'score': 0} for doc in results]
        
        elif mode == "hybrid":
            # 混合检索
            results = self.hybrid.hybrid_search(query, k=k)
        
        elif mode == "multiquery":
            # 多查询检索
            results = self.multi_query.retrieve(query, k=k)
        
        elif mode == "hybrid_multiquery":
            # 混合 + 多查询
            # 1. 生成查询变体
            queries = self.multi_query.generate_queries(query)
            
            # 2. 对每个查询进行混合检索
            all_results = []
            for q in queries:
                hybrid_results = self.hybrid.hybrid_search(q, k=10)
                all_results.append([item['document'] for item in hybrid_results])
            
            # 3. RRF融合
            results = self.rrf.fuse(all_results)[:k]
        
        elif mode == "hybrid_multiquery_rerank":
            # 混合 + 多查询 + 重排序(最强模式)
            # 1. 生成查询变体
            queries = self.multi_query.generate_queries(query)
            
            # 2. 混合检索
            all_results = []
            for q in queries:
                hybrid_results = self.hybrid.hybrid_search(q, k=10)
                all_results.append([item['document'] for item in hybrid_results])
            
            # 3. RRF融合
            fused = self.rrf.fuse(all_results)
            candidate_docs = [item['document'] for item in fused[:20]]
            
            # 4. 交叉编码器重排序
            print("\n🎯 重排序...")
            results = self.reranker.retrieve_and_rerank(
                query=query,
                initial_k=len(candidate_docs),
                final_k=k
            )
        
        return results
    
    def query(self, question: str, mode: str = "hybrid_multiquery_rerank"):
        """执行完整的RAG查询"""
        # 1. 检索
        results = self.retrieve(question, mode=mode, k=3)
        
        # 2. 提取文档
        docs = [item['document'] for item in results]
        
        # 3. 生成答案
        context = "\n\n".join([doc.page_content for doc in docs])
        
        answer_prompt = ChatPromptTemplate.from_messages([
            ("system", "基于以下文档回答问题。如果文档中没有相关信息,请说明。"),
            ("human", "文档:\n{context}\n\n问题: {question}")
        ])
        
        chain = answer_prompt | self.llm | StrOutputParser()
        answer = chain.invoke({
            "context": context,
            "question": question
        })
        
        return {
            "question": question,
            "documents": docs,
            "answer": answer
        }

# 使用高级检索系统
advanced_system = AdvancedRetrievalSystem(embeddings, llm, documents)

# 测试不同模式
modes = ["simple", "hybrid", "multiquery", "hybrid_multiquery_rerank"]

question = "Python中如何处理异常?"

for mode in modes:
    print(f"\n{'='*60}")
    print(f"测试模式: {mode}")
    print(f"{'='*60}\n")
    
    result = advanced_system.query(question, mode=mode)
    
    print(f"\n💡 答案:\n{result['answer']}\n")

性能对比与最佳实践

不同策略的性能对比

import time

class RetrievalBenchmark:
    """检索性能基准测试"""
    
    def __init__(self, system: AdvancedRetrievalSystem):
        self.system = system
    
    def benchmark(self, queries: List[str], modes: List[str]):
        """对比不同检索模式"""
        results = []
        
        for mode in modes:
            print(f"\n测试模式: {mode}")
            
            total_time = 0
            for query in queries:
                start = time.time()
                _ = self.system.retrieve(query, mode=mode, k=5)
                elapsed = time.time() - start
                total_time += elapsed
            
            avg_time = total_time / len(queries)
            
            results.append({
                'mode': mode,
                'avg_time': avg_time,
                'total_time': total_time
            })
            
            print(f"  平均时间: {avg_time:.3f}秒")
        
        return results

# 基准测试
queries = [
    "Python异常处理",
    "如何读取文件",
    "机器学习算法"
]

benchmark = RetrievalBenchmark(advanced_system)
results = benchmark.benchmark(
    queries=queries,
    modes=["simple", "hybrid", "multiquery", "hybrid_multiquery_rerank"]
)

# 可视化结果
import pandas as pd

df = pd.DataFrame(results)
print("\n性能对比:")
print(df)

最佳实践总结

# 技术选择指南

RETRIEVAL_STRATEGIES = {
    "快速原型": {
        "策略": "simple",
        "说明": "简单向量检索",
        "适用": "快速验证想法,数据量小"
    },
    
    "生产环境基础": {
        "策略": "hybrid",
        "说明": "混合检索(向量+BM25)",
        "适用": "大多数生产场景,平衡速度和质量"
    },
    
    "高召回率": {
        "策略": "multiquery",
        "说明": "多查询检索",
        "适用": "需要全面覆盖,不要遗漏相关文档"
    },
    
    "高精度": {
        "策略": "hybrid_multiquery_rerank",
        "说明": "混合+多查询+重排序",
        "适用": "对质量要求极高,可以牺牲速度"
    },
    
    "实时应用": {
        "策略": "hybrid + 缓存",
        "说明": "混合检索+结果缓存",
        "适用": "需要快速响应的应用"
    }
}

def choose_strategy(priority: str):
    """根据优先级选择策略"""
    strategy = RETRIEVAL_STRATEGIES.get(priority)
    
    if strategy:
        print(f"推荐策略: {strategy['策略']}")
        print(f"说明: {strategy['说明']}")
        print(f"适用场景: {strategy['适用']}")
    
    return strategy

优化技巧

# 1. 缓存优化
from functools import lru_cache

class CachedRetriever:
    """带缓存的检索器"""
    
    def __init__(self, base_retriever):
        self.base_retriever = base_retriever
    
    @lru_cache(maxsize=1000)
    def retrieve(self, query: str, k: int = 5):
        """缓存检索结果"""
        return self.base_retriever.retrieve(query, k=k)

# 2. 批量检索
class BatchRetriever:
    """批量检索优化"""
    
    def __init__(self, retriever):
        self.retriever = retriever
    
    async def batch_retrieve(self, queries: List[str], k: int = 5):
        """并行处理多个查询"""
        import asyncio
        
        tasks = [
            asyncio.create_task(self._async_retrieve(query, k))
            for query in queries
        ]
        
        results = await asyncio.gather(*tasks)
        return results
    
    async def _async_retrieve(self, query: str, k: int):
        """异步检索"""
        # 实际实现需要使用异步版本的检索器
        return self.retriever.retrieve(query, k=k)

# 3. 动态K值
class AdaptiveKRetriever:
    """自适应K值检索器"""
    
    def __init__(self, retriever, min_k: int = 3, max_k: int = 10, threshold: float = 0.7):
        self.retriever = retriever
        self.min_k = min_k
        self.max_k = max_k
        self.threshold = threshold
    
    def retrieve(self, query: str):
        """根据相关性分数动态调整返回数量"""
        # 先检索max_k个
        results = self.retriever.retrieve(query, k=self.max_k)
        
        # 过滤低于阈值的结果
        filtered = [
            item for item in results
            if item.get('score', 0) >= self.threshold
        ]
        
        # 确保至少返回min_k个
        if len(filtered) < self.min_k:
            return results[:self.min_k]
        
        return filtered

总结与建议

🎯 核心要点

  1. 重排序: 使用交叉编码器提升精度
  2. RRF融合: 简单有效的结果融合方法
  3. 多查询: 提高召回率
  4. 混合检索: 结合向量和关键词检索

📊 技术组合建议

检索质量层级:
Level 1: 向量检索 (基础)

Level 2: 混合检索 (向量 + BM25)

Level 3: 混合检索 + RRF融合

Level 4: 多查询 + 混合检索 + RRF

Level 5: 多查询 + 混合检索 + RRF + 重排序 (最强)

根据应用场景选择合适的级别:
- 快速原型: Level 1
- 生产基础: Level 2-3
- 高质量应用: Level 4-5

💡 决策树

def choose_retrieval_strategy(
    latency_sensitive: bool,
    quality_critical: bool,
    budget_limited: bool
) -> str:
    """选择检索策略"""
    
    if latency_sensitive:
        if budget_limited:
            return "simple"  # 简单向量检索
        else:
            return "hybrid"  # 混合检索
    
    if quality_critical:
        if budget_limited:
            return "hybrid + local_reranker"  # 混合+本地重排序
        else:
            return "hybrid_multiquery_rerank"  # 完整方案
    
    # 默认推荐
    return "hybrid"  # 平衡方案

✅ 实施检查清单

基础设施:

  • 选择合适的向量数据库
  • 建立BM25索引
  • 配置交叉编码器模型

检索优化:

  • 实现混合检索
  • 添加RRF融合
  • 集成多查询生成
  • 配置重排序

性能优化:

  • 添加结果缓存
  • 实现批量检索
  • 监控检索延迟
  • 定期评估检索质量

持续改进:

  • 收集用户反馈
  • A/B测试不同策略
  • 分析失败案例
  • 调整权重和参数

恭喜!你已经掌握了RAG系统中最重要的检索优化技术。这些技术将显著提升你的RAG应用质量!🚀