Deeptoai RAG系列教程

RAG路由与查询构建:智能检索的核心技术

深入探讨RAG系统中的路由机制和查询构建技术,包括逻辑路由、语义路由、结构化查询和自查询检索器

RAG路由与查询构建:智能检索的核心技术

在前两章中,我们学习了RAG系统的基础和查询优化技术。但是,当面对多个数据源或需要结构化查询时,如何智能地选择正确的数据源和构建合适的查询呢?本章将介绍路由机制和查询构建技术。

为什么需要路由和查询构建?

实际场景中的挑战

# 场景1: 多个数据源

数据源1: 技术文档数据库
数据源2: 用户手册数据库  
数据源3: FAQ知识库
数据源4: API参考文档

用户查询: "如何使用Python SDK连接数据库?"

# 应该查询哪个数据源?
→ 单一数据源可能不够
→ 查询所有数据源效率低
→ 需要智能路由机制
# 场景2: 复杂查询条件

向量数据库包含:
- 文档内容 (embedding)
- 元数据: 
  - 作者
  - 发布日期
  - 文档类型
  - 标签

用户查询: "找出2023年发布的关于机器学习的文章"

# 需要同时考虑:
→ 语义相似度 (机器学习)
→ 结构化条件 (日期 >= 2023-01-01)
→ 需要查询构建技术

本章内容概览

技术核心功能适用场景复杂度
逻辑路由基于规则的路由确定性路由
语义路由基于LLM的路由灵活路由⭐⭐
结构化查询构建filter条件带元数据查询⭐⭐
自查询检索器自动分离查询意图复杂查询⭐⭐⭐

Part 1: 逻辑路由 - Logical Routing

核心概念

逻辑路由使用基于规则的方法来决定将查询发送到哪个数据源。它通过LLM理解查询内容,然后根据预定义的规则选择合适的数据源。

工作原理

用户查询

LLM分析查询意图

匹配预定义的路由规则

选择目标数据源

执行检索

实现:基础逻辑路由

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from typing import Literal

# 定义路由提示词
route_prompt = ChatPromptTemplate.from_messages([
    ("system", """你是一个路由助手,负责将用户查询发送到正确的数据源。

可用的数据源:
- python_docs: Python编程相关的技术文档
- web_search: 需要实时信息或一般性问题
- database: 数据库相关的查询

请分析用户查询,返回最合适的数据源名称(只返回名称,不要其他内容)。"""),
    ("human", "{question}")
])

llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

# 创建路由链
route_chain = route_prompt | llm | StrOutputParser()

# 测试路由
question1 = "如何在Python中使用列表推导式?"
route1 = route_chain.invoke({"question": question1})
print(f"Query: {question1}")
print(f"Route: {route1}\n")

question2 = "今天的天气怎么样?"
route2 = route_chain.invoke({"question": question2})
print(f"Query: {question2}")
print(f"Route: {route2}\n")

# 输出示例:
# Route: python_docs
# Route: web_search

实现:完整路由系统

from langchain_core.runnables import RunnableLambda
from langchain.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class LogicalRouter:
    """逻辑路由器"""
    
    def __init__(self, llm):
        self.llm = llm
        self.routes = {}
        self.route_chain = self._create_route_chain()
    
    def add_route(self, name: str, retriever):
        """添加路由"""
        self.routes[name] = retriever
        
    def _create_route_chain(self):
        """创建路由链"""
        route_prompt = ChatPromptTemplate.from_messages([
            ("system", """基于以下数据源描述,选择最合适的数据源:

{route_descriptions}

只返回数据源名称。"""),
            ("human", "{question}")
        ])
        
        return route_prompt | self.llm | StrOutputParser()
    
    def _get_route_descriptions(self) -> str:
        """获取路由描述"""
        descriptions = []
        for name, retriever in self.routes.items():
            desc = getattr(retriever, 'description', f'{name}数据源')
            descriptions.append(f"- {name}: {desc}")
        return "\n".join(descriptions)
    
    def route(self, question: str):
        """执行路由"""
        # 获取路由决策
        route_descriptions = self._get_route_descriptions()
        route_name = self.route_chain.invoke({
            "question": question,
            "route_descriptions": route_descriptions
        }).strip()
        
        # 清理路由名称
        route_name = route_name.lower().strip()
        
        # 检查路由是否存在
        if route_name not in self.routes:
            # 尝试模糊匹配
            for name in self.routes.keys():
                if name in route_name or route_name in name:
                    route_name = name
                    break
            else:
                raise ValueError(f"未找到路由: {route_name}")
        
        return route_name, self.routes[route_name]
    
    def query(self, question: str):
        """执行完整查询"""
        # 路由到正确的数据源
        route_name, retriever = self.route(question)
        print(f"📍 路由到: {route_name}")
        
        # 执行检索
        docs = retriever.get_relevant_documents(question)
        
        return {
            "route": route_name,
            "documents": docs,
            "question": question
        }

# 使用示例
embeddings = OpenAIEmbeddings()

# 创建不同的向量存储
python_docs = Chroma(
    collection_name="python_docs",
    embedding_function=embeddings
)
python_docs.description = "Python编程语言的官方文档"

web_docs = Chroma(
    collection_name="web_search",
    embedding_function=embeddings
)
web_docs.description = "实时网络搜索结果和一般性知识"

# 创建路由器
router = LogicalRouter(llm)
router.add_route("python_docs", python_docs.as_retriever())
router.add_route("web_search", web_docs.as_retriever())

# 测试路由
result = router.query("如何在Python中处理异常?")
print(f"找到 {len(result['documents'])} 个文档")

优化:带回退机制的路由

class LogicalRouterWithFallback(LogicalRouter):
    """带回退机制的逻辑路由器"""
    
    def __init__(self, llm, fallback_route: str = "web_search"):
        super().__init__(llm)
        self.fallback_route = fallback_route
    
    def route(self, question: str):
        """执行路由,带回退机制"""
        try:
            return super().route(question)
        except ValueError as e:
            print(f"⚠️ 路由失败: {e}, 使用回退路由: {self.fallback_route}")
            return self.fallback_route, self.routes[self.fallback_route]
    
    def query_multiple(self, question: str, max_routes: int = 2):
        """查询多个数据源"""
        # 获取主路由
        primary_route, primary_retriever = self.route(question)
        
        results = {
            primary_route: primary_retriever.get_relevant_documents(question)
        }
        
        # 如果需要,添加回退路由
        if len(results) < max_routes and primary_route != self.fallback_route:
            fallback_docs = self.routes[self.fallback_route].get_relevant_documents(question)
            results[self.fallback_route] = fallback_docs
        
        return results

# 使用示例
router_with_fallback = LogicalRouterWithFallback(llm)
router_with_fallback.add_route("python_docs", python_docs.as_retriever())
router_with_fallback.add_route("web_search", web_docs.as_retriever())

# 查询多个数据源
results = router_with_fallback.query_multiple(
    "Python中的装饰器是什么?",
    max_routes=2
)

for route_name, docs in results.items():
    print(f"\n{route_name}: {len(docs)} 文档")

逻辑路由的优缺点

优点 ✅:

  • 可预测和可控
  • 易于理解和调试
  • 适合确定性场景
  • 快速且高效

缺点 ❌:

  • 灵活性有限
  • 需要预定义规则
  • 难以处理边界情况
  • 可能需要频繁更新规则

Part 2: 语义路由 - Semantic Routing

核心概念

语义路由使用嵌入向量来路由查询。它为每个路由创建描述性文本的嵌入,然后将查询嵌入与路由嵌入进行相似度比较。

工作原理

用户查询

计算查询嵌入

计算与各路由描述的相似度

选择最相似的路由

执行检索

实现:语义路由器

from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import Dict, List, Tuple

class SemanticRouter:
    """语义路由器"""
    
    def __init__(self, embeddings):
        self.embeddings = embeddings
        self.routes = {}
        self.route_embeddings = {}
    
    def add_route(self, name: str, description: str, retriever):
        """添加路由
        
        Args:
            name: 路由名称
            description: 路由描述(将被嵌入)
            retriever: 检索器
        """
        self.routes[name] = {
            'description': description,
            'retriever': retriever
        }
        
        # 计算描述的嵌入
        self.route_embeddings[name] = self.embeddings.embed_query(description)
    
    def _cosine_similarity(self, vec1: List[float], vec2: List[float]) -> float:
        """计算余弦相似度"""
        vec1 = np.array(vec1)
        vec2 = np.array(vec2)
        
        dot_product = np.dot(vec1, vec2)
        norm1 = np.linalg.norm(vec1)
        norm2 = np.linalg.norm(vec2)
        
        return dot_product / (norm1 * norm2)
    
    def route(self, question: str, threshold: float = 0.7) -> Tuple[str, float]:
        """执行语义路由
        
        Args:
            question: 用户查询
            threshold: 相似度阈值
            
        Returns:
            (路由名称, 相似度分数)
        """
        # 计算查询嵌入
        query_embedding = self.embeddings.embed_query(question)
        
        # 计算与所有路由的相似度
        similarities = {}
        for name, route_embedding in self.route_embeddings.items():
            similarity = self._cosine_similarity(query_embedding, route_embedding)
            similarities[name] = similarity
        
        # 找到最相似的路由
        best_route = max(similarities, key=similarities.get)
        best_score = similarities[best_route]
        
        # 检查阈值
        if best_score < threshold:
            print(f"⚠️ 最佳路由分数 {best_score:.3f} 低于阈值 {threshold}")
        
        return best_route, best_score
    
    def route_with_scores(self, question: str) -> Dict[str, float]:
        """返回所有路由及其分数"""
        query_embedding = self.embeddings.embed_query(question)
        
        similarities = {}
        for name, route_embedding in self.route_embeddings.items():
            similarity = self._cosine_similarity(query_embedding, route_embedding)
            similarities[name] = similarity
        
        # 按分数排序
        return dict(sorted(similarities.items(), key=lambda x: x[1], reverse=True))
    
    def query(self, question: str):
        """执行完整查询"""
        # 路由
        route_name, score = self.route(question)
        print(f"📍 路由到: {route_name} (相似度: {score:.3f})")
        
        # 检索
        retriever = self.routes[route_name]['retriever']
        docs = retriever.get_relevant_documents(question)
        
        return {
            "route": route_name,
            "score": score,
            "documents": docs,
            "question": question
        }

# 使用示例
embeddings = OpenAIEmbeddings()

# 创建语义路由器
semantic_router = SemanticRouter(embeddings)

# 添加路由
semantic_router.add_route(
    name="python_programming",
    description="Python编程语言,包括语法、数据结构、函数、类、模块等",
    retriever=python_docs.as_retriever()
)

semantic_router.add_route(
    name="machine_learning",
    description="机器学习、深度学习、神经网络、模型训练、数据科学",
    retriever=ml_docs.as_retriever()
)

semantic_router.add_route(
    name="web_development",
    description="Web开发、前端、后端、API、数据库、服务器",
    retriever=web_docs.as_retriever()
)

# 测试路由
result = semantic_router.query("如何训练一个神经网络?")

# 查看所有路由得分
scores = semantic_router.route_with_scores("如何训练一个神经网络?")
print("\n所有路由得分:")
for route, score in scores.items():
    print(f"  {route}: {score:.3f}")

实现:混合路由器

class HybridRouter:
    """混合路由器:结合逻辑和语义路由"""
    
    def __init__(self, embeddings, llm):
        self.semantic_router = SemanticRouter(embeddings)
        self.logical_router = LogicalRouter(llm)
        self.routes = {}
    
    def add_route(self, name: str, description: str, retriever):
        """添加路由"""
        self.semantic_router.add_route(name, description, retriever)
        self.logical_router.add_route(name, retriever)
        self.routes[name] = retriever
    
    def route(self, question: str, use_semantic: bool = True):
        """执行路由
        
        Args:
            question: 用户查询
            use_semantic: 是否优先使用语义路由
        """
        if use_semantic:
            # 尝试语义路由
            route_name, score = self.semantic_router.route(question, threshold=0.75)
            
            if score >= 0.75:
                print(f"✅ 使用语义路由: {route_name} (分数: {score:.3f})")
                return route_name, self.routes[route_name]
            else:
                print(f"⚠️ 语义路由分数过低,切换到逻辑路由")
        
        # 使用逻辑路由作为后备
        route_name, retriever = self.logical_router.route(question)
        print(f"✅ 使用逻辑路由: {route_name}")
        return route_name, retriever
    
    def query(self, question: str, use_semantic: bool = True):
        """执行查询"""
        route_name, retriever = self.route(question, use_semantic)
        docs = retriever.get_relevant_documents(question)
        
        return {
            "route": route_name,
            "documents": docs,
            "question": question
        }

# 使用示例
hybrid_router = HybridRouter(embeddings, llm)

# 添加路由
hybrid_router.add_route(
    "python_programming",
    "Python编程相关内容",
    python_docs.as_retriever()
)

# 测试
result = hybrid_router.query("Python中的装饰器怎么用?")

语义路由 vs 逻辑路由

特性逻辑路由语义路由
决策依据规则/LLM分类嵌入相似度
灵活性中等
准确性高(规则明确时)中高
速度很快⚡
成本需LLM调用仅需嵌入
可解释性

Part 3: 查询构建 - Query Construction

核心概念

查询构建是将自然语言查询转换为结构化查询的过程。它允许我们结合语义搜索和结构化过滤。

为什么需要查询构建?

# 场景: 带元数据的文档检索

文档元数据示例:
{
    "content": "深度学习入门教程",
    "metadata": {
        "author": "张三",
        "date": "2023-06-15",
        "category": "机器学习",
        "tags": ["深度学习", "神经网络"],
        "views": 1500
    }
}

用户查询:
"找出张三在2023年写的关于深度学习的文章"

需要:
1. 语义搜索: "深度学习"
2. 结构化过滤:
   - author == "张三"
   - date >= "2023-01-01" AND date <= "2023-12-31"

实现:基础查询构建器

from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_openai import ChatOpenAI

# 定义文档元数据
metadata_field_info = [
    AttributeInfo(
        name="author",
        description="文档作者",
        type="string"
    ),
    AttributeInfo(
        name="date",
        description="发布日期,格式: YYYY-MM-DD",
        type="string"
    ),
    AttributeInfo(
        name="category",
        description="文档类别",
        type="string"
    ),
    AttributeInfo(
        name="views",
        description="浏览次数",
        type="integer"
    ),
]

# 文档内容描述
document_content_description = "技术文章和教程"

# 创建自查询检索器
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

self_query_retriever = SelfQueryRetriever.from_llm(
    llm=llm,
    vectorstore=vectorstore,
    document_contents=document_content_description,
    metadata_field_info=metadata_field_info,
    verbose=True
)

# 使用示例
query = "找出2023年关于Python的文章"
results = self_query_retriever.get_relevant_documents(query)

for doc in results:
    print(f"标题: {doc.page_content}")
    print(f"元数据: {doc.metadata}\n")

实现:手动查询构建

from typing import Dict, Any, Optional
from langchain_core.prompts import ChatPromptTemplate

class QueryConstructor:
    """查询构建器"""
    
    def __init__(self, llm):
        self.llm = llm
        self.prompt = self._create_prompt()
    
    def _create_prompt(self):
        """创建查询构建提示词"""
        return ChatPromptTemplate.from_messages([
            ("system", """你是一个查询构建助手。将用户的自然语言查询转换为结构化查询。

可用的元数据字段:
- author (string): 作者名称
- date (string): 发布日期,格式YYYY-MM-DD
- category (string): 文档类别
- views (integer): 浏览次数
- tags (list): 标签列表

请返回JSON格式:
{{
    "semantic_query": "语义搜索内容",
    "filters": {{
        "field_name": {{
            "operator": "==|!=|>|<|>=|<=|in",
            "value": "value"
        }}
    }}
}}

如果没有过滤条件,filters可以为空对象。"""),
            ("human", "{question}")
        ])
    
    def construct(self, question: str) -> Dict[str, Any]:
        """构建查询
        
        Returns:
            {
                "semantic_query": str,
                "filters": dict
            }
        """
        chain = self.prompt | self.llm
        response = chain.invoke({"question": question})
        
        # 解析响应
        import json
        try:
            query_dict = json.loads(response.content)
            return query_dict
        except json.JSONDecodeError:
            # 如果解析失败,返回原始查询
            return {
                "semantic_query": question,
                "filters": {}
            }
    
    def construct_filter(self, filters: Dict[str, Any]) -> str:
        """将过滤器转换为向量数据库查询格式
        
        不同的向量数据库有不同的过滤语法:
        - Chroma: {"field": {"$op": value}}
        - Pinecone: {"field": {"$op": value}}
        - Weaviate: where filter
        """
        # 这里以Chroma为例
        if not filters:
            return None
        
        chroma_filter = {}
        for field, condition in filters.items():
            operator = condition.get("operator", "==")
            value = condition["value"]
            
            # 映射操作符
            op_map = {
                "==": "$eq",
                "!=": "$ne",
                ">": "$gt",
                "<": "$lt",
                ">=": "$gte",
                "<=": "$lte",
                "in": "$in"
            }
            
            chroma_op = op_map.get(operator, "$eq")
            chroma_filter[field] = {chroma_op: value}
        
        return chroma_filter

# 使用示例
constructor = QueryConstructor(llm)

# 构建查询
question = "找出张三在2023年写的浏览量超过1000的文章"
query = constructor.construct(question)

print("语义查询:", query["semantic_query"])
print("过滤条件:", query["filters"])

# 转换为向量数据库格式
chroma_filter = constructor.construct_filter(query["filters"])
print("Chroma过滤器:", chroma_filter)

实现:高级查询构建

class AdvancedQueryConstructor(QueryConstructor):
    """高级查询构建器"""
    
    def construct_complex_filter(self, question: str) -> Dict[str, Any]:
        """构建复杂过滤器(支持AND/OR逻辑)"""
        query = self.construct(question)
        filters = query.get("filters", {})
        
        if not filters:
            return None
        
        # 支持多个条件的AND逻辑
        and_conditions = []
        for field, condition in filters.items():
            operator = condition.get("operator", "==")
            value = condition["value"]
            
            # Chroma格式
            if operator == "==":
                and_conditions.append({field: {"$eq": value}})
            elif operator == ">":
                and_conditions.append({field: {"$gt": value}})
            elif operator == ">=":
                and_conditions.append({field: {"$gte": value}})
            elif operator == "<":
                and_conditions.append({field: {"$lt": value}})
            elif operator == "<=":
                and_conditions.append({field: {"$lte": value}})
        
        # 组合多个条件
        if len(and_conditions) > 1:
            return {"$and": and_conditions}
        elif len(and_conditions) == 1:
            return and_conditions[0]
        else:
            return None
    
    def query_with_filter(self, question: str, vectorstore):
        """执行带过滤的查询"""
        # 构建查询
        query = self.construct(question)
        semantic_query = query["semantic_query"]
        
        # 构建过滤器
        filter_dict = self.construct_complex_filter(question)
        
        print(f"🔍 语义查询: {semantic_query}")
        print(f"📋 过滤条件: {filter_dict}")
        
        # 执行查询
        if filter_dict:
            docs = vectorstore.similarity_search(
                semantic_query,
                k=5,
                filter=filter_dict
            )
        else:
            docs = vectorstore.similarity_search(semantic_query, k=5)
        
        return docs

# 使用示例
advanced_constructor = AdvancedQueryConstructor(llm)

# 复杂查询
results = advanced_constructor.query_with_filter(
    "找出2023年机器学习类别的热门文章",
    vectorstore
)

for doc in results:
    print(f"\n内容: {doc.page_content[:100]}...")
    print(f"元数据: {doc.metadata}")

查询构建的最佳实践

# 1. 清晰的元数据定义
metadata_field_info = [
    AttributeInfo(
        name="price",
        description="产品价格,单位:元",  # 明确单位
        type="float"  # 明确类型
    ),
    AttributeInfo(
        name="color",
        description="产品颜色,可选值: 红色、蓝色、绿色",  # 列出可选值
        type="string"
    ),
]

# 2. 处理日期范围
def construct_date_filter(start_date: str, end_date: str):
    """构建日期范围过滤器"""
    return {
        "$and": [
            {"date": {"$gte": start_date}},
            {"date": {"$lte": end_date}}
        ]
    }

# 3. 处理多值字段
def construct_tags_filter(tags: List[str]):
    """构建标签过滤器"""
    # 包含任一标签
    return {"tags": {"$in": tags}}

# 4. 组合多个过滤器
def combine_filters(filters: List[Dict]) -> Dict:
    """组合多个过滤器"""
    if not filters:
        return None
    if len(filters) == 1:
        return filters[0]
    return {"$and": filters}

Part 4: 自查询检索器 - Self-Query Retriever

核心概念

自查询检索器是LangChain提供的高级工具,它能够自动将自然语言查询分离为:

  1. 语义搜索部分
  2. 结构化过滤部分

工作原理

用户自然语言查询

LLM分析查询

分离为两部分:
├─ 语义查询内容
└─ 元数据过滤条件

执行混合检索

返回结果

实现:自查询检索器

from langchain.chains.query_constructor.base import AttributeInfo
from langchain.retrievers.self_query.base import SelfQueryRetriever
from langchain_openai import ChatOpenAI
from langchain.vectorstores import Chroma

# 1. 定义元数据字段
metadata_field_info = [
    AttributeInfo(
        name="author",
        description="文章作者",
        type="string"
    ),
    AttributeInfo(
        name="published_date",
        description="发布日期,格式: YYYY-MM-DD",
        type="string"
    ),
    AttributeInfo(
        name="word_count",
        description="文章字数",
        type="integer"
    ),
    AttributeInfo(
        name="category",
        description="文章类别,如: 技术、生活、旅游等",
        type="string"
    ),
    AttributeInfo(
        name="tags",
        description="文章标签列表",
        type="list[string]"
    ),
]

# 2. 文档内容描述
document_content_description = "博客文章集合"

# 3. 创建LLM
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)

# 4. 创建自查询检索器
self_query_retriever = SelfQueryRetriever.from_llm(
    llm=llm,
    vectorstore=vectorstore,
    document_contents=document_content_description,
    metadata_field_info=metadata_field_info,
    verbose=True,  # 显示内部过程
    enable_limit=True  # 启用结果数量限制
)

# 5. 使用自查询检索器
query = "找出李四在2023年写的关于Python的文章"
results = self_query_retriever.get_relevant_documents(query)

print(f"找到 {len(results)} 个结果:")
for doc in results:
    print(f"\n标题: {doc.page_content[:50]}...")
    print(f"作者: {doc.metadata.get('author')}")
    print(f"日期: {doc.metadata.get('published_date')}")

自查询检索器的高级用法

# 1. 限制返回数量
query = "最近的5篇机器学习文章"
results = self_query_retriever.get_relevant_documents(query)

# 2. 数值范围查询
query = "字数在1000到5000之间的技术文章"
results = self_query_retriever.get_relevant_documents(query)

# 3. 多条件组合
query = "找出张三或李四在2023年6月之后发布的关于深度学习的文章"
results = self_query_retriever.get_relevant_documents(query)

# 4. 标签查询
query = "带有'Python'或'机器学习'标签的文章"
results = self_query_retriever.get_relevant_documents(query)

自定义自查询检索器

from langchain.chains.query_constructor.base import (
    StructuredQueryOutputParser,
    get_query_constructor_prompt,
)

class CustomSelfQueryRetriever:
    """自定义自查询检索器"""
    
    def __init__(self, llm, vectorstore, metadata_field_info, document_content_description):
        self.llm = llm
        self.vectorstore = vectorstore
        self.metadata_field_info = metadata_field_info
        self.document_content_description = document_content_description
        
        # 创建查询构造链
        self.query_constructor = self._create_query_constructor()
    
    def _create_query_constructor(self):
        """创建查询构造链"""
        prompt = get_query_constructor_prompt(
            document_contents=self.document_content_description,
            attribute_info=self.metadata_field_info,
        )
        
        output_parser = StructuredQueryOutputParser.from_components()
        
        return prompt | self.llm | output_parser
    
    def get_relevant_documents(self, query: str):
        """获取相关文档"""
        # 1. 构造结构化查询
        structured_query = self.query_constructor.invoke({"query": query})
        
        print(f"📝 语义查询: {structured_query.query}")
        print(f"📋 过滤条件: {structured_query.filter}")
        
        # 2. 执行检索
        if structured_query.filter:
            # 将过滤条件转换为向量数据库格式
            filter_dict = self._convert_filter(structured_query.filter)
            docs = self.vectorstore.similarity_search(
                structured_query.query,
                k=structured_query.limit or 5,
                filter=filter_dict
            )
        else:
            docs = self.vectorstore.similarity_search(
                structured_query.query,
                k=structured_query.limit or 5
            )
        
        return docs
    
    def _convert_filter(self, filter_condition):
        """转换过滤条件为向量数据库格式"""
        # 根据实际使用的向量数据库实现转换逻辑
        # 这里以Chroma为例
        if hasattr(filter_condition, 'operator'):
            # 单个条件
            op_map = {
                "eq": "$eq",
                "ne": "$ne",
                "gt": "$gt",
                "gte": "$gte",
                "lt": "$lt",
                "lte": "$lte",
                "in": "$in",
            }
            
            return {
                filter_condition.attribute: {
                    op_map[filter_condition.operator]: filter_condition.value
                }
            }
        elif hasattr(filter_condition, 'arguments'):
            # 复合条件 (AND/OR)
            operator = filter_condition.operator.value  # "and" or "or"
            conditions = [
                self._convert_filter(arg) 
                for arg in filter_condition.arguments
            ]
            
            return {f"${operator}": conditions}
        
        return None

# 使用示例
custom_retriever = CustomSelfQueryRetriever(
    llm=llm,
    vectorstore=vectorstore,
    metadata_field_info=metadata_field_info,
    document_content_description=document_content_description
)

results = custom_retriever.get_relevant_documents(
    "2023年发布的高质量Python教程"
)

自查询检索器的优势

优点 ✅:

  • 自动分离语义和结构化查询
  • 无需手动解析查询意图
  • 支持复杂的过滤条件
  • 易于使用和集成

注意事项 ⚠️:

  • 需要清晰的元数据字段定义
  • LLM可能误解查询意图
  • 依赖LLM性能
  • 需要足够的token预算

综合案例:智能文档问答系统

让我们将所有技术结合起来,构建一个完整的智能文档问答系统:

class IntelligentDocumentQA:
    """智能文档问答系统"""
    
    def __init__(self, llm, embeddings):
        self.llm = llm
        self.embeddings = embeddings
        
        # 初始化路由器
        self.semantic_router = SemanticRouter(embeddings)
        self.logical_router = LogicalRouter(llm)
        
        # 初始化查询构建器
        self.query_constructor = QueryConstructor(llm)
        
        # 数据源
        self.vectorstores = {}
        self.self_query_retrievers = {}
    
    def add_datasource(
        self,
        name: str,
        description: str,
        vectorstore,
        metadata_field_info=None
    ):
        """添加数据源"""
        # 添加到路由器
        retriever = vectorstore.as_retriever()
        self.semantic_router.add_route(name, description, retriever)
        self.logical_router.add_route(name, retriever)
        
        # 存储向量存储
        self.vectorstores[name] = vectorstore
        
        # 如果提供了元数据信息,创建自查询检索器
        if metadata_field_info:
            self_query_retriever = SelfQueryRetriever.from_llm(
                llm=self.llm,
                vectorstore=vectorstore,
                document_contents=description,
                metadata_field_info=metadata_field_info,
                verbose=False
            )
            self.self_query_retrievers[name] = self_query_retriever
    
    def query(self, question: str, use_self_query: bool = True):
        """执行智能查询"""
        print(f"❓ 用户查询: {question}\n")
        
        # Step 1: 路由到合适的数据源
        route_name, route_score = self.semantic_router.route(question)
        print(f"📍 路由到: {route_name} (分数: {route_score:.3f})\n")
        
        # Step 2: 检查是否应使用自查询
        if use_self_query and route_name in self.self_query_retrievers:
            print("🔍 使用自查询检索器\n")
            retriever = self.self_query_retrievers[route_name]
            docs = retriever.get_relevant_documents(question)
        else:
            print("🔍 使用标准检索器\n")
            vectorstore = self.vectorstores[route_name]
            docs = vectorstore.similarity_search(question, k=5)
        
        # Step 3: 生成答案
        print(f"📚 检索到 {len(docs)} 个文档\n")
        context = "\n\n".join([doc.page_content for doc in docs])
        
        answer_prompt = ChatPromptTemplate.from_messages([
            ("system", "基于以下文档回答问题。如果文档中没有相关信息,请明确说明。"),
            ("human", "文档:\n{context}\n\n问题: {question}")
        ])
        
        answer_chain = answer_prompt | self.llm | StrOutputParser()
        answer = answer_chain.invoke({
            "context": context,
            "question": question
        })
        
        return {
            "question": question,
            "route": route_name,
            "route_score": route_score,
            "num_docs": len(docs),
            "documents": docs,
            "answer": answer
        }

# 使用示例
intelligent_qa = IntelligentDocumentQA(llm, embeddings)

# 添加数据源
intelligent_qa.add_datasource(
    name="python_docs",
    description="Python编程语言官方文档",
    vectorstore=python_vectorstore,
    metadata_field_info=[
        AttributeInfo(name="version", description="Python版本", type="string"),
        AttributeInfo(name="module", description="模块名称", type="string"),
    ]
)

intelligent_qa.add_datasource(
    name="ml_papers",
    description="机器学习研究论文",
    vectorstore=ml_vectorstore,
    metadata_field_info=[
        AttributeInfo(name="author", description="作者", type="string"),
        AttributeInfo(name="year", description="发表年份", type="integer"),
        AttributeInfo(name="conference", description="会议名称", type="string"),
    ]
)

# 执行查询
result = intelligent_qa.query(
    "Python 3.10版本中列表推导式的性能优化"
)

print("\n💡 答案:")
print(result["answer"])

性能优化技巧

1. 路由缓存

from functools import lru_cache
import hashlib

class CachedRouter:
    """带缓存的路由器"""
    
    def __init__(self, semantic_router, cache_size=100):
        self.semantic_router = semantic_router
        self.cache_size = cache_size
        self._cache = {}
    
    def _get_cache_key(self, question: str) -> str:
        """生成缓存键"""
        return hashlib.md5(question.encode()).hexdigest()
    
    @lru_cache(maxsize=100)
    def route(self, question: str):
        """带缓存的路由"""
        cache_key = self._get_cache_key(question)
        
        if cache_key in self._cache:
            print("💾 使用缓存的路由结果")
            return self._cache[cache_key]
        
        # 执行路由
        route_name, score = self.semantic_router.route(question)
        
        # 存入缓存
        self._cache[cache_key] = (route_name, score)
        
        return route_name, score

2. 并行路由

import asyncio
from typing import List, Tuple

class ParallelRouter:
    """并行路由器"""
    
    async def route_multiple(
        self,
        questions: List[str]
    ) -> List[Tuple[str, str, float]]:
        """并行路由多个查询
        
        Returns:
            List[(question, route_name, score)]
        """
        async def route_single(question: str):
            # 异步路由(这里简化,实际需要异步嵌入)
            route_name, score = self.semantic_router.route(question)
            return question, route_name, score
        
        tasks = [route_single(q) for q in questions]
        results = await asyncio.gather(*tasks)
        
        return results

# 使用
# results = asyncio.run(router.route_multiple(["问题1", "问题2", "问题3"]))

3. 自适应阈值

class AdaptiveRouter:
    """自适应阈值路由器"""
    
    def __init__(self, semantic_router, initial_threshold=0.7):
        self.semantic_router = semantic_router
        self.threshold = initial_threshold
        self.success_rate = []
    
    def route_with_feedback(self, question: str, user_satisfied: bool = None):
        """带反馈的路由"""
        route_name, score = self.semantic_router.route(question)
        
        # 如果提供了用户反馈,调整阈值
        if user_satisfied is not None:
            self.success_rate.append((score, user_satisfied))
            self._adjust_threshold()
        
        return route_name, score
    
    def _adjust_threshold(self):
        """调整阈值"""
        if len(self.success_rate) < 10:
            return
        
        # 计算最近10次的成功率
        recent = self.success_rate[-10:]
        avg_success_score = sum(s for s, sat in recent if sat) / len([s for s, sat in recent if sat])
        avg_fail_score = sum(s for s, sat in recent if not sat) / len([s for s, sat in recent if not sat])
        
        # 调整阈值到两者中间
        self.threshold = (avg_success_score + avg_fail_score) / 2
        print(f"📊 调整阈值为: {self.threshold:.3f}")

总结与最佳实践

🎯 核心要点

  1. 逻辑路由: 适合规则明确的场景
  2. 语义路由: 适合需要灵活性的场景
  3. 查询构建: 结合语义和结构化搜索
  4. 自查询检索器: 自动化的查询分离工具

📊 技术选择指南

选择路由方式:
├─ 规则明确且固定
│  → 逻辑路由
├─ 需要灵活性
│  → 语义路由
└─ 最佳效果
   → 混合路由

选择查询方式:
├─ 只需语义搜索
│  → 标准检索器
├─ 需要元数据过滤
│  → 查询构建
└─ 复杂查询
   → 自查询检索器

💡 最佳实践清单

路由设计:

  • 清晰定义每个数据源的用途
  • 使用描述性的路由名称
  • 实现回退机制
  • 监控路由准确率

查询构建:

  • 详细定义元数据字段
  • 提供字段类型和描述
  • 列出可选值(如适用)
  • 处理边界情况

性能优化:

  • 缓存路由决策
  • 使用批处理
  • 监控响应时间
  • 优化嵌入计算

参考资源

论文

工具

恭喜!你已经掌握了RAG系统中的路由和查询构建技术。接下来,让我们继续学习高级索引和检索技术!🚀