Deeptoai RAG系列教程

RAG查询优化:多查询与查询转换技术

深入探讨RAG系统中的查询优化技术,包括Multi-Query、RAG-Fusion、查询分解、Step Back和HyDE等高级方法

RAG查询优化:多查询与查询转换技术

在基础RAG系统中,我们使用单一查询进行检索。但在实际应用中,用户的查询往往存在表达模糊角度单一过于笼统的问题。本章将介绍多种查询优化技术,让你的RAG系统能够更准确地理解用户意图。

查询优化的必要性

单一查询的局限性

# ❌ 单一查询的问题

用户查询: "机器学习是什么?"

# 可能错过的相关文档:
- "深度学习入门" (使用了不同但相关的术语)
- "AI算法基础" (更广泛的主题)
- "神经网络原理" (具体技术)
- "监督学习vs无监督学习" (细分话题)

查询优化如何帮助?

查询优化的核心思想

通过生成多个角度的查询、重写查询或分解复杂查询,增加检索到相关文档的概率。

# ✅ 查询优化后

原始查询: "机器学习是什么?"

生成的变体:
1. "什么是机器学习算法?"
2. "机器学习的基本概念和原理"
3. "AI中的机器学习技术"
4. "机器学习的应用场景"

→ 检索 → 合并结果 → 去重 → 生成答案

技术概览

本章将介绍5种主要的查询优化技术:

技术核心思想适用场景复杂度
Multi-Query生成查询的多个变体用户查询表达不清
RAG-Fusion多查询+重排序融合需要高质量结果⭐⭐
Decomposition分解复杂查询多步骤问题⭐⭐⭐
Step Back先问概括性问题需要背景知识⭐⭐
HyDE生成假设性文档语义搜索增强⭐⭐⭐

Part 1: Multi-Query - 多角度查询

核心概念

Multi-Query技术通过LLM生成原始查询的多个变体,从不同角度检索文档,提高检索的召回率。

工作流程

用户查询: "什么是Agent?"

使用LLM生成变体:
    ├─ "Agent系统的定义是什么?"
    ├─ "AI Agent的核心概念"
    └─ "什么是自主智能体?"

并行检索每个变体

合并并去重结果

生成最终答案

实现:基础版本

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# 定义查询生成提示词
query_gen_prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个AI助手,负责生成查询的多个变体。
    给定一个用户查询,生成3个不同角度的相关查询。
    每行一个查询,不要编号。"""),
    ("human", "{question}")
])

# 创建查询生成链
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
query_generator = (
    query_gen_prompt 
    | llm 
    | StrOutputParser() 
    | (lambda x: x.split("\n"))  # 分割成列表
)

# 使用示例
original_query = "什么是Agent?"
queries = query_generator.invoke({"question": original_query})

print("原始查询:", original_query)
print("\n生成的查询变体:")
for i, q in enumerate(queries, 1):
    print(f"{i}. {q}")

# 输出示例:
# 1. Agent系统的定义和核心特征是什么?
# 2. AI Agent与传统程序有什么区别?
# 3. Agent在实际应用中如何工作?

实现:完整Multi-Query RAG

from langchain.load import dumps, loads
from typing import List

def get_unique_documents(documents: List[List]) -> List:
    """去重文档"""
    # 使用文档内容作为唯一标识
    unique_docs = {}
    for doc_list in documents:
        for doc in doc_list:
            content = doc.page_content
            if content not in unique_docs:
                unique_docs[content] = doc
    return list(unique_docs.values())

# 完整的Multi-Query RAG链
from langchain_core.runnables import RunnablePassthrough

# 1. 生成多个查询
def generate_queries(question: str) -> List[str]:
    """生成查询变体"""
    queries = query_generator.invoke({"question": question})
    # 确保原始查询也包含在内
    return [question] + [q for q in queries if q.strip()]

# 2. 检索每个查询
def retrieve_documents(queries: List[str], retriever) -> List:
    """并行检索所有查询"""
    all_docs = []
    for query in queries:
        docs = retriever.get_relevant_documents(query)
        all_docs.append(docs)
    return all_docs

# 3. 合并去重
def process_documents(all_docs: List[List]) -> List:
    """合并并去重文档"""
    unique_docs = get_unique_documents(all_docs)
    return unique_docs[:10]  # 返回top-10

# 4. 创建完整链
from langchain.chains import RetrievalQA

class MultiQueryRAG:
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
        self.llm = llm
        
    def query(self, question: str) -> dict:
        """执行Multi-Query RAG"""
        # Step 1: 生成查询变体
        queries = generate_queries(question)
        print(f"📝 生成了 {len(queries)} 个查询")
        
        # Step 2: 检索文档
        all_docs = retrieve_documents(queries, self.retriever)
        print(f"📚 检索到 {sum(len(docs) for docs in all_docs)} 个文档(含重复)")
        
        # Step 3: 去重
        unique_docs = process_documents(all_docs)
        print(f"✨ 去重后剩余 {len(unique_docs)} 个文档")
        
        # Step 4: 生成答案
        context = "\n\n".join([doc.page_content for doc in unique_docs])
        
        answer_prompt = ChatPromptTemplate.from_messages([
            ("system", "基于以下文档内容回答问题。如果文档中没有相关信息,请明确说明。"),
            ("human", "文档:\n{context}\n\n问题: {question}")
        ])
        
        answer_chain = answer_prompt | self.llm | StrOutputParser()
        answer = answer_chain.invoke({
            "context": context,
            "question": question
        })
        
        return {
            "question": question,
            "queries": queries,
            "num_docs": len(unique_docs),
            "answer": answer
        }

# 使用示例
multi_rag = MultiQueryRAG(vectorstore, llm)
result = multi_rag.query("什么是Agent?它有哪些关键能力?")

print("\n" + "="*60)
print("最终答案:")
print(result["answer"])

Multi-Query的优缺点

优点 ✅:

  • 提高召回率(找到更多相关文档)
  • 覆盖不同角度和表达方式
  • 对用户表达不清的查询特别有效
  • 实现相对简单

缺点 ❌:

  • 增加检索成本(多次查询)
  • 可能引入噪声(不相关的变体)
  • 需要额外的LLM调用
  • 去重逻辑可能过滤掉有价值的文档

优化技巧

# 1. 限制查询数量
query_generator = (
    query_gen_prompt 
    | llm 
    | StrOutputParser() 
    | (lambda x: x.split("\n")[:3])  # 最多3个变体
)

# 2. 使用缓存避免重复生成
from functools import lru_cache

@lru_cache(maxsize=100)
def cached_query_generation(question: str) -> tuple:
    queries = query_generator.invoke({"question": question})
    return tuple(queries)  # 返回元组以支持缓存

# 3. 异步并行检索
import asyncio

async def async_retrieve_all(queries: List[str], retriever):
    """异步并行检索"""
    tasks = [retriever.aget_relevant_documents(q) for q in queries]
    results = await asyncio.gather(*tasks)
    return results

Part 2: RAG-Fusion - 融合式检索

核心概念

RAG-Fusion结合了Multi-Query和倒数排序融合(Reciprocal Rank Fusion, RRF),不仅生成多个查询,还智能地合并和重排序结果。

什么是倒数排序融合?

RRF是一种排序融合算法,给予排名靠前的文档更高的分数:

公式: RRF_score(doc) = Σ 1 / (k + rank(doc))

其中:
- k = 常数(通常为60)
- rank(doc) = 文档在某个查询结果中的排名
- Σ = 对所有查询结果求和

示例

# 查询1的结果排序:
查询1: "什么是Agent?"
  1. 文档A (排名1)
  2. 文档B (排名2)
  3. 文档C (排名3)

# 查询2的结果排序:
查询2: "AI Agent的定义"
  1. 文档B (排名1)  ← 在这个查询中排名更高
  2. 文档D (排名2)
  3. 文档A (排名3)

# RRF融合计算 (k=60):
文档A: 1/(60+1) + 1/(60+3) = 0.0164 + 0.0159 = 0.0323
文档B: 1/(60+2) + 1/(60+1) = 0.0161 + 0.0164 = 0.0325 ← 最高分
文档C: 1/(60+3) + 0         = 0.0159
文档D: 0         + 1/(60+2) = 0.0161

最终排序: B > A > D > C

实现RAG-Fusion

from typing import List, Tuple

def reciprocal_rank_fusion(
    results: List[List],
    k: int = 60
) -> List[Tuple[any, float]]:
    """
    实现倒数排序融合
    
    Args:
        results: 多个查询的结果列表
        k: RRF常数
    
    Returns:
        排序后的(文档, 分数)列表
    """
    # 存储每个文档的融合分数
    fusion_scores = {}
    
    for docs in results:
        for rank, doc in enumerate(docs):
            # 使用文档内容作为唯一标识
            doc_id = doc.page_content
            
            # 计算RRF分数
            if doc_id not in fusion_scores:
                fusion_scores[doc_id] = {
                    'doc': doc,
                    'score': 0
                }
            
            # 累加分数
            fusion_scores[doc_id]['score'] += 1 / (k + rank + 1)
    
    # 按分数排序
    sorted_docs = sorted(
        fusion_scores.values(),
        key=lambda x: x['score'],
        reverse=True
    )
    
    return [(item['doc'], item['score']) for item in sorted_docs]


class RAGFusion:
    """RAG-Fusion系统"""
    
    def __init__(self, vectorstore, llm, k: int = 60):
        self.vectorstore = vectorstore
        self.retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
        self.llm = llm
        self.k = k
        
    def generate_queries(self, question: str, n: int = 3) -> List[str]:
        """生成查询变体"""
        prompt = ChatPromptTemplate.from_messages([
            ("system", f"""生成{n}个不同角度的查询变体。
            要求:
            1. 语义相关但表达不同
            2. 覆盖不同角度
            3. 每行一个查询"""),
            ("human", "{question}")
        ])
        
        chain = prompt | self.llm | StrOutputParser()
        queries_str = chain.invoke({"question": question})
        queries = [q.strip() for q in queries_str.split("\n") if q.strip()]
        
        return [question] + queries[:n]
    
    def retrieve_and_fuse(self, queries: List[str]) -> List[Tuple[any, float]]:
        """检索并融合结果"""
        all_results = []
        
        print(f"🔍 执行 {len(queries)} 个查询...")
        for i, query in enumerate(queries, 1):
            docs = self.retriever.get_relevant_documents(query)
            all_results.append(docs)
            print(f"   查询{i}: 检索到 {len(docs)} 个文档")
        
        # RRF融合
        fused_results = reciprocal_rank_fusion(all_results, k=self.k)
        print(f"✨ 融合后共 {len(fused_results)} 个唯一文档")
        
        return fused_results
    
    def query(self, question: str, top_k: int = 5) -> dict:
        """执行RAG-Fusion查询"""
        # 1. 生成查询
        queries = self.generate_queries(question)
        
        # 2. 检索并融合
        fused_docs = self.retrieve_and_fuse(queries)
        
        # 3. 选择top-k
        top_docs = fused_docs[:top_k]
        
        # 4. 生成答案
        context = "\n\n".join([
            f"[分数: {score:.4f}]\n{doc.page_content}" 
            for doc, score in top_docs
        ])
        
        answer_prompt = ChatPromptTemplate.from_messages([
            ("system", "基于以下按相关性排序的文档回答问题。"),
            ("human", "文档:\n{context}\n\n问题: {question}")
        ])
        
        answer_chain = answer_prompt | self.llm | StrOutputParser()
        answer = answer_chain.invoke({
            "context": context,
            "question": question
        })
        
        return {
            "question": question,
            "generated_queries": queries,
            "num_docs": len(top_docs),
            "top_scores": [score for _, score in top_docs],
            "answer": answer
        }

# 使用示例
rag_fusion = RAGFusion(vectorstore, llm, k=60)
result = rag_fusion.query("什么是Agent?")

print("\n生成的查询:")
for q in result["generated_queries"]:
    print(f"  - {q}")

print(f"\nTop-{result['num_docs']} 文档分数:")
for i, score in enumerate(result["top_scores"], 1):
    print(f"  {i}. {score:.4f}")

print("\n答案:")
print(result["answer"])

RAG-Fusion vs Multi-Query

特性Multi-QueryRAG-Fusion
查询生成
并行检索
智能排序✅ RRF算法
结果质量中等
计算成本中等
适用场景一般检索高质量需求

Part 3: Query Decomposition - 查询分解

核心概念

对于复杂的多步骤问题,Query Decomposition将其分解为多个子问题,分别回答后再合成最终答案。

两种分解策略

1. 递归分解(Answer Recursively)

复杂问题: "比较GPT-3和GPT-4在多模态能力上的差异"

子问题1: "GPT-3有哪些能力?"
    ↓ 检索 + 回答
答案1: "GPT-3主要是文本模型..."

子问题2: "GPT-4有哪些新能力?" (基于答案1)
    ↓ 检索 + 回答
答案2: "GPT-4增加了图像理解..."

综合答案: "GPT-3仅支持文本,而GPT-4..."

2. 并行分解(Answer Individually)

复杂问题: "比较Python和JavaScript在Web开发中的优劣"

子问题1: "Python在Web开发中的优势"
子问题2: "JavaScript在Web开发中的优势"
子问题3: "Python在Web开发中的劣势"
子问题4: "JavaScript在Web开发中的劣势"

并行检索 + 回答

综合所有答案

实现:递归分解

from langchain.chains import LLMChain

class RecursiveDecomposition:
    """递归查询分解"""
    
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.retriever = vectorstore.as_retriever()
        self.llm = llm
        
    def decompose_query(self, question: str) -> List[str]:
        """分解复杂查询为子问题"""
        decompose_prompt = ChatPromptTemplate.from_messages([
            ("system", """将复杂问题分解为2-4个简单的子问题。
            要求:
            1. 子问题应该按逻辑顺序排列
            2. 每个子问题都应该是独立可回答的
            3. 每行一个问题"""),
            ("human", "{question}")
        ])
        
        chain = decompose_prompt | self.llm | StrOutputParser()
        sub_questions_str = chain.invoke({"question": question})
        sub_questions = [q.strip() for q in sub_questions_str.split("\n") if q.strip()]
        
        return sub_questions
    
    def answer_sub_question(
        self, 
        question: str, 
        context: str = ""
    ) -> str:
        """回答单个子问题"""
        # 检索相关文档
        docs = self.retriever.get_relevant_documents(question)
        doc_context = "\n\n".join([doc.page_content for doc in docs[:3]])
        
        # 构建提示词
        if context:
            full_context = f"已知信息:\n{context}\n\n相关文档:\n{doc_context}"
        else:
            full_context = f"相关文档:\n{doc_context}"
        
        answer_prompt = ChatPromptTemplate.from_messages([
            ("system", "基于提供的信息简洁地回答问题。"),
            ("human", "{context}\n\n问题: {question}")
        ])
        
        chain = answer_prompt | self.llm | StrOutputParser()
        answer = chain.invoke({
            "context": full_context,
            "question": question
        })
        
        return answer
    
    def query(self, question: str) -> dict:
        """执行递归分解查询"""
        print(f"📋 原始问题: {question}\n")
        
        # 1. 分解问题
        sub_questions = self.decompose_query(question)
        print(f"🔍 分解为 {len(sub_questions)} 个子问题:")
        for i, sq in enumerate(sub_questions, 1):
            print(f"   {i}. {sq}")
        
        # 2. 递归回答
        accumulated_context = ""
        sub_answers = []
        
        print("\n💡 逐步回答:")
        for i, sq in enumerate(sub_questions, 1):
            print(f"\n   子问题{i}: {sq}")
            answer = self.answer_sub_question(sq, accumulated_context)
            sub_answers.append(answer)
            accumulated_context += f"\n\n问题{i}: {sq}\n答案: {answer}"
            print(f"   答案: {answer[:100]}...")
        
        # 3. 综合最终答案
        final_prompt = ChatPromptTemplate.from_messages([
            ("system", "基于以下子问题和答案,综合回答原始问题。"),
            ("human", """原始问题: {question}
            
子问题和答案:
{sub_qa}

请给出完整的综合答案:""")
        ])
        
        sub_qa = "\n\n".join([
            f"Q{i}: {q}\nA{i}: {a}" 
            for i, (q, a) in enumerate(zip(sub_questions, sub_answers), 1)
        ])
        
        chain = final_prompt | self.llm | StrOutputParser()
        final_answer = chain.invoke({
            "question": question,
            "sub_qa": sub_qa
        })
        
        return {
            "question": question,
            "sub_questions": sub_questions,
            "sub_answers": sub_answers,
            "final_answer": final_answer
        }

# 使用示例
decomp = RecursiveDecomposition(vectorstore, llm)
result = decomp.query("比较GPT-3和GPT-4在能力上的主要差异")

print("\n" + "="*60)
print("最终综合答案:")
print(result["final_answer"])

实现:并行分解

import asyncio

class ParallelDecomposition:
    """并行查询分解"""
    
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.retriever = vectorstore.as_retriever()
        self.llm = llm
    
    def decompose_query(self, question: str) -> List[str]:
        """分解查询(同上)"""
        # ... 相同的实现
        pass
    
    async def answer_sub_question_async(self, question: str) -> str:
        """异步回答子问题"""
        docs = await self.retriever.aget_relevant_documents(question)
        doc_context = "\n\n".join([doc.page_content for doc in docs[:3]])
        
        answer_prompt = ChatPromptTemplate.from_messages([
            ("system", "基于文档简洁回答问题。"),
            ("human", "文档:\n{context}\n\n问题: {question}")
        ])
        
        chain = answer_prompt | self.llm | StrOutputParser()
        answer = await chain.ainvoke({
            "context": doc_context,
            "question": question
        })
        
        return answer
    
    async def query_async(self, question: str) -> dict:
        """异步执行并行分解"""
        # 1. 分解问题
        sub_questions = self.decompose_query(question)
        
        # 2. 并行回答所有子问题
        tasks = [
            self.answer_sub_question_async(sq) 
            for sq in sub_questions
        ]
        sub_answers = await asyncio.gather(*tasks)
        
        # 3. 综合答案(同递归版本)
        # ...
        
        return {
            "question": question,
            "sub_questions": sub_questions,
            "sub_answers": sub_answers,
            "final_answer": final_answer
        }

# 使用
parallel_decomp = ParallelDecomposition(vectorstore, llm)
result = asyncio.run(
    parallel_decomp.query_async("Python vs JavaScript in Web Development")
)

分解策略对比

特性递归分解并行分解
执行方式顺序并行
速度快 ⚡
子问题依赖支持不支持
适用场景有逻辑顺序的问题独立子问题
实现复杂度中等

Part 4: Step Back Prompting - 抽象化提问

核心概念

Step Back Prompting先提出一个更抽象、更概括的问题,获取背景知识后,再回答原始具体问题。

为什么需要Step Back?

# ❌ 直接回答可能缺乏背景

原始问题: "Transformer中的Multi-Head Attention有几个头?"

直接检索 → 可能找不到确切答案(文档只描述了原理,没说具体数字)

# ✅ Step Back后

Step 1: Step Back问题
"Transformer架构的基本组成是什么?"

Step 2: 获取背景知识
"Transformer由编码器和解码器组成,使用Multi-Head Attention..."

Step 3: 结合背景回答原始问题
"根据原始论文,使用8个attention头..."

实现Step Back

class StepBackRAG:
    """Step Back提示的RAG系统"""
    
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.retriever = vectorstore.as_retriever()
        self.llm = llm
    
    def generate_step_back_question(self, question: str) -> str:
        """生成Step Back问题"""
        step_back_prompt = ChatPromptTemplate.from_messages([
            ("system", """给定一个具体问题,生成一个更抽象、更概括的问题。
            
示例:
具体问题: "GPT-4的上下文长度是多少?"
Step Back问题: "GPT-4的主要技术特性有哪些?"

具体问题: "Python中的装饰器如何工作?"
Step Back问题: "Python中的元编程概念是什么?"
"""),
            ("human", "具体问题: {question}\nStep Back问题:")
        ])
        
        chain = step_back_prompt | self.llm | StrOutputParser()
        step_back_q = chain.invoke({"question": question})
        
        return step_back_q.strip()
    
    def query(self, question: str) -> dict:
        """执行Step Back RAG"""
        print(f"❓ 原始问题: {question}")
        
        # 1. 生成Step Back问题
        step_back_q = self.generate_step_back_question(question)
        print(f"📚 Step Back问题: {step_back_q}")
        
        # 2. 检索背景知识(Step Back问题)
        background_docs = self.retriever.get_relevant_documents(step_back_q)
        background = "\n\n".join([doc.page_content for doc in background_docs[:2]])
        
        # 3. 检索具体信息(原始问题)
        specific_docs = self.retriever.get_relevant_documents(question)
        specific = "\n\n".join([doc.page_content for doc in specific_docs[:2]])
        
        # 4. 结合两者生成答案
        answer_prompt = ChatPromptTemplate.from_messages([
            ("system", """基于提供的背景知识和具体信息回答问题。
            先理解背景,再回答具体问题。"""),
            ("human", """背景知识:
{background}

具体信息:
{specific}

原始问题: {question}
请回答:""")
        ])
        
        chain = answer_prompt | self.llm | StrOutputParser()
        answer = chain.invoke({
            "background": background,
            "specific": specific,
            "question": question
        })
        
        return {
            "question": question,
            "step_back_question": step_back_q,
            "answer": answer
        }

# 使用示例
step_back_rag = StepBackRAG(vectorstore, llm)
result = step_back_rag.query("GPT-4的最大上下文长度是多少个token?")

print("\n💡 答案:")
print(result["answer"])

Step Back的优势

何时使用Step Back

  • ✅ 问题需要背景知识
  • ✅ 直接检索效果不好
  • ✅ 问题过于具体或技术性强
  • ✅ 需要理论支撑的答案

不适合的场景

  • ❌ 简单事实性问题
  • ❌ 已有充足直接信息
  • ❌ 实时数据查询

Part 5: HyDE - 假设性文档嵌入

核心概念

HyDE (Hypothetical Document Embeddings) 不直接检索用户查询,而是先让LLM生成一个假设性的答案文档,然后用这个文档去检索相似内容。

HyDE的直觉

# 传统检索
用户查询: "什么是机器学习?"
    ↓ 直接嵌入
查询向量: [0.1, 0.3, -0.2, ...]
    ↓ 检索
找到的文档

# HyDE检索
用户查询: "什么是机器学习?"
    ↓ LLM生成假设性答案
假设文档: "机器学习是人工智能的一个分支,它使计算机能够从数据中学习并做出预测..."
    ↓ 嵌入假设文档
文档向量: [0.2, 0.4, -0.1, ...]  # 与真实答案文档更相似!
    ↓ 检索
找到更相关的文档

为什么有效?

查询通常很短,而文档内容丰富。假设性文档比查询更接近真实文档的表达方式,因此能找到更相关的内容。

实现HyDE

class HyDERAG:
    """HyDE (Hypothetical Document Embeddings) RAG"""
    
    def __init__(self, vectorstore, llm, embeddings):
        self.vectorstore = vectorstore
        self.llm = llm
        self.embeddings = embeddings
    
    def generate_hypothetical_document(
        self, 
        question: str,
        style: str = "academic"
    ) -> str:
        """生成假设性文档"""
        if style == "academic":
            system_msg = "你是一位专家。请对以下问题写一段详细、准确的学术性回答(200-300字)。"
        elif style == "concise":
            system_msg = "请对以下问题写一段简洁的回答(100-150字)。"
        else:
            system_msg = "请对以下问题写一段回答。"
        
        hyde_prompt = ChatPromptTemplate.from_messages([
            ("system", system_msg),
            ("human", "{question}")
        ])
        
        chain = hyde_prompt | self.llm | StrOutputParser()
        hypothetical_doc = chain.invoke({"question": question})
        
        return hypothetical_doc
    
    def search_with_hyde(
        self, 
        question: str, 
        k: int = 5
    ) -> List:
        """使用HyDE进行检索"""
        # 1. 生成假设性文档
        hyp_doc = self.generate_hypothetical_document(question)
        print(f"📝 假设性文档:\n{hyp_doc[:200]}...\n")
        
        # 2. 使用假设性文档检索
        # 注意:这里用假设文档而不是原始查询
        docs = self.vectorstore.similarity_search(hyp_doc, k=k)
        
        return docs
    
    def query(self, question: str, k: int = 5) -> dict:
        """执行HyDE RAG查询"""
        print(f"❓ 原始问题: {question}\n")
        
        # 1. HyDE检索
        docs = self.search_with_hyde(question, k=k)
        print(f"📚 检索到 {len(docs)} 个文档\n")
        
        # 2. 生成最终答案
        context = "\n\n".join([doc.page_content for doc in docs])
        
        answer_prompt = ChatPromptTemplate.from_messages([
            ("system", "基于以下文档准确回答问题。"),
            ("human", "文档:\n{context}\n\n问题: {question}")
        ])
        
        chain = answer_prompt | self.llm | StrOutputParser()
        answer = chain.invoke({
            "context": context,
            "question": question
        })
        
        return {
            "question": question,
            "num_docs": len(docs),
            "answer": answer
        }

# 使用示例
hyde_rag = HyDERAG(vectorstore, llm, embeddings)
result = hyde_rag.query("解释Transformer架构中的注意力机制")

print("💡 最终答案:")
print(result["answer"])

HyDE的变体

# 变体1: 生成多个假设文档
def multi_hypothetical_search(question: str, n: int = 3):
    """生成多个假设文档并融合结果"""
    all_docs = []
    
    for i in range(n):
        hyp_doc = generate_hypothetical_document(
            question, 
            temperature=0.7  # 增加多样性
        )
        docs = vectorstore.similarity_search(hyp_doc, k=3)
        all_docs.append(docs)
    
    # 使用RRF融合
    fused = reciprocal_rank_fusion(all_docs)
    return fused

# 变体2: 特定领域的假设文档
def domain_specific_hyde(question: str, domain: str):
    """生成特定领域风格的假设文档"""
    prompts = {
        "technical": "以技术文档的风格回答...",
        "tutorial": "以教程的风格解释...",
        "research": "以研究论文的风格阐述..."
    }
    
    system_msg = prompts.get(domain, "回答以下问题...")
    # ... 生成和检索

HyDE vs 传统检索

维度传统检索HyDE
检索对象用户查询假设性答案
语义匹配查询↔文档答案↔答案
查询长度敏感性
额外LLM调用01
适用场景清晰查询复杂/技术性查询
检索质量中等

技术对比与选择指南

综合对比

技术召回率准确率速度成本复杂度
Multi-Query⭐⭐⭐⭐⭐⭐⭐💰💰
RAG-Fusion⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐💰💰💰⭐⭐
Decomposition⭐⭐⭐⭐⭐⭐⭐⭐💰💰💰💰⭐⭐⭐
Step Back⭐⭐⭐⭐⭐⭐⭐⭐⭐💰💰⭐⭐
HyDE⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐💰💰⭐⭐

选择决策树

问题类型?
├─ 简单查询(单一事实)
│  → 不需要优化,使用基础RAG

├─ 表达模糊/多角度
│  → Multi-Query 或 RAG-Fusion

├─ 复杂多步骤问题
│  → Query Decomposition

├─ 需要背景知识
│  → Step Back

└─ 技术性强/专业领域
   → HyDE

组合使用

这些技术可以组合使用以获得更好的效果:

class AdvancedRAG:
    """组合多种技术的高级RAG系统"""
    
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
        
    def intelligent_query(self, question: str) -> dict:
        """智能选择和组合技术"""
        
        # 1. 分析查询复杂度
        complexity = self.analyze_complexity(question)
        
        if complexity == "simple":
            # 简单查询:直接检索
            return self.simple_rag(question)
        
        elif complexity == "ambiguous":
            # 模糊查询:Multi-Query + RRF
            return self.multi_query_with_fusion(question)
        
        elif complexity == "complex":
            # 复杂查询:分解 + HyDE
            return self.decomposition_with_hyde(question)
        
        elif complexity == "technical":
            # 技术查询:Step Back + HyDE
            return self.step_back_with_hyde(question)
    
    def analyze_complexity(self, question: str) -> str:
        """分析查询复杂度"""
        # 使用LLM判断
        analysis_prompt = ChatPromptTemplate.from_messages([
            ("system", """分析查询的复杂度,返回以下之一:
            - simple: 简单事实性问题
            - ambiguous: 表达模糊的问题
            - complex: 需要多步推理的复杂问题
            - technical: 技术性强的专业问题
            
            只返回类别,不要解释。"""),
            ("human", "{question}")
        ])
        
        chain = analysis_prompt | self.llm | StrOutputParser()
        complexity = chain.invoke({"question": question}).strip().lower()
        
        return complexity

性能优化与最佳实践

1. 缓存策略

from functools import lru_cache
import hashlib

class CachedQueryOptimizer:
    """带缓存的查询优化器"""
    
    def __init__(self):
        self.query_cache = {}
        self.doc_cache = {}
    
    def get_cache_key(self, query: str, method: str) -> str:
        """生成缓存键"""
        content = f"{method}:{query}"
        return hashlib.md5(content.encode()).hexdigest()
    
    def cached_multi_query(self, question: str) -> List[str]:
        """缓存Multi-Query结果"""
        cache_key = self.get_cache_key(question, "multi_query")
        
        if cache_key in self.query_cache:
            print("💾 使用缓存的查询变体")
            return self.query_cache[cache_key]
        
        # 生成查询
        queries = generate_queries(question)
        self.query_cache[cache_key] = queries
        
        return queries

2. 并行处理

import asyncio
from concurrent.futures import ThreadPoolExecutor

async def parallel_multi_query(
    queries: List[str],
    retriever,
    max_workers: int = 5
) -> List[List]:
    """并行执行多个查询"""
    loop = asyncio.get_event_loop()
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        tasks = [
            loop.run_in_executor(
                executor,
                retriever.get_relevant_documents,
                query
            )
            for query in queries
        ]
        
        results = await asyncio.gather(*tasks)
    
    return results

3. 自适应参数调整

def adaptive_k_selection(question: str, base_k: int = 5) -> int:
    """根据查询动态调整k值"""
    # 计算查询长度
    word_count = len(question.split())
    
    if word_count < 5:
        # 短查询需要更多文档
        return base_k + 2
    elif word_count > 20:
        # 长查询可以减少文档数
        return max(base_k - 1, 3)
    
    return base_k

# 使用
k = adaptive_k_selection("什么是AI?")  # 返回7
k = adaptive_k_selection("详细解释深度学习中的...")  # 返回4

4. 质量评估

def evaluate_query_quality(
    original_query: str,
    generated_queries: List[str],
    llm
) -> float:
    """评估生成的查询质量"""
    eval_prompt = ChatPromptTemplate.from_messages([
        ("system", """评估生成的查询质量。
        标准:
        1. 语义相关性(0-5分)
        2. 角度多样性(0-5分)
        3. 可回答性(0-5分)
        
        返回总分(0-15)。"""),
        ("human", """原始查询: {original}
        
生成的查询:
{generated}

评分:""")
    ])
    
    generated_str = "\n".join([f"{i}. {q}" for i, q in enumerate(generated_queries, 1)])
    
    chain = eval_prompt | llm | StrOutputParser()
    score_str = chain.invoke({
        "original": original_query,
        "generated": generated_str
    })
    
    # 提取分数
    try:
        score = float(score_str.strip().split()[0])
        return score / 15.0  # 归一化到0-1
    except:
        return 0.5  # 默认中等分数

实战案例

案例1: 构建智能客服RAG

class CustomerServiceRAG:
    """智能客服RAG系统"""
    
    def __init__(self, vectorstore, llm):
        self.vectorstore = vectorstore
        self.llm = llm
        
        # 预定义场景
        self.scenarios = {
            "product_info": ["产品", "功能", "特性", "价格"],
            "troubleshooting": ["问题", "错误", "不工作", "故障"],
            "how_to": ["如何", "怎么", "步骤", "教程"]
        }
    
    def classify_query(self, question: str) -> str:
        """分类用户查询"""
        for scenario, keywords in self.scenarios.items():
            if any(kw in question for kw in keywords):
                return scenario
        return "general"
    
    def handle_query(self, question: str) -> str:
        """处理客服查询"""
        scenario = self.classify_query(question)
        
        if scenario == "product_info":
            # 产品信息:使用HyDE获得更专业的答案
            return HyDERAG(self.vectorstore, self.llm).query(question)
        
        elif scenario == "troubleshooting":
            # 故障排查:使用分解获得步骤化答案
            return RecursiveDecomposition(self.vectorstore, self.llm).query(question)
        
        elif scenario == "how_to":
            # 教程类:使用Step Back获得背景+步骤
            return StepBackRAG(self.vectorstore, self.llm).query(question)
        
        else:
            # 一般查询:使用RAG-Fusion
            return RAGFusion(self.vectorstore, self.llm).query(question)

案例2: 学术论文助手

class AcademicRAG:
    """学术论文助手"""
    
    def __init__(self, vectorstore, llm):
        self.multi_query = MultiQueryRAG(vectorstore, llm)
        self.hyde = HyDERAG(vectorstore, llm, embeddings)
        self.decomp = RecursiveDecomposition(vectorstore, llm)
    
    def literature_review(self, topic: str) -> str:
        """文献综述"""
        # 1. Multi-Query找全面的相关论文
        broad_results = self.multi_query.query(
            f"关于{topic}的研究现状和主要发现"
        )
        
        # 2. HyDE找高质量的理论论文
        theory_results = self.hyde.query(
            f"{topic}的理论基础和核心概念"
        )
        
        # 3. 综合生成文献综述
        # ...
        
        return review
    
    def compare_methods(self, method1: str, method2: str) -> str:
        """方法对比"""
        # 使用查询分解
        comparison_query = f"比较{method1}{method2}的优缺点"
        return self.decomp.query(comparison_query)

总结与建议

🎯 核心要点

  1. Multi-Query: 最简单,适合快速提升召回率
  2. RAG-Fusion: 最平衡,推荐作为默认选择
  3. Decomposition: 最强大,但成本最高
  4. Step Back: 最适合需要理论背景的问题
  5. HyDE: 最适合专业领域的技术查询

📊 实施建议

初级阶段(刚开始优化):

  • 先实现Multi-Query
  • 评估效果,如果显著提升则继续使用
  • 否则尝试RAG-Fusion

中级阶段(有一定经验):

  • 根据查询类型选择技术
  • 实现简单的路由逻辑
  • 添加缓存和并行优化

高级阶段(追求极致):

  • 组合多种技术
  • 实现自适应系统
  • 持续评估和优化

🚀 下一步

学习完查询优化后,继续探索:

  1. [3] 路由与查询构建 - 智能路由和结构化查询
  2. [4] 索引与高级检索 - 优化索引策略
  3. [5] 检索与重排序 - 结果重排序技术

💡 最佳实践清单

查询优化

  • 根据场景选择合适的技术
  • 实现缓存机制
  • 监控生成的查询质量
  • 使用并行处理提升速度

质量保证

  • 评估每种技术的效果
  • A/B测试不同配置
  • 收集用户反馈
  • 持续迭代优化

参考资源

论文

工具

相关文章

通过本章的学习,你已经掌握了多种强大的查询优化技术。这些技术能够显著提升RAG系统的检索质量和答案准确性。继续实践和优化,你将能够构建出真正智能的RAG应用!🎉