RAG路由与查询构建:智能检索的核心技术
深入探讨RAG系统中的路由机制和查询构建技术,包括逻辑路由、语义路由、结构化查询和自查询检索器
RAG路由与查询构建:智能检索的核心技术
在前两章中,我们学习了RAG系统的基础和查询优化技术。但是,当面对多个数据源或需要结构化查询时,如何智能地选择正确的数据源和构建合适的查询呢?本章将介绍路由机制和查询构建技术。
为什么需要路由和查询构建?
实际场景中的挑战
# 场景1: 多个数据源
数据源1: 技术文档数据库
数据源2: 用户手册数据库
数据源3: FAQ知识库
数据源4: API参考文档
用户查询: "如何使用Python SDK连接数据库?"
# 应该查询哪个数据源?
→ 单一数据源可能不够
→ 查询所有数据源效率低
→ 需要智能路由机制# 场景2: 复杂查询条件
向量数据库包含:
- 文档内容 (embedding)
- 元数据:
- 作者
- 发布日期
- 文档类型
- 标签
用户查询: "找出2023年发布的关于机器学习的文章"
# 需要同时考虑:
→ 语义相似度 (机器学习)
→ 结构化条件 (日期 >= 2023-01-01)
→ 需要查询构建技术本章内容概览
| 技术 | 核心功能 | 适用场景 | 复杂度 |
|---|---|---|---|
| 逻辑路由 | 基于规则的路由 | 确定性路由 | ⭐ |
| 语义路由 | 基于LLM的路由 | 灵活路由 | ⭐⭐ |
| 结构化查询 | 构建filter条件 | 带元数据查询 | ⭐⭐ |
| 自查询检索器 | 自动分离查询意图 | 复杂查询 | ⭐⭐⭐ |
Part 1: 逻辑路由 - Logical Routing
核心概念
逻辑路由使用基于规则的方法来决定将查询发送到哪个数据源。它通过LLM理解查询内容,然后根据预定义的规则选择合适的数据源。
工作原理
用户查询
↓
LLM分析查询意图
↓
匹配预定义的路由规则
↓
选择目标数据源
↓
执行检索实现:基础逻辑路由
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import Literal
# 定义路由提示词
route_prompt = ChatPromptTemplate.from_messages([
("system", """你是一个路由助手,负责将用户查询发送到正确的数据源。
可用的数据源:
- python_docs: Python编程相关的技术文档
- web_search: 需要实时信息或一般性问题
- database: 数据库相关的查询
请分析用户查询,返回最合适的数据源名称(只返回名称,不要其他内容)。"""),
("human", "{question}")
])
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
# 创建路由链
route_chain = route_prompt | llm | StrOutputParser()
# 测试路由
question1 = "如何在Python中使用列表推导式?"
route1 = route_chain.invoke({"question": question1})
print(f"Query: {question1}")
print(f"Route: {route1}\n")
question2 = "今天的天气怎么样?"
route2 = route_chain.invoke({"question": question2})
print(f"Query: {question2}")
print(f"Route: {route2}\n")
# 输出示例:
# Route: python_docs
# Route: web_search实现:完整路由系统
from langchain_core.runnables import RunnableLambda
from langchain.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings
class LogicalRouter:
"""逻辑路由器"""
def __init__(self, llm):
self.llm = llm
self.routes = {}
self.route_chain = self._create_route_chain()
def add_route(self, name: str, retriever):
"""添加路由"""
self.routes[name] = retriever
def _create_route_chain(self):
"""创建路由链"""
route_prompt = ChatPromptTemplate.from_messages([
("system", """基于以下数据源描述,选择最合适的数据源:
{route_descriptions}
只返回数据源名称。"""),
("human", "{question}")
])
return route_prompt | self.llm | StrOutputParser()
def _get_route_descriptions(self) -> str:
"""获取路由描述"""
descriptions = []
for name, retriever in self.routes.items():
desc = getattr(retriever, 'description', f'{name}数据源')
descriptions.append(f"- {name}: {desc}")
return "\n".join(descriptions)
def route(self, question: str):
"""执行路由"""
# 获取路由决策
route_descriptions = self._get_route_descriptions()
route_name = self.route_chain.invoke({
"question": question,
"route_descriptions": route_descriptions
}).strip()
# 清理路由名称
route_name = route_name.lower().strip()
# 检查路由是否存在
if route_name not in self.routes:
# 尝试模糊匹配
for name in self.routes.keys():
if name in route_name or route_name in name:
route_name = name
break
else:
raise ValueError(f"未找到路由: {route_name}")
return route_name, self.routes[route_name]
def query(self, question: str):
"""执行完整查询"""
# 路由到正确的数据源
route_name, retriever = self.route(question)
print(f"📍 路由到: {route_name}")
# 执行检索
docs = retriever.get_relevant_documents(question)
return {
"route": route_name,
"documents": docs,
"question": question
}
# 使用示例
embeddings = OpenAIEmbeddings()
# 创建不同的向量存储
python_docs = Chroma(
collection_name="python_docs",
embedding_function=embeddings
)
python_docs.description = "Python编程语言的官方文档"
web_docs = Chroma(
collection_name="web_search",
embedding_function=embeddings
)
web_docs.description = "实时网络搜索结果和一般性知识"
# 创建路由器
router = LogicalRouter(llm)
router.add_route("python_docs", python_docs.as_retriever())
router.add_route("web_search", web_docs.as_retriever())
# 测试路由
result = router.query("如何在Python中处理异常?")
print(f"找到 {len(result['documents'])} 个文档")优化:带回退机制的路由
class LogicalRouterWithFallback(LogicalRouter):
"""带回退机制的逻辑路由器"""
def __init__(self, llm, fallback_route: str = "web_search"):
super().__init__(llm)
self.fallback_route = fallback_route
def route(self, question: str):
"""执行路由,带回退机制"""
try:
return super().route(question)
except ValueError as e:
print(f"⚠️ 路由失败: {e}, 使用回退路由: {self.fallback_route}")
return self.fallback_route, self.routes[self.fallback_route]
def query_multiple(self, question: str, max_routes: int = 2):
"""查询多个数据源"""
# 获取主路由
primary_route, primary_retriever = self.route(question)
results = {
primary_route: primary_retriever.get_relevant_documents(question)
}
# 如果需要,添加回退路由
if len(results) < max_routes and primary_route != self.fallback_route:
fallback_docs = self.routes[self.fallback_route].get_relevant_documents(question)
results[self.fallback_route] = fallback_docs
return results
# 使用示例
router_with_fallback = LogicalRouterWithFallback(llm)
router_with_fallback.add_route("python_docs", python_docs.as_retriever())
router_with_fallback.add_route("web_search", web_docs.as_retriever())
# 查询多个数据源
results = router_with_fallback.query_multiple(
"Python中的装饰器是什么?",
max_routes=2
)
for route_name, docs in results.items():
print(f"\n{route_name}: {len(docs)} 文档")逻辑路由的优缺点
优点 ✅:
- 可预测和可控
- 易于理解和调试
- 适合确定性场景
- 快速且高效
缺点 ❌:
- 灵活性有限
- 需要预定义规则
- 难以处理边界情况
- 可能需要频繁更新规则
Part 2: 语义路由 - Semantic Routing
核心概念
语义路由使用嵌入向量来路由查询。它为每个路由创建描述性文本的嵌入,然后将查询嵌入与路由嵌入进行相似度比较。
工作原理
用户查询
↓
计算查询嵌入
↓
计算与各路由描述的相似度
↓
选择最相似的路由
↓
执行检索实现:语义路由器
from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import Dict, List, Tuple
class SemanticRouter:
"""语义路由器"""
def __init__(self, embeddings):
self.embeddings = embeddings
self.routes = {}
self.route_embeddings = {}
def add_route(self, name: str, description: str, retriever):
"""添加路由
Args:
name: 路由名称
description: 路由描述(将被嵌入)
retriever: 检索器
"""
self.routes[name] = {
'description': description,
'retriever': retriever
}
# 计算描述的嵌入
self.route_embeddings[name] = self.embeddings.embed_query(description)
def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
"""计算余弦相似度"""
vec1 = np.array(vec1)
vec2 = np.array(vec2)
dot_product = np.dot(vec1, vec2)
norm1 = np.linalg.norm(vec1)
norm2 = np.linalg.norm(vec2)
return dot_product / (norm1 * norm2)
def route(self, question: str, threshold: float = 0.7) -> Tuple[str, float]:
"""执行语义路由
Args:
question: 用户查询
threshold: 相似度阈值
Returns:
(路由名称, 相似度分数)
"""
# 计算查询嵌入
query_embedding = self.embeddings.embed_query(question)
# 计算与所有路由的相似度
similarities = {}
for name, route_embedding in self.route_embeddings.items():
similarity = self._cosine_similarity(query_embedding, route_embedding)
similarities[name] = similarity
# 找到最相似的路由
best_route = max(similarities, key=similarities.get)
best_score = similarities[best_route]
# 检查阈值
if best_score < threshold:
print(f"⚠️ 最佳路由分数 {best_score:.3f} 低于阈值 {threshold}")
return best_route, best_score
def route_with_scores(self, question: str) -> Dict[str, float]:
"""返回所有路由及其分数"""
query_embedding = self.embeddings.embed_query(question)
similarities = {}
for name, route_embedding in self.route_embeddings.items():
similarity = self._cosine_similarity(query_embedding, route_embedding)
similarities[name] = similarity
# 按分数排序
return dict(sorted(similarities.items(), key=lambda x: x[1], reverse=True))
def query(self, question: str):
"""执行完整查询"""
# 路由
route_name, score = self.route(question)
print(f"📍 路由到: {route_name} (相似度: {score:.3f})")
# 检索
retriever = self.routes[route_name]['retriever']
docs = retriever.get_relevant_documents(question)
return {
"route": route_name,
"score": score,
"documents": docs,
"question": question
}
# 使用示例
embeddings = OpenAIEmbeddings()
# 创建语义路由器
semantic_router = SemanticRouter(embeddings)
# 添加路由
semantic_router.add_route(
name="python_programming",
description="Python编程语言,包括语法、数据结构、函数、类、模块等",
retriever=python_docs.as_retriever()
)
semantic_router.add_route(
name="machine_learning",
description="机器学习、深度学习、神经网络、模型训练、数据科学",
retriever=ml_docs.as_retriever()
)
semantic_router.add_route(
name="web_development",
description="Web开发、前端、后端、API、数据库、服务器",
retriever=web_docs.as_retriever()
)
# 测试路由
result = semantic_router.query("如何训练一个神经网络?")
# 查看所有路由得分
scores = semantic_router.route_with_scores("如何训练一个神经网络?")
print("\n所有路由得分:")
for route, score in scores.items():
print(f" {route}: {score:.3f}")实现:混合路由器
class HybridRouter:
"""混合路由器:结合逻辑和语义路由"""
def __init__(self, embeddings, llm):
self.semantic_router = SemanticRouter(embeddings)
self.logical_router = LogicalRouter(llm)
self.routes = {}
def add_route(self, name: str, description: str, retriever):
"""添加路由"""
self.semantic_router.add_route(name, description, retriever)
self.logical_router.add_route(name, retriever)
self.routes[name] = retriever
def route(self, question: str, use_semantic: bool = True):
"""执行路由
Args:
question: 用户查询
use_semantic: 是否优先使用语义路由
"""
if use_semantic:
# 尝试语义路由
route_name, score = self.semantic_router.route(question, threshold=0.75)
if score >= 0.75:
print(f"✅ 使用语义路由: {route_name} (分数: {score:.3f})")
return route_name, self.routes[route_name]
else:
print(f"⚠️ 语义路由分数过低,切换到逻辑路由")
# 使用逻辑路由作为后备
route_name, retriever = self.logical_router.route(question)
print(f"✅ 使用逻辑路由: {route_name}")
return route_name, retriever
def query(self, question: str, use_semantic: bool = True):
"""执行查询"""
route_name, retriever = self.route(question, use_semantic)
docs = retriever.get_relevant_documents(question)
return {
"route": route_name,
"documents": docs,
"question": question
}
# 使用示例
hybrid_router = HybridRouter(embeddings, llm)
# 添加路由
hybrid_router.add_route(
"python_programming",
"Python编程相关内容",
python_docs.as_retriever()
)
# 测试
result = hybrid_router.query("Python中的装饰器怎么用?")语义路由 vs 逻辑路由
| 特性 | 逻辑路由 | 语义路由 |
|---|---|---|
| 决策依据 | 规则/LLM分类 | 嵌入相似度 |
| 灵活性 | 中等 | 高 |
| 准确性 | 高(规则明确时) | 中高 |
| 速度 | 快 | 很快⚡ |
| 成本 | 需LLM调用 | 仅需嵌入 |
| 可解释性 | 高 | 中 |
Part 3: 查询构建 - Query Construction
核心概念
查询构建是将自然语言查询转换为结构化查询的过程。它允许我们结合语义搜索和结构化过滤。
为什么需要查询构建?
# 场景: 带元数据的文档检索
文档元数据示例:
{
"content": "深度学习入门教程",
"metadata": {
"author": "张三",
"date": "2023-06-15",
"category": "机器学习",
"tags": ["深度学习", "神经网络"],
"views": 1500
}
}
用户查询:
"找出张三在2023年写的关于深度学习的文章"
需要:
1. 语义搜索: "深度学习"
2. 结构化过滤:
- author == "张三"
- date >= "2023-01-01" AND date <= "2023-12-31"实现:基础查询构建器
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_openai import ChatOpenAI
# 定义文档元数据
metadata_field_info = [
AttributeInfo(
name="author",
description="文档作者",
type="string"
),
AttributeInfo(
name="date",
description="发布日期,格式: YYYY-MM-DD",
type="string"
),
AttributeInfo(
name="category",
description="文档类别",
type="string"
),
AttributeInfo(
name="views",
description="浏览次数",
type="integer"
),
]
# 文档内容描述
document_content_description = "技术文章和教程"
# 创建自查询检索器
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
self_query_retriever = SelfQueryRetriever.from_llm(
llm=llm,
vectorstore=vectorstore,
document_contents=document_content_description,
metadata_field_info=metadata_field_info,
verbose=True
)
# 使用示例
query = "找出2023年关于Python的文章"
results = self_query_retriever.get_relevant_documents(query)
for doc in results:
print(f"标题: {doc.page_content}")
print(f"元数据: {doc.metadata}\n")实现:手动查询构建
from typing import Dict, Any, Optional
from langchain_core.prompts import ChatPromptTemplate
class QueryConstructor:
"""查询构建器"""
def __init__(self, llm):
self.llm = llm
self.prompt = self._create_prompt()
def _create_prompt(self):
"""创建查询构建提示词"""
return ChatPromptTemplate.from_messages([
("system", """你是一个查询构建助手。将用户的自然语言查询转换为结构化查询。
可用的元数据字段:
- author (string): 作者名称
- date (string): 发布日期,格式YYYY-MM-DD
- category (string): 文档类别
- views (integer): 浏览次数
- tags (list): 标签列表
请返回JSON格式:
{{
"semantic_query": "语义搜索内容",
"filters": {{
"field_name": {{
"operator": "==|!=|>|<|>=|<=|in",
"value": "value"
}}
}}
}}
如果没有过滤条件,filters可以为空对象。"""),
("human", "{question}")
])
def construct(self, question: str) -> Dict[str, Any]:
"""构建查询
Returns:
{
"semantic_query": str,
"filters": dict
}
"""
chain = self.prompt | self.llm
response = chain.invoke({"question": question})
# 解析响应
import json
try:
query_dict = json.loads(response.content)
return query_dict
except json.JSONDecodeError:
# 如果解析失败,返回原始查询
return {
"semantic_query": question,
"filters": {}
}
def construct_filter(self, filters: Dict[str, Any]) -> str:
"""将过滤器转换为向量数据库查询格式
不同的向量数据库有不同的过滤语法:
- Chroma: {"field": {"$op": value}}
- Pinecone: {"field": {"$op": value}}
- Weaviate: where filter
"""
# 这里以Chroma为例
if not filters:
return None
chroma_filter = {}
for field, condition in filters.items():
operator = condition.get("operator", "==")
value = condition["value"]
# 映射操作符
op_map = {
"==": "$eq",
"!=": "$ne",
">": "$gt",
"<": "$lt",
">=": "$gte",
"<=": "$lte",
"in": "$in"
}
chroma_op = op_map.get(operator, "$eq")
chroma_filter[field] = {chroma_op: value}
return chroma_filter
# 使用示例
constructor = QueryConstructor(llm)
# 构建查询
question = "找出张三在2023年写的浏览量超过1000的文章"
query = constructor.construct(question)
print("语义查询:", query["semantic_query"])
print("过滤条件:", query["filters"])
# 转换为向量数据库格式
chroma_filter = constructor.construct_filter(query["filters"])
print("Chroma过滤器:", chroma_filter)实现:高级查询构建
class AdvancedQueryConstructor(QueryConstructor):
"""高级查询构建器"""
def construct_complex_filter(self, question: str) -> Dict[str, Any]:
"""构建复杂过滤器(支持AND/OR逻辑)"""
query = self.construct(question)
filters = query.get("filters", {})
if not filters:
return None
# 支持多个条件的AND逻辑
and_conditions = []
for field, condition in filters.items():
operator = condition.get("operator", "==")
value = condition["value"]
# Chroma格式
if operator == "==":
and_conditions.append({field: {"$eq": value}})
elif operator == ">":
and_conditions.append({field: {"$gt": value}})
elif operator == ">=":
and_conditions.append({field: {"$gte": value}})
elif operator == "<":
and_conditions.append({field: {"$lt": value}})
elif operator == "<=":
and_conditions.append({field: {"$lte": value}})
# 组合多个条件
if len(and_conditions) > 1:
return {"$and": and_conditions}
elif len(and_conditions) == 1:
return and_conditions[0]
else:
return None
def query_with_filter(self, question: str, vectorstore):
"""执行带过滤的查询"""
# 构建查询
query = self.construct(question)
semantic_query = query["semantic_query"]
# 构建过滤器
filter_dict = self.construct_complex_filter(question)
print(f"🔍 语义查询: {semantic_query}")
print(f"📋 过滤条件: {filter_dict}")
# 执行查询
if filter_dict:
docs = vectorstore.similarity_search(
semantic_query,
k=5,
filter=filter_dict
)
else:
docs = vectorstore.similarity_search(semantic_query, k=5)
return docs
# 使用示例
advanced_constructor = AdvancedQueryConstructor(llm)
# 复杂查询
results = advanced_constructor.query_with_filter(
"找出2023年机器学习类别的热门文章",
vectorstore
)
for doc in results:
print(f"\n内容: {doc.page_content[:100]}...")
print(f"元数据: {doc.metadata}")查询构建的最佳实践
# 1. 清晰的元数据定义
metadata_field_info = [
AttributeInfo(
name="price",
description="产品价格,单位:元", # 明确单位
type="float" # 明确类型
),
AttributeInfo(
name="color",
description="产品颜色,可选值: 红色、蓝色、绿色", # 列出可选值
type="string"
),
]
# 2. 处理日期范围
def construct_date_filter(start_date: str, end_date: str):
"""构建日期范围过滤器"""
return {
"$and": [
{"date": {"$gte": start_date}},
{"date": {"$lte": end_date}}
]
}
# 3. 处理多值字段
def construct_tags_filter(tags: List[str]):
"""构建标签过滤器"""
# 包含任一标签
return {"tags": {"$in": tags}}
# 4. 组合多个过滤器
def combine_filters(filters: List[Dict]) -> Dict:
"""组合多个过滤器"""
if not filters:
return None
if len(filters) == 1:
return filters[0]
return {"$and": filters}Part 4: 自查询检索器 - Self-Query Retriever
核心概念
自查询检索器是LangChain提供的高级工具,它能够自动将自然语言查询分离为:
- 语义搜索部分
- 结构化过滤部分
工作原理
用户自然语言查询
↓
LLM分析查询
↓
分离为两部分:
├─ 语义查询内容
└─ 元数据过滤条件
↓
执行混合检索
↓
返回结果实现:自查询检索器
from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_openai import ChatOpenAI
from langchain.vectorstores import Chroma
# 1. 定义元数据字段
metadata_field_info = [
AttributeInfo(
name="author",
description="文章作者",
type="string"
),
AttributeInfo(
name="published_date",
description="发布日期,格式: YYYY-MM-DD",
type="string"
),
AttributeInfo(
name="word_count",
description="文章字数",
type="integer"
),
AttributeInfo(
name="category",
description="文章类别,如: 技术、生活、旅游等",
type="string"
),
AttributeInfo(
name="tags",
description="文章标签列表",
type="list[string]"
),
]
# 2. 文档内容描述
document_content_description = "博客文章集合"
# 3. 创建LLM
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
# 4. 创建自查询检索器
self_query_retriever = SelfQueryRetriever.from_llm(
llm=llm,
vectorstore=vectorstore,
document_contents=document_content_description,
metadata_field_info=metadata_field_info,
verbose=True, # 显示内部过程
enable_limit=True # 启用结果数量限制
)
# 5. 使用自查询检索器
query = "找出李四在2023年写的关于Python的文章"
results = self_query_retriever.get_relevant_documents(query)
print(f"找到 {len(results)} 个结果:")
for doc in results:
print(f"\n标题: {doc.page_content[:50]}...")
print(f"作者: {doc.metadata.get('author')}")
print(f"日期: {doc.metadata.get('published_date')}")自查询检索器的高级用法
# 1. 限制返回数量
query = "最近的5篇机器学习文章"
results = self_query_retriever.get_relevant_documents(query)
# 2. 数值范围查询
query = "字数在1000到5000之间的技术文章"
results = self_query_retriever.get_relevant_documents(query)
# 3. 多条件组合
query = "找出张三或李四在2023年6月之后发布的关于深度学习的文章"
results = self_query_retriever.get_relevant_documents(query)
# 4. 标签查询
query = "带有'Python'或'机器学习'标签的文章"
results = self_query_retriever.get_relevant_documents(query)自定义自查询检索器
from langchain.chains.query_constructor.base import (
StructuredQueryOutputParser,
get_query_constructor_prompt,
)
class CustomSelfQueryRetriever:
"""自定义自查询检索器"""
def __init__(self, llm, vectorstore, metadata_field_info, document_content_description):
self.llm = llm
self.vectorstore = vectorstore
self.metadata_field_info = metadata_field_info
self.document_content_description = document_content_description
# 创建查询构造链
self.query_constructor = self._create_query_constructor()
def _create_query_constructor(self):
"""创建查询构造链"""
prompt = get_query_constructor_prompt(
document_contents=self.document_content_description,
attribute_info=self.metadata_field_info,
)
output_parser = StructuredQueryOutputParser.from_components()
return prompt | self.llm | output_parser
def get_relevant_documents(self, query: str):
"""获取相关文档"""
# 1. 构造结构化查询
structured_query = self.query_constructor.invoke({"query": query})
print(f"📝 语义查询: {structured_query.query}")
print(f"📋 过滤条件: {structured_query.filter}")
# 2. 执行检索
if structured_query.filter:
# 将过滤条件转换为向量数据库格式
filter_dict = self._convert_filter(structured_query.filter)
docs = self.vectorstore.similarity_search(
structured_query.query,
k=structured_query.limit or 5,
filter=filter_dict
)
else:
docs = self.vectorstore.similarity_search(
structured_query.query,
k=structured_query.limit or 5
)
return docs
def _convert_filter(self, filter_condition):
"""转换过滤条件为向量数据库格式"""
# 根据实际使用的向量数据库实现转换逻辑
# 这里以Chroma为例
if hasattr(filter_condition, 'operator'):
# 单个条件
op_map = {
"eq": "$eq",
"ne": "$ne",
"gt": "$gt",
"gte": "$gte",
"lt": "$lt",
"lte": "$lte",
"in": "$in",
}
return {
filter_condition.attribute: {
op_map[filter_condition.operator]: filter_condition.value
}
}
elif hasattr(filter_condition, 'arguments'):
# 复合条件 (AND/OR)
operator = filter_condition.operator.value # "and" or "or"
conditions = [
self._convert_filter(arg)
for arg in filter_condition.arguments
]
return {f"${operator}": conditions}
return None
# 使用示例
custom_retriever = CustomSelfQueryRetriever(
llm=llm,
vectorstore=vectorstore,
metadata_field_info=metadata_field_info,
document_content_description=document_content_description
)
results = custom_retriever.get_relevant_documents(
"2023年发布的高质量Python教程"
)自查询检索器的优势
优点 ✅:
- 自动分离语义和结构化查询
- 无需手动解析查询意图
- 支持复杂的过滤条件
- 易于使用和集成
注意事项 ⚠️:
- 需要清晰的元数据字段定义
- LLM可能误解查询意图
- 依赖LLM性能
- 需要足够的token预算
综合案例:智能文档问答系统
让我们将所有技术结合起来,构建一个完整的智能文档问答系统:
class IntelligentDocumentQA:
"""智能文档问答系统"""
def __init__(self, llm, embeddings):
self.llm = llm
self.embeddings = embeddings
# 初始化路由器
self.semantic_router = SemanticRouter(embeddings)
self.logical_router = LogicalRouter(llm)
# 初始化查询构建器
self.query_constructor = QueryConstructor(llm)
# 数据源
self.vectorstores = {}
self.self_query_retrievers = {}
def add_datasource(
self,
name: str,
description: str,
vectorstore,
metadata_field_info=None
):
"""添加数据源"""
# 添加到路由器
retriever = vectorstore.as_retriever()
self.semantic_router.add_route(name, description, retriever)
self.logical_router.add_route(name, retriever)
# 存储向量存储
self.vectorstores[name] = vectorstore
# 如果提供了元数据信息,创建自查询检索器
if metadata_field_info:
self_query_retriever = SelfQueryRetriever.from_llm(
llm=self.llm,
vectorstore=vectorstore,
document_contents=description,
metadata_field_info=metadata_field_info,
verbose=False
)
self.self_query_retrievers[name] = self_query_retriever
def query(self, question: str, use_self_query: bool = True):
"""执行智能查询"""
print(f"❓ 用户查询: {question}\n")
# Step 1: 路由到合适的数据源
route_name, route_score = self.semantic_router.route(question)
print(f"📍 路由到: {route_name} (分数: {route_score:.3f})\n")
# Step 2: 检查是否应使用自查询
if use_self_query and route_name in self.self_query_retrievers:
print("🔍 使用自查询检索器\n")
retriever = self.self_query_retrievers[route_name]
docs = retriever.get_relevant_documents(question)
else:
print("🔍 使用标准检索器\n")
vectorstore = self.vectorstores[route_name]
docs = vectorstore.similarity_search(question, k=5)
# Step 3: 生成答案
print(f"📚 检索到 {len(docs)} 个文档\n")
context = "\n\n".join([doc.page_content for doc in docs])
answer_prompt = ChatPromptTemplate.from_messages([
("system", "基于以下文档回答问题。如果文档中没有相关信息,请明确说明。"),
("human", "文档:\n{context}\n\n问题: {question}")
])
answer_chain = answer_prompt | self.llm | StrOutputParser()
answer = answer_chain.invoke({
"context": context,
"question": question
})
return {
"question": question,
"route": route_name,
"route_score": route_score,
"num_docs": len(docs),
"documents": docs,
"answer": answer
}
# 使用示例
intelligent_qa = IntelligentDocumentQA(llm, embeddings)
# 添加数据源
intelligent_qa.add_datasource(
name="python_docs",
description="Python编程语言官方文档",
vectorstore=python_vectorstore,
metadata_field_info=[
AttributeInfo(name="version", description="Python版本", type="string"),
AttributeInfo(name="module", description="模块名称", type="string"),
]
)
intelligent_qa.add_datasource(
name="ml_papers",
description="机器学习研究论文",
vectorstore=ml_vectorstore,
metadata_field_info=[
AttributeInfo(name="author", description="作者", type="string"),
AttributeInfo(name="year", description="发表年份", type="integer"),
AttributeInfo(name="conference", description="会议名称", type="string"),
]
)
# 执行查询
result = intelligent_qa.query(
"Python 3.10版本中列表推导式的性能优化"
)
print("\n💡 答案:")
print(result["answer"])性能优化技巧
1. 路由缓存
from functools import lru_cache
import hashlib
class CachedRouter:
"""带缓存的路由器"""
def __init__(self, semantic_router, cache_size=100):
self.semantic_router = semantic_router
self.cache_size = cache_size
self._cache = {}
def _get_cache_key(self, question: str) -> str:
"""生成缓存键"""
return hashlib.md5(question.encode()).hexdigest()
@lru_cache(maxsize=100)
def route(self, question: str):
"""带缓存的路由"""
cache_key = self._get_cache_key(question)
if cache_key in self._cache:
print("💾 使用缓存的路由结果")
return self._cache[cache_key]
# 执行路由
route_name, score = self.semantic_router.route(question)
# 存入缓存
self._cache[cache_key] = (route_name, score)
return route_name, score2. 并行路由
import asyncio
from typing import List, Tuple
class ParallelRouter:
"""并行路由器"""
async def route_multiple(
self,
questions: List[str]
) -> List[Tuple[str, str, float]]:
"""并行路由多个查询
Returns:
List[(question, route_name, score)]
"""
async def route_single(question: str):
# 异步路由(这里简化,实际需要异步嵌入)
route_name, score = self.semantic_router.route(question)
return question, route_name, score
tasks = [route_single(q) for q in questions]
results = await asyncio.gather(*tasks)
return results
# 使用
# results = asyncio.run(router.route_multiple(["问题1", "问题2", "问题3"]))3. 自适应阈值
class AdaptiveRouter:
"""自适应阈值路由器"""
def __init__(self, semantic_router, initial_threshold=0.7):
self.semantic_router = semantic_router
self.threshold = initial_threshold
self.success_rate = []
def route_with_feedback(self, question: str, user_satisfied: bool = None):
"""带反馈的路由"""
route_name, score = self.semantic_router.route(question)
# 如果提供了用户反馈,调整阈值
if user_satisfied is not None:
self.success_rate.append((score, user_satisfied))
self._adjust_threshold()
return route_name, score
def _adjust_threshold(self):
"""调整阈值"""
if len(self.success_rate) < 10:
return
# 计算最近10次的成功率
recent = self.success_rate[-10:]
avg_success_score = sum(s for s, sat in recent if sat) / len([s for s, sat in recent if sat])
avg_fail_score = sum(s for s, sat in recent if not sat) / len([s for s, sat in recent if not sat])
# 调整阈值到两者中间
self.threshold = (avg_success_score + avg_fail_score) / 2
print(f"📊 调整阈值为: {self.threshold:.3f}")总结与最佳实践
🎯 核心要点
- 逻辑路由: 适合规则明确的场景
- 语义路由: 适合需要灵活性的场景
- 查询构建: 结合语义和结构化搜索
- 自查询检索器: 自动化的查询分离工具
📊 技术选择指南
选择路由方式:
├─ 规则明确且固定
│ → 逻辑路由
├─ 需要灵活性
│ → 语义路由
└─ 最佳效果
→ 混合路由
选择查询方式:
├─ 只需语义搜索
│ → 标准检索器
├─ 需要元数据过滤
│ → 查询构建
└─ 复杂查询
→ 自查询检索器💡 最佳实践清单
✅ 路由设计:
- 清晰定义每个数据源的用途
- 使用描述性的路由名称
- 实现回退机制
- 监控路由准确率
✅ 查询构建:
- 详细定义元数据字段
- 提供字段类型和描述
- 列出可选值(如适用)
- 处理边界情况
✅ 性能优化:
- 缓存路由决策
- 使用批处理
- 监控响应时间
- 优化嵌入计算
参考资源
论文
工具
恭喜!你已经掌握了RAG系统中的路由和查询构建技术。接下来,让我们继续学习高级索引和检索技术!🚀