RAG查询优化:多查询与查询转换技术
深入探讨RAG系统中的查询优化技术,包括Multi-Query、RAG-Fusion、查询分解、Step Back和HyDE等高级方法
RAG查询优化:多查询与查询转换技术
在基础RAG系统中,我们使用单一查询进行检索。但在实际应用中,用户的查询往往存在表达模糊、角度单一或过于笼统的问题。本章将介绍多种查询优化技术,让你的RAG系统能够更准确地理解用户意图。
查询优化的必要性
单一查询的局限性
# ❌ 单一查询的问题
用户查询: "机器学习是什么?"
# 可能错过的相关文档:
- "深度学习入门" (使用了不同但相关的术语)
- "AI算法基础" (更广泛的主题)
- "神经网络原理" (具体技术)
- "监督学习vs无监督学习" (细分话题)查询优化如何帮助?
查询优化的核心思想
通过生成多个角度的查询、重写查询或分解复杂查询,增加检索到相关文档的概率。
# ✅ 查询优化后
原始查询: "机器学习是什么?"
生成的变体:
1. "什么是机器学习算法?"
2. "机器学习的基本概念和原理"
3. "AI中的机器学习技术"
4. "机器学习的应用场景"
→ 检索 → 合并结果 → 去重 → 生成答案技术概览
本章将介绍5种主要的查询优化技术:
| 技术 | 核心思想 | 适用场景 | 复杂度 |
|---|---|---|---|
| Multi-Query | 生成查询的多个变体 | 用户查询表达不清 | ⭐ |
| RAG-Fusion | 多查询+重排序融合 | 需要高质量结果 | ⭐⭐ |
| Decomposition | 分解复杂查询 | 多步骤问题 | ⭐⭐⭐ |
| Step Back | 先问概括性问题 | 需要背景知识 | ⭐⭐ |
| HyDE | 生成假设性文档 | 语义搜索增强 | ⭐⭐⭐ |
Part 1: Multi-Query - 多角度查询
核心概念
Multi-Query技术通过LLM生成原始查询的多个变体,从不同角度检索文档,提高检索的召回率。
工作流程
用户查询: "什么是Agent?"
↓
使用LLM生成变体:
├─ "Agent系统的定义是什么?"
├─ "AI Agent的核心概念"
└─ "什么是自主智能体?"
↓
并行检索每个变体
↓
合并并去重结果
↓
生成最终答案实现:基础版本
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
# 定义查询生成提示词
query_gen_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个AI助手,负责生成查询的多个变体。
给定一个用户查询,生成3个不同角度的相关查询。
每行一个查询,不要编号。"""),
("human", "{question}")
])
# 创建查询生成链
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7)
query_generator = (
query_gen_prompt
| llm
| StrOutputParser()
| (lambda x: x.split("\n")) # 分割成列表
)
# 使用示例
original_query = "什么是Agent?"
queries = query_generator.invoke({"question": original_query})
print("原始查询:", original_query)
print("\n生成的查询变体:")
for i, q in enumerate(queries, 1):
print(f"{i}. {q}")
# 输出示例:
# 1. Agent系统的定义和核心特征是什么?
# 2. AI Agent与传统程序有什么区别?
# 3. Agent在实际应用中如何工作?实现:完整Multi-Query RAG
from langchain.load import dumps, loads
from typing import List
def get_unique_documents(documents: List[List]) -> List:
"""去重文档"""
# 使用文档内容作为唯一标识
unique_docs = {}
for doc_list in documents:
for doc in doc_list:
content = doc.page_content
if content not in unique_docs:
unique_docs[content] = doc
return list(unique_docs.values())
# 完整的Multi-Query RAG链
from langchain_core.runnables import RunnablePassthrough
# 1. 生成多个查询
def generate_queries(question: str) -> List[str]:
"""生成查询变体"""
queries = query_generator.invoke({"question": question})
# 确保原始查询也包含在内
return [question] + [q for q in queries if q.strip()]
# 2. 检索每个查询
def retrieve_documents(queries: List[str], retriever) -> List:
"""并行检索所有查询"""
all_docs = []
for query in queries:
docs = retriever.get_relevant_documents(query)
all_docs.append(docs)
return all_docs
# 3. 合并去重
def process_documents(all_docs: List[List]) -> List:
"""合并并去重文档"""
unique_docs = get_unique_documents(all_docs)
return unique_docs[:10] # 返回top-10
# 4. 创建完整链
from langchain.chains import RetrievalQA
class MultiQueryRAG:
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.retriever = vectorstore.as_retriever(search_kwargs={"k": 3})
self.llm = llm
def query(self, question: str) -> dict:
"""执行Multi-Query RAG"""
# Step 1: 生成查询变体
queries = generate_queries(question)
print(f"📝 生成了 {len(queries)} 个查询")
# Step 2: 检索文档
all_docs = retrieve_documents(queries, self.retriever)
print(f"📚 检索到 {sum(len(docs) for docs in all_docs)} 个文档(含重复)")
# Step 3: 去重
unique_docs = process_documents(all_docs)
print(f"✨ 去重后剩余 {len(unique_docs)} 个文档")
# Step 4: 生成答案
context = "\n\n".join([doc.page_content for doc in unique_docs])
answer_prompt = ChatPromptTemplate.from_messages([
("system", "基于以下文档内容回答问题。如果文档中没有相关信息,请明确说明。"),
("human", "文档:\n{context}\n\n问题: {question}")
])
answer_chain = answer_prompt | self.llm | StrOutputParser()
answer = answer_chain.invoke({
"context": context,
"question": question
})
return {
"question": question,
"queries": queries,
"num_docs": len(unique_docs),
"answer": answer
}
# 使用示例
multi_rag = MultiQueryRAG(vectorstore, llm)
result = multi_rag.query("什么是Agent?它有哪些关键能力?")
print("\n" + "="*60)
print("最终答案:")
print(result["answer"])Multi-Query的优缺点
优点 ✅:
- 提高召回率(找到更多相关文档)
- 覆盖不同角度和表达方式
- 对用户表达不清的查询特别有效
- 实现相对简单
缺点 ❌:
- 增加检索成本(多次查询)
- 可能引入噪声(不相关的变体)
- 需要额外的LLM调用
- 去重逻辑可能过滤掉有价值的文档
优化技巧
# 1. 限制查询数量
query_generator = (
query_gen_prompt
| llm
| StrOutputParser()
| (lambda x: x.split("\n")[:3]) # 最多3个变体
)
# 2. 使用缓存避免重复生成
from functools import lru_cache
@lru_cache(maxsize=100)
def cached_query_generation(question: str) -> tuple:
queries = query_generator.invoke({"question": question})
return tuple(queries) # 返回元组以支持缓存
# 3. 异步并行检索
import asyncio
async def async_retrieve_all(queries: List[str], retriever):
"""异步并行检索"""
tasks = [retriever.aget_relevant_documents(q) for q in queries]
results = await asyncio.gather(*tasks)
return resultsPart 2: RAG-Fusion - 融合式检索
核心概念
RAG-Fusion结合了Multi-Query和倒数排序融合(Reciprocal Rank Fusion, RRF),不仅生成多个查询,还智能地合并和重排序结果。
什么是倒数排序融合?
RRF是一种排序融合算法,给予排名靠前的文档更高的分数:
公式: RRF_score(doc) = Σ 1 / (k + rank(doc))
其中:
- k = 常数(通常为60)
- rank(doc) = 文档在某个查询结果中的排名
- Σ = 对所有查询结果求和示例:
# 查询1的结果排序:
查询1: "什么是Agent?"
1. 文档A (排名1)
2. 文档B (排名2)
3. 文档C (排名3)
# 查询2的结果排序:
查询2: "AI Agent的定义"
1. 文档B (排名1) ← 在这个查询中排名更高
2. 文档D (排名2)
3. 文档A (排名3)
# RRF融合计算 (k=60):
文档A: 1/(60+1) + 1/(60+3) = 0.0164 + 0.0159 = 0.0323
文档B: 1/(60+2) + 1/(60+1) = 0.0161 + 0.0164 = 0.0325 ← 最高分
文档C: 1/(60+3) + 0 = 0.0159
文档D: 0 + 1/(60+2) = 0.0161
最终排序: B > A > D > C实现RAG-Fusion
from typing import List, Tuple
def reciprocal_rank_fusion(
results: List[List],
k: int = 60
) -> List[Tuple[any, float]]:
"""
实现倒数排序融合
Args:
results: 多个查询的结果列表
k: RRF常数
Returns:
排序后的(文档, 分数)列表
"""
# 存储每个文档的融合分数
fusion_scores = {}
for docs in results:
for rank, doc in enumerate(docs):
# 使用文档内容作为唯一标识
doc_id = doc.page_content
# 计算RRF分数
if doc_id not in fusion_scores:
fusion_scores[doc_id] = {
'doc': doc,
'score': 0
}
# 累加分数
fusion_scores[doc_id]['score'] += 1 / (k + rank + 1)
# 按分数排序
sorted_docs = sorted(
fusion_scores.values(),
key=lambda x: x['score'],
reverse=True
)
return [(item['doc'], item['score']) for item in sorted_docs]
class RAGFusion:
"""RAG-Fusion系统"""
def __init__(self, vectorstore, llm, k: int = 60):
self.vectorstore = vectorstore
self.retriever = vectorstore.as_retriever(search_kwargs={"k": 5})
self.llm = llm
self.k = k
def generate_queries(self, question: str, n: int = 3) -> List[str]:
"""生成查询变体"""
prompt = ChatPromptTemplate.from_messages([
("system", f"""生成{n}个不同角度的查询变体。
要求:
1. 语义相关但表达不同
2. 覆盖不同角度
3. 每行一个查询"""),
("human", "{question}")
])
chain = prompt | self.llm | StrOutputParser()
queries_str = chain.invoke({"question": question})
queries = [q.strip() for q in queries_str.split("\n") if q.strip()]
return [question] + queries[:n]
def retrieve_and_fuse(self, queries: List[str]) -> List[Tuple[any, float]]:
"""检索并融合结果"""
all_results = []
print(f"🔍 执行 {len(queries)} 个查询...")
for i, query in enumerate(queries, 1):
docs = self.retriever.get_relevant_documents(query)
all_results.append(docs)
print(f" 查询{i}: 检索到 {len(docs)} 个文档")
# RRF融合
fused_results = reciprocal_rank_fusion(all_results, k=self.k)
print(f"✨ 融合后共 {len(fused_results)} 个唯一文档")
return fused_results
def query(self, question: str, top_k: int = 5) -> dict:
"""执行RAG-Fusion查询"""
# 1. 生成查询
queries = self.generate_queries(question)
# 2. 检索并融合
fused_docs = self.retrieve_and_fuse(queries)
# 3. 选择top-k
top_docs = fused_docs[:top_k]
# 4. 生成答案
context = "\n\n".join([
f"[分数: {score:.4f}]\n{doc.page_content}"
for doc, score in top_docs
])
answer_prompt = ChatPromptTemplate.from_messages([
("system", "基于以下按相关性排序的文档回答问题。"),
("human", "文档:\n{context}\n\n问题: {question}")
])
answer_chain = answer_prompt | self.llm | StrOutputParser()
answer = answer_chain.invoke({
"context": context,
"question": question
})
return {
"question": question,
"generated_queries": queries,
"num_docs": len(top_docs),
"top_scores": [score for _, score in top_docs],
"answer": answer
}
# 使用示例
rag_fusion = RAGFusion(vectorstore, llm, k=60)
result = rag_fusion.query("什么是Agent?")
print("\n生成的查询:")
for q in result["generated_queries"]:
print(f" - {q}")
print(f"\nTop-{result['num_docs']} 文档分数:")
for i, score in enumerate(result["top_scores"], 1):
print(f" {i}. {score:.4f}")
print("\n答案:")
print(result["answer"])RAG-Fusion vs Multi-Query
| 特性 | Multi-Query | RAG-Fusion |
|---|---|---|
| 查询生成 | ✅ | ✅ |
| 并行检索 | ✅ | ✅ |
| 智能排序 | ❌ | ✅ RRF算法 |
| 结果质量 | 中等 | 高 |
| 计算成本 | 低 | 中等 |
| 适用场景 | 一般检索 | 高质量需求 |
Part 3: Query Decomposition - 查询分解
核心概念
对于复杂的多步骤问题,Query Decomposition将其分解为多个子问题,分别回答后再合成最终答案。
两种分解策略
1. 递归分解(Answer Recursively)
复杂问题: "比较GPT-3和GPT-4在多模态能力上的差异"
↓
子问题1: "GPT-3有哪些能力?"
↓ 检索 + 回答
答案1: "GPT-3主要是文本模型..."
↓
子问题2: "GPT-4有哪些新能力?" (基于答案1)
↓ 检索 + 回答
答案2: "GPT-4增加了图像理解..."
↓
综合答案: "GPT-3仅支持文本,而GPT-4..."2. 并行分解(Answer Individually)
复杂问题: "比较Python和JavaScript在Web开发中的优劣"
↓
子问题1: "Python在Web开发中的优势"
子问题2: "JavaScript在Web开发中的优势"
子问题3: "Python在Web开发中的劣势"
子问题4: "JavaScript在Web开发中的劣势"
↓
并行检索 + 回答
↓
综合所有答案实现:递归分解
from langchain.chains import LLMChain
class RecursiveDecomposition:
"""递归查询分解"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.retriever = vectorstore.as_retriever()
self.llm = llm
def decompose_query(self, question: str) -> List[str]:
"""分解复杂查询为子问题"""
decompose_prompt = ChatPromptTemplate.from_messages([
("system", """将复杂问题分解为2-4个简单的子问题。
要求:
1. 子问题应该按逻辑顺序排列
2. 每个子问题都应该是独立可回答的
3. 每行一个问题"""),
("human", "{question}")
])
chain = decompose_prompt | self.llm | StrOutputParser()
sub_questions_str = chain.invoke({"question": question})
sub_questions = [q.strip() for q in sub_questions_str.split("\n") if q.strip()]
return sub_questions
def answer_sub_question(
self,
question: str,
context: str = ""
) -> str:
"""回答单个子问题"""
# 检索相关文档
docs = self.retriever.get_relevant_documents(question)
doc_context = "\n\n".join([doc.page_content for doc in docs[:3]])
# 构建提示词
if context:
full_context = f"已知信息:\n{context}\n\n相关文档:\n{doc_context}"
else:
full_context = f"相关文档:\n{doc_context}"
answer_prompt = ChatPromptTemplate.from_messages([
("system", "基于提供的信息简洁地回答问题。"),
("human", "{context}\n\n问题: {question}")
])
chain = answer_prompt | self.llm | StrOutputParser()
answer = chain.invoke({
"context": full_context,
"question": question
})
return answer
def query(self, question: str) -> dict:
"""执行递归分解查询"""
print(f"📋 原始问题: {question}\n")
# 1. 分解问题
sub_questions = self.decompose_query(question)
print(f"🔍 分解为 {len(sub_questions)} 个子问题:")
for i, sq in enumerate(sub_questions, 1):
print(f" {i}. {sq}")
# 2. 递归回答
accumulated_context = ""
sub_answers = []
print("\n💡 逐步回答:")
for i, sq in enumerate(sub_questions, 1):
print(f"\n 子问题{i}: {sq}")
answer = self.answer_sub_question(sq, accumulated_context)
sub_answers.append(answer)
accumulated_context += f"\n\n问题{i}: {sq}\n答案: {answer}"
print(f" 答案: {answer[:100]}...")
# 3. 综合最终答案
final_prompt = ChatPromptTemplate.from_messages([
("system", "基于以下子问题和答案,综合回答原始问题。"),
("human", """原始问题: {question}
子问题和答案:
{sub_qa}
请给出完整的综合答案:""")
])
sub_qa = "\n\n".join([
f"Q{i}: {q}\nA{i}: {a}"
for i, (q, a) in enumerate(zip(sub_questions, sub_answers), 1)
])
chain = final_prompt | self.llm | StrOutputParser()
final_answer = chain.invoke({
"question": question,
"sub_qa": sub_qa
})
return {
"question": question,
"sub_questions": sub_questions,
"sub_answers": sub_answers,
"final_answer": final_answer
}
# 使用示例
decomp = RecursiveDecomposition(vectorstore, llm)
result = decomp.query("比较GPT-3和GPT-4在能力上的主要差异")
print("\n" + "="*60)
print("最终综合答案:")
print(result["final_answer"])实现:并行分解
import asyncio
class ParallelDecomposition:
"""并行查询分解"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.retriever = vectorstore.as_retriever()
self.llm = llm
def decompose_query(self, question: str) -> List[str]:
"""分解查询(同上)"""
# ... 相同的实现
pass
async def answer_sub_question_async(self, question: str) -> str:
"""异步回答子问题"""
docs = await self.retriever.aget_relevant_documents(question)
doc_context = "\n\n".join([doc.page_content for doc in docs[:3]])
answer_prompt = ChatPromptTemplate.from_messages([
("system", "基于文档简洁回答问题。"),
("human", "文档:\n{context}\n\n问题: {question}")
])
chain = answer_prompt | self.llm | StrOutputParser()
answer = await chain.ainvoke({
"context": doc_context,
"question": question
})
return answer
async def query_async(self, question: str) -> dict:
"""异步执行并行分解"""
# 1. 分解问题
sub_questions = self.decompose_query(question)
# 2. 并行回答所有子问题
tasks = [
self.answer_sub_question_async(sq)
for sq in sub_questions
]
sub_answers = await asyncio.gather(*tasks)
# 3. 综合答案(同递归版本)
# ...
return {
"question": question,
"sub_questions": sub_questions,
"sub_answers": sub_answers,
"final_answer": final_answer
}
# 使用
parallel_decomp = ParallelDecomposition(vectorstore, llm)
result = asyncio.run(
parallel_decomp.query_async("Python vs JavaScript in Web Development")
)分解策略对比
| 特性 | 递归分解 | 并行分解 |
|---|---|---|
| 执行方式 | 顺序 | 并行 |
| 速度 | 慢 | 快 ⚡ |
| 子问题依赖 | 支持 | 不支持 |
| 适用场景 | 有逻辑顺序的问题 | 独立子问题 |
| 实现复杂度 | 中等 | 高 |
Part 4: Step Back Prompting - 抽象化提问
核心概念
Step Back Prompting先提出一个更抽象、更概括的问题,获取背景知识后,再回答原始具体问题。
为什么需要Step Back?
# ❌ 直接回答可能缺乏背景
原始问题: "Transformer中的Multi-Head Attention有几个头?"
直接检索 → 可能找不到确切答案(文档只描述了原理,没说具体数字)
# ✅ Step Back后
Step 1: Step Back问题
"Transformer架构的基本组成是什么?"
Step 2: 获取背景知识
"Transformer由编码器和解码器组成,使用Multi-Head Attention..."
Step 3: 结合背景回答原始问题
"根据原始论文,使用8个attention头..."实现Step Back
class StepBackRAG:
"""Step Back提示的RAG系统"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.retriever = vectorstore.as_retriever()
self.llm = llm
def generate_step_back_question(self, question: str) -> str:
"""生成Step Back问题"""
step_back_prompt = ChatPromptTemplate.from_messages([
("system", """给定一个具体问题,生成一个更抽象、更概括的问题。
示例:
具体问题: "GPT-4的上下文长度是多少?"
Step Back问题: "GPT-4的主要技术特性有哪些?"
具体问题: "Python中的装饰器如何工作?"
Step Back问题: "Python中的元编程概念是什么?"
"""),
("human", "具体问题: {question}\nStep Back问题:")
])
chain = step_back_prompt | self.llm | StrOutputParser()
step_back_q = chain.invoke({"question": question})
return step_back_q.strip()
def query(self, question: str) -> dict:
"""执行Step Back RAG"""
print(f"❓ 原始问题: {question}")
# 1. 生成Step Back问题
step_back_q = self.generate_step_back_question(question)
print(f"📚 Step Back问题: {step_back_q}")
# 2. 检索背景知识(Step Back问题)
background_docs = self.retriever.get_relevant_documents(step_back_q)
background = "\n\n".join([doc.page_content for doc in background_docs[:2]])
# 3. 检索具体信息(原始问题)
specific_docs = self.retriever.get_relevant_documents(question)
specific = "\n\n".join([doc.page_content for doc in specific_docs[:2]])
# 4. 结合两者生成答案
answer_prompt = ChatPromptTemplate.from_messages([
("system", """基于提供的背景知识和具体信息回答问题。
先理解背景,再回答具体问题。"""),
("human", """背景知识:
{background}
具体信息:
{specific}
原始问题: {question}
请回答:""")
])
chain = answer_prompt | self.llm | StrOutputParser()
answer = chain.invoke({
"background": background,
"specific": specific,
"question": question
})
return {
"question": question,
"step_back_question": step_back_q,
"answer": answer
}
# 使用示例
step_back_rag = StepBackRAG(vectorstore, llm)
result = step_back_rag.query("GPT-4的最大上下文长度是多少个token?")
print("\n💡 答案:")
print(result["answer"])Step Back的优势
何时使用Step Back:
- ✅ 问题需要背景知识
- ✅ 直接检索效果不好
- ✅ 问题过于具体或技术性强
- ✅ 需要理论支撑的答案
不适合的场景:
- ❌ 简单事实性问题
- ❌ 已有充足直接信息
- ❌ 实时数据查询
Part 5: HyDE - 假设性文档嵌入
核心概念
HyDE (Hypothetical Document Embeddings) 不直接检索用户查询,而是先让LLM生成一个假设性的答案文档,然后用这个文档去检索相似内容。
HyDE的直觉
# 传统检索
用户查询: "什么是机器学习?"
↓ 直接嵌入
查询向量: [0.1, 0.3, -0.2, ...]
↓ 检索
找到的文档
# HyDE检索
用户查询: "什么是机器学习?"
↓ LLM生成假设性答案
假设文档: "机器学习是人工智能的一个分支,它使计算机能够从数据中学习并做出预测..."
↓ 嵌入假设文档
文档向量: [0.2, 0.4, -0.1, ...] # 与真实答案文档更相似!
↓ 检索
找到更相关的文档为什么有效?
查询通常很短,而文档内容丰富。假设性文档比查询更接近真实文档的表达方式,因此能找到更相关的内容。
实现HyDE
class HyDERAG:
"""HyDE (Hypothetical Document Embeddings) RAG"""
def __init__(self, vectorstore, llm, embeddings):
self.vectorstore = vectorstore
self.llm = llm
self.embeddings = embeddings
def generate_hypothetical_document(
self,
question: str,
style: str = "academic"
) -> str:
"""生成假设性文档"""
if style == "academic":
system_msg = "你是一位专家。请对以下问题写一段详细、准确的学术性回答(200-300字)。"
elif style == "concise":
system_msg = "请对以下问题写一段简洁的回答(100-150字)。"
else:
system_msg = "请对以下问题写一段回答。"
hyde_prompt = ChatPromptTemplate.from_messages([
("system", system_msg),
("human", "{question}")
])
chain = hyde_prompt | self.llm | StrOutputParser()
hypothetical_doc = chain.invoke({"question": question})
return hypothetical_doc
def search_with_hyde(
self,
question: str,
k: int = 5
) -> List:
"""使用HyDE进行检索"""
# 1. 生成假设性文档
hyp_doc = self.generate_hypothetical_document(question)
print(f"📝 假设性文档:\n{hyp_doc[:200]}...\n")
# 2. 使用假设性文档检索
# 注意:这里用假设文档而不是原始查询
docs = self.vectorstore.similarity_search(hyp_doc, k=k)
return docs
def query(self, question: str, k: int = 5) -> dict:
"""执行HyDE RAG查询"""
print(f"❓ 原始问题: {question}\n")
# 1. HyDE检索
docs = self.search_with_hyde(question, k=k)
print(f"📚 检索到 {len(docs)} 个文档\n")
# 2. 生成最终答案
context = "\n\n".join([doc.page_content for doc in docs])
answer_prompt = ChatPromptTemplate.from_messages([
("system", "基于以下文档准确回答问题。"),
("human", "文档:\n{context}\n\n问题: {question}")
])
chain = answer_prompt | self.llm | StrOutputParser()
answer = chain.invoke({
"context": context,
"question": question
})
return {
"question": question,
"num_docs": len(docs),
"answer": answer
}
# 使用示例
hyde_rag = HyDERAG(vectorstore, llm, embeddings)
result = hyde_rag.query("解释Transformer架构中的注意力机制")
print("💡 最终答案:")
print(result["answer"])HyDE的变体
# 变体1: 生成多个假设文档
def multi_hypothetical_search(question: str, n: int = 3):
"""生成多个假设文档并融合结果"""
all_docs = []
for i in range(n):
hyp_doc = generate_hypothetical_document(
question,
temperature=0.7 # 增加多样性
)
docs = vectorstore.similarity_search(hyp_doc, k=3)
all_docs.append(docs)
# 使用RRF融合
fused = reciprocal_rank_fusion(all_docs)
return fused
# 变体2: 特定领域的假设文档
def domain_specific_hyde(question: str, domain: str):
"""生成特定领域风格的假设文档"""
prompts = {
"technical": "以技术文档的风格回答...",
"tutorial": "以教程的风格解释...",
"research": "以研究论文的风格阐述..."
}
system_msg = prompts.get(domain, "回答以下问题...")
# ... 生成和检索HyDE vs 传统检索
| 维度 | 传统检索 | HyDE |
|---|---|---|
| 检索对象 | 用户查询 | 假设性答案 |
| 语义匹配 | 查询↔文档 | 答案↔答案 |
| 查询长度敏感性 | 是 | 否 |
| 额外LLM调用 | 0 | 1 |
| 适用场景 | 清晰查询 | 复杂/技术性查询 |
| 检索质量 | 中等 | 高 |
技术对比与选择指南
综合对比
| 技术 | 召回率 | 准确率 | 速度 | 成本 | 复杂度 |
|---|---|---|---|---|---|
| Multi-Query | ⭐⭐⭐ | ⭐⭐ | ⭐⭐ | 💰💰 | ⭐ |
| RAG-Fusion | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ | 💰💰💰 | ⭐⭐ |
| Decomposition | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐ | 💰💰💰💰 | ⭐⭐⭐ |
| Step Back | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐ | 💰💰 | ⭐⭐ |
| HyDE | ⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | ⭐⭐ | 💰💰 | ⭐⭐ |
选择决策树
问题类型?
├─ 简单查询(单一事实)
│ → 不需要优化,使用基础RAG
│
├─ 表达模糊/多角度
│ → Multi-Query 或 RAG-Fusion
│
├─ 复杂多步骤问题
│ → Query Decomposition
│
├─ 需要背景知识
│ → Step Back
│
└─ 技术性强/专业领域
→ HyDE组合使用
这些技术可以组合使用以获得更好的效果:
class AdvancedRAG:
"""组合多种技术的高级RAG系统"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
def intelligent_query(self, question: str) -> dict:
"""智能选择和组合技术"""
# 1. 分析查询复杂度
complexity = self.analyze_complexity(question)
if complexity == "simple":
# 简单查询:直接检索
return self.simple_rag(question)
elif complexity == "ambiguous":
# 模糊查询:Multi-Query + RRF
return self.multi_query_with_fusion(question)
elif complexity == "complex":
# 复杂查询:分解 + HyDE
return self.decomposition_with_hyde(question)
elif complexity == "technical":
# 技术查询:Step Back + HyDE
return self.step_back_with_hyde(question)
def analyze_complexity(self, question: str) -> str:
"""分析查询复杂度"""
# 使用LLM判断
analysis_prompt = ChatPromptTemplate.from_messages([
("system", """分析查询的复杂度,返回以下之一:
- simple: 简单事实性问题
- ambiguous: 表达模糊的问题
- complex: 需要多步推理的复杂问题
- technical: 技术性强的专业问题
只返回类别,不要解释。"""),
("human", "{question}")
])
chain = analysis_prompt | self.llm | StrOutputParser()
complexity = chain.invoke({"question": question}).strip().lower()
return complexity性能优化与最佳实践
1. 缓存策略
from functools import lru_cache
import hashlib
class CachedQueryOptimizer:
"""带缓存的查询优化器"""
def __init__(self):
self.query_cache = {}
self.doc_cache = {}
def get_cache_key(self, query: str, method: str) -> str:
"""生成缓存键"""
content = f"{method}:{query}"
return hashlib.md5(content.encode()).hexdigest()
def cached_multi_query(self, question: str) -> List[str]:
"""缓存Multi-Query结果"""
cache_key = self.get_cache_key(question, "multi_query")
if cache_key in self.query_cache:
print("💾 使用缓存的查询变体")
return self.query_cache[cache_key]
# 生成查询
queries = generate_queries(question)
self.query_cache[cache_key] = queries
return queries2. 并行处理
import asyncio
from concurrent.futures import ThreadPoolExecutor
async def parallel_multi_query(
queries: List[str],
retriever,
max_workers: int = 5
) -> List[List]:
"""并行执行多个查询"""
loop = asyncio.get_event_loop()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
tasks = [
loop.run_in_executor(
executor,
retriever.get_relevant_documents,
query
)
for query in queries
]
results = await asyncio.gather(*tasks)
return results3. 自适应参数调整
def adaptive_k_selection(question: str, base_k: int = 5) -> int:
"""根据查询动态调整k值"""
# 计算查询长度
word_count = len(question.split())
if word_count < 5:
# 短查询需要更多文档
return base_k + 2
elif word_count > 20:
# 长查询可以减少文档数
return max(base_k - 1, 3)
return base_k
# 使用
k = adaptive_k_selection("什么是AI?") # 返回7
k = adaptive_k_selection("详细解释深度学习中的...") # 返回44. 质量评估
def evaluate_query_quality(
original_query: str,
generated_queries: List[str],
llm
) -> float:
"""评估生成的查询质量"""
eval_prompt = ChatPromptTemplate.from_messages([
("system", """评估生成的查询质量。
标准:
1. 语义相关性(0-5分)
2. 角度多样性(0-5分)
3. 可回答性(0-5分)
返回总分(0-15)。"""),
("human", """原始查询: {original}
生成的查询:
{generated}
评分:""")
])
generated_str = "\n".join([f"{i}. {q}" for i, q in enumerate(generated_queries, 1)])
chain = eval_prompt | llm | StrOutputParser()
score_str = chain.invoke({
"original": original_query,
"generated": generated_str
})
# 提取分数
try:
score = float(score_str.strip().split()[0])
return score / 15.0 # 归一化到0-1
except:
return 0.5 # 默认中等分数实战案例
案例1: 构建智能客服RAG
class CustomerServiceRAG:
"""智能客服RAG系统"""
def __init__(self, vectorstore, llm):
self.vectorstore = vectorstore
self.llm = llm
# 预定义场景
self.scenarios = {
"product_info": ["产品", "功能", "特性", "价格"],
"troubleshooting": ["问题", "错误", "不工作", "故障"],
"how_to": ["如何", "怎么", "步骤", "教程"]
}
def classify_query(self, question: str) -> str:
"""分类用户查询"""
for scenario, keywords in self.scenarios.items():
if any(kw in question for kw in keywords):
return scenario
return "general"
def handle_query(self, question: str) -> str:
"""处理客服查询"""
scenario = self.classify_query(question)
if scenario == "product_info":
# 产品信息:使用HyDE获得更专业的答案
return HyDERAG(self.vectorstore, self.llm).query(question)
elif scenario == "troubleshooting":
# 故障排查:使用分解获得步骤化答案
return RecursiveDecomposition(self.vectorstore, self.llm).query(question)
elif scenario == "how_to":
# 教程类:使用Step Back获得背景+步骤
return StepBackRAG(self.vectorstore, self.llm).query(question)
else:
# 一般查询:使用RAG-Fusion
return RAGFusion(self.vectorstore, self.llm).query(question)案例2: 学术论文助手
class AcademicRAG:
"""学术论文助手"""
def __init__(self, vectorstore, llm):
self.multi_query = MultiQueryRAG(vectorstore, llm)
self.hyde = HyDERAG(vectorstore, llm, embeddings)
self.decomp = RecursiveDecomposition(vectorstore, llm)
def literature_review(self, topic: str) -> str:
"""文献综述"""
# 1. Multi-Query找全面的相关论文
broad_results = self.multi_query.query(
f"关于{topic}的研究现状和主要发现"
)
# 2. HyDE找高质量的理论论文
theory_results = self.hyde.query(
f"{topic}的理论基础和核心概念"
)
# 3. 综合生成文献综述
# ...
return review
def compare_methods(self, method1: str, method2: str) -> str:
"""方法对比"""
# 使用查询分解
comparison_query = f"比较{method1}和{method2}的优缺点"
return self.decomp.query(comparison_query)总结与建议
🎯 核心要点
- Multi-Query: 最简单,适合快速提升召回率
- RAG-Fusion: 最平衡,推荐作为默认选择
- Decomposition: 最强大,但成本最高
- Step Back: 最适合需要理论背景的问题
- HyDE: 最适合专业领域的技术查询
📊 实施建议
初级阶段(刚开始优化):
- 先实现Multi-Query
- 评估效果,如果显著提升则继续使用
- 否则尝试RAG-Fusion
中级阶段(有一定经验):
- 根据查询类型选择技术
- 实现简单的路由逻辑
- 添加缓存和并行优化
高级阶段(追求极致):
- 组合多种技术
- 实现自适应系统
- 持续评估和优化
🚀 下一步
学习完查询优化后,继续探索:
- [3] 路由与查询构建 - 智能路由和结构化查询
- [4] 索引与高级检索 - 优化索引策略
- [5] 检索与重排序 - 结果重排序技术
💡 最佳实践清单
✅ 查询优化:
- 根据场景选择合适的技术
- 实现缓存机制
- 监控生成的查询质量
- 使用并行处理提升速度
✅ 质量保证:
- 评估每种技术的效果
- A/B测试不同配置
- 收集用户反馈
- 持续迭代优化
参考资源
论文
- Query Rewriting for Retrieval
- RAG-Fusion
- HyDE: Precise Zero-Shot Dense Retrieval
- Least-to-Most Prompting
工具
相关文章
通过本章的学习,你已经掌握了多种强大的查询优化技术。这些技术能够显著提升RAG系统的检索质量和答案准确性。继续实践和优化,你将能够构建出真正智能的RAG应用!🎉