Deeptoai RAG系列教程

RAG高级索引与检索策略:提升检索质量的关键

深入探讨RAG系统中的高级索引技术和检索策略,包括文档分块、多向量索引、父文档检索和上下文压缩

RAG高级索引与检索策略:提升检索质量的关键

在前面的章节中,我们学习了查询优化和路由技术。但是,检索质量不仅取决于查询,还取决于如何组织和索引文档。本章将深入探讨高级索引和检索策略。

为什么需要高级索引?

基础索引的局限性

# 基础索引方法的问题

问题1: 文档过长
→ 一个10000字的文档被整体嵌入
→ 语义信息过于粗糙
→ 检索不精确

问题2: 上下文丢失
→ 将文档分成小块
→ 每块独立检索
→ 丢失了块与块之间的关系
→ 无法理解完整上下文

问题3: 多语义内容
→ 一个文档包含多个主题
→ 单一向量无法表示所有语义
→ 相关内容可能被遗漏

问题4: 检索冗余
→ 检索到大量文档
→ 很多内容重复或不相关
→ 影响最终答案质量

本章技术概览

技术核心目标适用场景复杂度
智能分块优化块大小和边界所有RAG系统
多向量索引一个文档多个向量多主题文档⭐⭐⭐
父文档检索检索小块返回大块保持上下文⭐⭐
上下文压缩压缩检索结果减少冗余⭐⭐⭐
时间衰减检索考虑文档新鲜度时效性内容⭐⭐

Part 1: 智能文档分块策略

核心概念

文档分块(Chunking)是将长文档切分成更小片段的过程。好的分块策略能显著提升检索质量。

分块方法对比

# 方法1: 固定长度分块
def fixed_length_split(text: str, chunk_size: int = 500) -> List[str]:
    """简单但可能切断语义"""
    return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]

# 方法2: 句子级分块
def sentence_split(text: str, max_sentences: int = 5) -> List[str]:
    """保持语义完整但长度不均"""
    sentences = text.split('。')
    chunks = []
    current_chunk = []
    
    for sent in sentences:
        current_chunk.append(sent)
        if len(current_chunk) >= max_sentences:
            chunks.append('。'.join(current_chunk))
            current_chunk = []
    
    if current_chunk:
        chunks.append('。'.join(current_chunk))
    
    return chunks

# 方法3: 语义分块 ✅ 推荐
def semantic_split(text: str, embeddings) -> List[str]:
    """基于语义相似度分块"""
    # 将在下面实现
    pass

实现:RecursiveCharacterTextSplitter

from langchain.text_splitter import RecursiveCharacterTextSplitter

# 创建文本分割器
text_splitter = RecursiveCharacterTextSplitter(
    chunk_size=1000,        # 每块的目标大小
    chunk_overlap=200,      # 块之间的重叠
    length_function=len,    # 长度计算函数
    separators=[            # 分隔符优先级
        "\n\n",             # 段落
        "\n",               # 行
        " ",                # 单词
        ""                  # 字符
    ]
)

# 示例文档
document = """
# Python编程基础

## 变量和数据类型

Python是一种动态类型语言,这意味着你不需要声明变量的类型。
Python支持多种数据类型,包括整数、浮点数、字符串等。

## 控制流

Python使用缩进来定义代码块。
if语句用于条件判断,for循环用于迭代。

## 函数

函数是可重用的代码块。
使用def关键字定义函数。
"""

# 分块
chunks = text_splitter.split_text(document)

print(f"文档被分成 {len(chunks)} 块")
for i, chunk in enumerate(chunks):
    print(f"\n{i+1}:")
    print(chunk[:100] + "...")

实现:语义分块

from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import List, Tuple

class SemanticChunker:
    """基于语义相似度的分块器"""
    
    def __init__(self, embeddings, similarity_threshold: float = 0.75):
        self.embeddings = embeddings
        self.similarity_threshold = similarity_threshold
    
    def cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
        """计算余弦相似度"""
        return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
    
    def split_text(self, text: str) -> List[str]:
        """基于语义相似度分块"""
        # 1. 按句子分割
        sentences = [s.strip() + '。' for s in text.split('。') if s.strip()]
        
        if len(sentences) <= 1:
            return [text]
        
        # 2. 计算每个句子的嵌入
        embeddings = [
            np.array(self.embeddings.embed_query(sent))
            for sent in sentences
        ]
        
        # 3. 计算相邻句子的相似度
        similarities = []
        for i in range(len(embeddings) - 1):
            sim = self.cosine_similarity(embeddings[i], embeddings[i+1])
            similarities.append(sim)
        
        # 4. 在相似度低的地方切分
        chunks = []
        current_chunk = [sentences[0]]
        
        for i, sim in enumerate(similarities):
            if sim < self.similarity_threshold:
                # 相似度低,开始新块
                chunks.append(''.join(current_chunk))
                current_chunk = [sentences[i+1]]
            else:
                # 相似度高,添加到当前块
                current_chunk.append(sentences[i+1])
        
        # 添加最后一块
        if current_chunk:
            chunks.append(''.join(current_chunk))
        
        return chunks

# 使用示例
embeddings = OpenAIEmbeddings()
semantic_chunker = SemanticChunker(embeddings, similarity_threshold=0.7)

text = """
机器学习是人工智能的一个分支。它使计算机能够从数据中学习。
深度学习是机器学习的一个子领域。它使用神经网络来建模复杂模式。

Python是一种流行的编程语言。它广泛用于数据科学和机器学习。
Python有丰富的库生态系统。NumPy和Pandas是常用的数据处理库。
"""

chunks = semantic_chunker.split_text(text)

print(f"语义分块结果: {len(chunks)} 块")
for i, chunk in enumerate(chunks):
    print(f"\n{i+1}:\n{chunk}")

分块最佳实践

class SmartChunker:
    """智能分块器 - 综合多种策略"""
    
    def __init__(
        self,
        chunk_size: int = 1000,
        chunk_overlap: int = 200,
        min_chunk_size: int = 100,
        max_chunk_size: int = 2000
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
    
    def split_by_structure(self, text: str) -> List[str]:
        """按文档结构分块(标题、段落等)"""
        chunks = []
        
        # 按标题分割
        sections = text.split('\n# ')
        
        for section in sections:
            if not section.strip():
                continue
            
            # 如果章节太长,进一步分割
            if len(section) > self.max_chunk_size:
                sub_chunks = self._split_long_section(section)
                chunks.extend(sub_chunks)
            elif len(section) >= self.min_chunk_size:
                chunks.append(section)
        
        return chunks
    
    def _split_long_section(self, text: str) -> List[str]:
        """分割过长的章节"""
        # 按段落分割
        paragraphs = text.split('\n\n')
        chunks = []
        current_chunk = []
        current_length = 0
        
        for para in paragraphs:
            para_length = len(para)
            
            if current_length + para_length > self.chunk_size and current_chunk:
                # 当前块已满,保存并开始新块
                chunks.append('\n\n'.join(current_chunk))
                
                # 重叠:包含上一块的最后一个段落
                if len(current_chunk) > 1:
                    current_chunk = [current_chunk[-1], para]
                    current_length = len(current_chunk[-1]) + para_length
                else:
                    current_chunk = [para]
                    current_length = para_length
            else:
                current_chunk.append(para)
                current_length += para_length
        
        if current_chunk:
            chunks.append('\n\n'.join(current_chunk))
        
        return chunks
    
    def add_metadata_to_chunks(
        self,
        chunks: List[str],
        doc_metadata: dict
    ) -> List[dict]:
        """为每个块添加元数据"""
        chunk_docs = []
        
        for i, chunk in enumerate(chunks):
            chunk_docs.append({
                'content': chunk,
                'metadata': {
                    **doc_metadata,
                    'chunk_id': i,
                    'chunk_total': len(chunks),
                    'chunk_size': len(chunk)
                }
            })
        
        return chunk_docs

# 使用示例
smart_chunker = SmartChunker(
    chunk_size=800,
    chunk_overlap=150,
    min_chunk_size=200
)

# 按结构分块
chunks = smart_chunker.split_by_structure(document)

# 添加元数据
chunk_docs = smart_chunker.add_metadata_to_chunks(
    chunks,
    doc_metadata={'source': 'python_tutorial.md', 'author': '张三'}
)

for doc in chunk_docs:
    print(f"\n{doc['metadata']['chunk_id'] + 1}/{doc['metadata']['chunk_total']}")
    print(f"内容: {doc['content'][:100]}...")

Part 2: 多向量索引 - Multi-Vector Indexing

核心概念

多向量索引为单个文档生成多个向量,每个向量代表文档的不同方面或部分。这样可以更全面地表示文档的语义。

为什么需要多向量?

# 场景:一篇包含多个主题的文档

文档内容:
"""
本文介绍Python编程基础。

第一部分:变量和数据类型
Python支持多种数据类型...

第二部分:函数和模块
函数是可重用的代码块...

第三部分:面向对象编程
类是对象的蓝图...
"""

# 问题:
用户查询: "Python中的类是什么?"

# 单向量索引:
→ 整个文档被表示为一个向量
→ 向量混合了所有主题的语义
→ 可能无法精确匹配"类"的相关内容

# 多向量索引:✅
→ 为每个部分生成独立向量
"面向对象编程"部分的向量更匹配查询
→ 检索更精确

实现:多向量检索器

from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryStore
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.schema import Document
import uuid

class MultiVectorIndexer:
    """多向量索引器"""
    
    def __init__(self, embeddings):
        self.embeddings = embeddings
        
        # 向量存储:存储小块的向量
        self.vectorstore = Chroma(
            collection_name="multi_vector",
            embedding_function=embeddings
        )
        
        # 文档存储:存储完整文档
        self.docstore = InMemoryStore()
        
        # 多向量检索器
        self.retriever = MultiVectorRetriever(
            vectorstore=self.vectorstore,
            docstore=self.docstore,
            id_key="doc_id"  # 用于关联向量和文档的键
        )
    
    def index_documents(self, documents: List[str], doc_ids: List[str] = None):
        """索引文档(为每个文档创建多个向量)"""
        if doc_ids is None:
            doc_ids = [str(uuid.uuid4()) for _ in documents]
        
        # 1. 存储完整文档
        for doc_id, doc in zip(doc_ids, documents):
            self.docstore.mset([(doc_id, doc)])
        
        # 2. 将每个文档分成小块
        text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=400,
            chunk_overlap=50
        )
        
        sub_docs = []
        for doc_id, doc in zip(doc_ids, documents):
            chunks = text_splitter.split_text(doc)
            
            # 为每个小块创建Document对象,关联到父文档ID
            for chunk in chunks:
                sub_docs.append(
                    Document(
                        page_content=chunk,
                        metadata={"doc_id": doc_id}
                    )
                )
        
        # 3. 将小块向量化并存储
        self.vectorstore.add_documents(sub_docs)
        
        print(f"✅ 索引了 {len(documents)} 个文档,生成 {len(sub_docs)} 个向量")
    
    def retrieve(self, query: str, k: int = 4):
        """检索文档"""
        # 检索小块,返回完整文档
        docs = self.retriever.get_relevant_documents(query, k=k)
        return docs

# 使用示例
embeddings = OpenAIEmbeddings()
multi_indexer = MultiVectorIndexer(embeddings)

# 准备文档
documents = [
    """
    Python编程基础教程
    
    第一章:变量和数据类型
    Python是动态类型语言,支持整数、浮点数、字符串等多种数据类型。
    
    第二章:控制流
    Python使用if、for、while等关键字进行流程控制。
    
    第三章:函数
    函数是可重用的代码块,使用def关键字定义。
    """,
    
    """
    机器学习入门
    
    监督学习:使用标记数据训练模型。
    无监督学习:从无标记数据中发现模式。
    强化学习:通过奖励机制学习最优策略。
    """
]

# 索引文档
multi_indexer.index_documents(documents)

# 检索
results = multi_indexer.retrieve("什么是函数?", k=2)

print("\n检索结果:")
for i, doc in enumerate(results):
    print(f"\n文档 {i+1}:")
    print(doc.page_content[:200] + "...")

实现:摘要向量 + 原文

from langchain.chains.summarize import load_summarize_chain
from langchain_openai import ChatOpenAI

class SummaryMultiVectorIndexer:
    """使用摘要作为向量,但返回原文"""
    
    def __init__(self, embeddings, llm):
        self.embeddings = embeddings
        self.llm = llm
        
        self.vectorstore = Chroma(
            collection_name="summary_vectors",
            embedding_function=embeddings
        )
        
        self.docstore = InMemoryStore()
        
        self.retriever = MultiVectorRetriever(
            vectorstore=self.vectorstore,
            docstore=self.docstore,
            id_key="doc_id"
        )
    
    def _generate_summary(self, text: str) -> str:
        """生成文档摘要"""
        from langchain.chains import LLMChain
        from langchain_core.prompts import PromptTemplate
        
        prompt = PromptTemplate(
            input_variables=["text"],
            template="请用一段话总结以下内容:\n\n{text}\n\n摘要:"
        )
        
        chain = LLMChain(llm=self.llm, prompt=prompt)
        summary = chain.run(text=text)
        
        return summary.strip()
    
    def index_documents(self, documents: List[str]):
        """索引文档:生成摘要向量,保存原文"""
        doc_ids = [str(uuid.uuid4()) for _ in documents]
        
        # 1. 存储完整文档
        for doc_id, doc in zip(doc_ids, documents):
            self.docstore.mset([(doc_id, doc)])
        
        # 2. 生成摘要并创建向量
        summaries = []
        for doc_id, doc in zip(doc_ids, documents):
            summary = self._generate_summary(doc)
            
            summaries.append(
                Document(
                    page_content=summary,
                    metadata={"doc_id": doc_id}
                )
            )
            
            print(f"📝 文档摘要: {summary[:100]}...")
        
        # 3. 存储摘要向量
        self.vectorstore.add_documents(summaries)
        
        print(f"✅ 索引了 {len(documents)} 个文档")
    
    def retrieve(self, query: str, k: int = 3):
        """检索:用摘要向量搜索,返回原文"""
        docs = self.retriever.get_relevant_documents(query, k=k)
        return docs

# 使用示例
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
summary_indexer = SummaryMultiVectorIndexer(embeddings, llm)

# 索引(会生成摘要)
summary_indexer.index_documents(documents)

# 检索(搜索摘要,返回原文)
results = summary_indexer.retrieve("Python的控制流语句", k=1)

print("\n检索到的完整文档:")
print(results[0].page_content)

多向量索引的优势

优点 ✅:

  • 更精确的语义表示
  • 可以检索到文档的特定部分
  • 提高相关性得分
  • 灵活的检索策略

缺点 ❌:

  • 存储成本增加
  • 索引时间更长
  • 实现复杂度高

Part 3: 父文档检索器 - Parent Document Retriever

核心概念

父文档检索器的策略是:

  1. 将文档分成小块进行索引和检索(精确匹配)
  2. 返回大块或完整文档给LLM(保持上下文)

工作原理

索引阶段:
大文档
    ↓ 分块
小块1、小块2、小块3
    ↓ 向量化
存储在向量数据库

检索阶段:
用户查询
    ↓ 检索
匹配小块2
    ↓ 查找父文档
返回:包含小块2的大块/完整文档

实现:父文档检索器

from langchain.retrievers import ParentDocumentRetriever
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.storage import InMemoryStore

class ParentDocRetriever:
    """父文档检索器"""
    
    def __init__(self, embeddings):
        self.embeddings = embeddings
        
        # 向量存储
        self.vectorstore = Chroma(
            collection_name="parent_doc",
            embedding_function=embeddings
        )
        
        # 文档存储
        self.docstore = InMemoryStore()
        
        # 子文档分割器(用于检索)
        self.child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=400,    # 小块:精确检索
            chunk_overlap=50
        )
        
        # 父文档分割器(用于返回)
        self.parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=2000,   # 大块:保持上下文
            chunk_overlap=200
        )
        
        # 创建父文档检索器
        self.retriever = ParentDocumentRetriever(
            vectorstore=self.vectorstore,
            docstore=self.docstore,
            child_splitter=self.child_splitter,
            parent_splitter=self.parent_splitter
        )
    
    def add_documents(self, documents: List[Document]):
        """添加文档"""
        self.retriever.add_documents(documents)
        print(f"✅ 添加了 {len(documents)} 个文档")
    
    def retrieve(self, query: str, k: int = 2):
        """检索父文档"""
        docs = self.retriever.get_relevant_documents(query, k=k)
        return docs

# 使用示例
embeddings = OpenAIEmbeddings()
parent_retriever = ParentDocRetriever(embeddings)

# 准备文档
docs = [
    Document(
        page_content="""
        Python编程语言完整指南
        
        第一部分:基础语法
        Python使用缩进来定义代码块。变量不需要声明类型。
        支持多种数据类型,包括整数、浮点数、字符串、列表、字典等。
        
        第二部分:控制流
        if语句用于条件判断:
        if condition:
            do_something()
        
        for循环用于迭代:
        for item in items:
            process(item)
        
        while循环用于重复执行:
        while condition:
            do_something()
        
        第三部分:函数
        使用def关键字定义函数:
        def function_name(parameters):
            # function body
            return result
        
        函数可以有默认参数、可变参数等高级特性。
        Lambda表达式用于创建匿名函数。
        """,
        metadata={"source": "python_guide.md"}
    )
]

# 添加文档
parent_retriever.add_documents(docs)

# 检索:查询具体的内容,但返回更大的上下文
query = "Python中for循环怎么用?"
results = parent_retriever.retrieve(query, k=1)

print("\n检索结果:")
print(f"匹配到的小块可能只包含for循环")
print(f"但返回的是包含完整上下文的大块:\n")
print(results[0].page_content)

实现:完整文档检索

class FullDocumentRetriever:
    """检索小块,返回完整文档"""
    
    def __init__(self, embeddings):
        self.embeddings = embeddings
        
        self.vectorstore = Chroma(
            collection_name="full_doc",
            embedding_function=embeddings
        )
        
        self.docstore = InMemoryStore()
        
        # 只使用子文档分割器
        self.child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=400,
            chunk_overlap=50
        )
        
        # 父文档分割器为None,表示返回完整文档
        self.retriever = ParentDocumentRetriever(
            vectorstore=self.vectorstore,
            docstore=self.docstore,
            child_splitter=self.child_splitter,
            parent_splitter=None  # 返回完整文档
        )
    
    def add_documents(self, documents: List[Document]):
        """添加文档"""
        self.retriever.add_documents(documents)
    
    def retrieve(self, query: str, k: int = 1):
        """检索完整文档"""
        docs = self.retriever.get_relevant_documents(query, k=k)
        return docs

# 使用
full_doc_retriever = FullDocumentRetriever(embeddings)
full_doc_retriever.add_documents(docs)

results = full_doc_retriever.retrieve("for循环", k=1)
print("\n返回完整文档:")
print(results[0].page_content)

父文档检索的优势

最佳场景:

  • ✅ 需要精确匹配 + 完整上下文
  • ✅ 文档有明确的层次结构
  • ✅ 答案需要周围的解释
  • ✅ 避免上下文丢失

配置建议:

# 技术文档
child_chunk_size = 300    # 小块检索代码片段
parent_chunk_size = 1500  # 大块保持完整示例

# 学术论文
child_chunk_size = 500    # 小块检索具体论述
parent_chunk_size = 3000  # 大块保持完整论证

# 问答知识库
child_chunk_size = 200    # 小块检索具体答案
parent_chunk_size = None  # 返回完整QA对

Part 4: 上下文压缩检索 - Contextual Compression

核心概念

上下文压缩检索在检索后对文档进行过滤和压缩,只保留与查询最相关的内容。

为什么需要压缩?

# 问题:检索冗余

用户查询: "Python中的列表推导式是什么?"

检索到的文档:
"""
Python高级特性完整指南

1. 列表推导式
列表推导式是创建列表的简洁方式...

2. 生成器表达式
生成器用于惰性计算...

3. 装饰器
装饰器用于修改函数行为...

4. 上下文管理器
with语句用于资源管理...
"""

# 问题:
→ 只有"列表推导式"部分相关
→ 其他部分是噪声
→ 浪费LLM的上下文窗口
→ 可能影响答案质量

# 解决方案:上下文压缩 ✅
→ 只提取相关部分:"列表推导式是创建列表的简洁方式..."
→ 节省tokens
→ 提高答案精度

实现:LLM过滤器

from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_openai import ChatOpenAI

class CompressedRetriever:
    """压缩检索器"""
    
    def __init__(self, base_retriever, llm):
        # 创建LLM提取器
        compressor = LLMChainExtractor.from_llm(llm)
        
        # 创建压缩检索器
        self.retriever = ContextualCompressionRetriever(
            base_compressor=compressor,
            base_retriever=base_retriever
        )
    
    def retrieve(self, query: str):
        """检索并压缩"""
        compressed_docs = self.retriever.get_relevant_documents(query)
        return compressed_docs

# 使用示例
# 1. 创建基础检索器
vectorstore = Chroma(
    collection_name="docs",
    embedding_function=embeddings
)

# 添加文档
docs = [
    Document(
        page_content="""
        Python高级特性
        
        列表推导式:
        列表推导式是Python中创建列表的简洁方式。
        语法:[expression for item in iterable if condition]
        示例:squares = [x**2 for x in range(10)]
        
        生成器表达式:
        生成器用于惰性计算,节省内存。
        语法:(expression for item in iterable)
        
        装饰器:
        装饰器用于修改函数行为,不改变原函数代码。
        使用@符号应用装饰器。
        """,
        metadata={"source": "python_advanced.md"}
    )
]

vectorstore.add_documents(docs)
base_retriever = vectorstore.as_retriever()

# 2. 创建压缩检索器
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
compressed_retriever = CompressedRetriever(base_retriever, llm)

# 3. 检索
query = "什么是列表推导式?"

print("🔍 普通检索:")
normal_docs = base_retriever.get_relevant_documents(query)
print(f"长度: {len(normal_docs[0].page_content)} 字符")
print(normal_docs[0].page_content)

print("\n✂️ 压缩检索:")
compressed_docs = compressed_retriever.retrieve(query)
print(f"长度: {len(compressed_docs[0].page_content)} 字符")
print(compressed_docs[0].page_content)

实现:嵌入过滤器

from langchain.retrievers.document_compressors import EmbeddingsFilter

class EmbeddingFilterRetriever:
    """基于嵌入相似度的过滤器"""
    
    def __init__(self, base_retriever, embeddings, similarity_threshold: float = 0.75):
        # 创建嵌入过滤器
        compressor = EmbeddingsFilter(
            embeddings=embeddings,
            similarity_threshold=similarity_threshold
        )
        
        self.retriever = ContextualCompressionRetriever(
            base_compressor=compressor,
            base_retriever=base_retriever
        )
    
    def retrieve(self, query: str, k: int = 5):
        """检索并过滤"""
        docs = self.retriever.get_relevant_documents(query, k=k)
        return docs

# 使用示例
embedding_filter = EmbeddingFilterRetriever(
    base_retriever=base_retriever,
    embeddings=embeddings,
    similarity_threshold=0.7
)

# 检索:只返回相似度 > 0.7 的文档
filtered_docs = embedding_filter.retrieve(query, k=5)

print(f"\n过滤后保留 {len(filtered_docs)} 个文档")

实现:文档分割过滤器

from langchain.retrievers.document_compressors import DocumentCompressorPipeline
from langchain.text_splitter import CharacterTextSplitter

class PipelineCompressor:
    """组合多个压缩器"""
    
    def __init__(self, base_retriever, embeddings, llm):
        # 1. 分割器:将文档分成小段
        splitter = CharacterTextSplitter(
            chunk_size=300,
            chunk_overlap=0,
            separator=". "
        )
        
        # 2. 嵌入过滤器:过滤不相关的段
        embedding_filter = EmbeddingsFilter(
            embeddings=embeddings,
            similarity_threshold=0.7
        )
        
        # 3. LLM提取器:进一步提取相关内容
        llm_extractor = LLMChainExtractor.from_llm(llm)
        
        # 创建管道
        pipeline = DocumentCompressorPipeline(
            transformers=[splitter, embedding_filter, llm_extractor]
        )
        
        self.retriever = ContextualCompressionRetriever(
            base_compressor=pipeline,
            base_retriever=base_retriever
        )
    
    def retrieve(self, query: str):
        """执行管道压缩"""
        docs = self.retriever.get_relevant_documents(query)
        return docs

# 使用
pipeline_compressor = PipelineCompressor(base_retriever, embeddings, llm)

# 执行多级压缩
final_docs = pipeline_compressor.retrieve(query)

print("\n📦 管道压缩结果:")
for doc in final_docs:
    print(f"\n{doc.page_content}")

压缩策略对比

压缩器方法速度质量成本
LLM提取器LLM提取相关内容
嵌入过滤器相似度过滤
管道压缩组合多种方法

Part 5: 时间衰减检索 - Time-Weighted Retrieval

核心概念

时间衰减检索考虑文档的新鲜度,给予新文档更高的权重。

实现:时间加权检索器

from langchain.retrievers import TimeWeightedVectorStoreRetriever
import datetime

class TimeSensitiveRetriever:
    """时间敏感检索器"""
    
    def __init__(self, vectorstore, decay_rate: float = 0.01):
        """
        Args:
            decay_rate: 衰减率,越大则时间影响越大
        """
        self.retriever = TimeWeightedVectorStoreRetriever(
            vectorstore=vectorstore,
            decay_rate=decay_rate,
            k=4
        )
    
    def add_documents(self, documents: List[Document]):
        """添加文档(会自动记录时间)"""
        self.retriever.add_documents(documents)
    
    def retrieve(self, query: str):
        """检索(考虑时间因素)"""
        docs = self.retriever.get_relevant_documents(query)
        return docs

# 使用示例
vectorstore = Chroma(
    collection_name="news",
    embedding_function=embeddings
)

time_retriever = TimeSensitiveRetriever(vectorstore, decay_rate=0.01)

# 添加不同时间的文档
old_doc = Document(
    page_content="2023年1月:Python 3.11发布",
    metadata={"date": "2023-01-01"}
)

recent_doc = Document(
    page_content="2024年10月:Python 3.13发布,性能提升显著",
    metadata={"date": "2024-10-01"}
)

time_retriever.add_documents([old_doc, recent_doc])

# 检索:新文档会获得更高权重
results = time_retriever.retrieve("Python最新版本")

print("\n检索结果(按时间加权):")
for doc in results:
    print(f"\n{doc.metadata['date']}: {doc.page_content}")

综合实战:构建高级RAG系统

让我们将所有技术整合到一个完整的系统中:

class AdvancedRAGSystem:
    """高级RAG系统"""
    
    def __init__(self, embeddings, llm):
        self.embeddings = embeddings
        self.llm = llm
        
        # 向量存储
        self.vectorstore = Chroma(
            collection_name="advanced_rag",
            embedding_function=embeddings
        )
        
        # 文档存储
        self.docstore = InMemoryStore()
        
        # 智能分块器
        self.chunker = SmartChunker(chunk_size=800, chunk_overlap=150)
        
        # 父文档检索器
        self.parent_retriever = self._create_parent_retriever()
        
        # 压缩检索器
        self.compressed_retriever = self._create_compressed_retriever()
    
    def _create_parent_retriever(self):
        """创建父文档检索器"""
        child_splitter = RecursiveCharacterTextSplitter(
            chunk_size=400,
            chunk_overlap=50
        )
        
        parent_splitter = RecursiveCharacterTextSplitter(
            chunk_size=2000,
            chunk_overlap=200
        )
        
        return ParentDocumentRetriever(
            vectorstore=self.vectorstore,
            docstore=self.docstore,
            child_splitter=child_splitter,
            parent_splitter=parent_splitter
        )
    
    def _create_compressed_retriever(self):
        """创建压缩检索器"""
        base_retriever = self.vectorstore.as_retriever(search_kwargs={"k": 5})
        
        # 创建压缩管道
        splitter = CharacterTextSplitter(chunk_size=300, chunk_overlap=0)
        embedding_filter = EmbeddingsFilter(
            embeddings=self.embeddings,
            similarity_threshold=0.7
        )
        llm_extractor = LLMChainExtractor.from_llm(self.llm)
        
        pipeline = DocumentCompressorPipeline(
            transformers=[splitter, embedding_filter, llm_extractor]
        )
        
        return ContextualCompressionRetriever(
            base_compressor=pipeline,
            base_retriever=base_retriever
        )
    
    def index_document(self, content: str, metadata: dict = None):
        """索引单个文档"""
        doc = Document(
            page_content=content,
            metadata=metadata or {}
        )
        
        self.parent_retriever.add_documents([doc])
        print(f"✅ 已索引文档")
    
    def query(
        self,
        question: str,
        use_compression: bool = True,
        use_parent_retriever: bool = True
    ):
        """执行高级检索"""
        print(f"❓ 查询: {question}\n")
        
        # 选择检索器
        if use_parent_retriever:
            print("📚 使用父文档检索器")
            docs = self.parent_retriever.get_relevant_documents(question)
        elif use_compression:
            print("✂️ 使用压缩检索器")
            docs = self.compressed_retriever.get_relevant_documents(question)
        else:
            print("🔍 使用标准检索器")
            docs = self.vectorstore.similarity_search(question, k=3)
        
        print(f"📄 检索到 {len(docs)} 个文档\n")
        
        # 生成答案
        context = "\n\n".join([doc.page_content for doc in docs])
        
        answer_prompt = ChatPromptTemplate.from_messages([
            ("system", "基于以下文档回答问题。"),
            ("human", "文档:\n{context}\n\n问题: {question}")
        ])
        
        answer_chain = answer_prompt | self.llm | StrOutputParser()
        answer = answer_chain.invoke({
            "context": context,
            "question": question
        })
        
        return {
            "question": question,
            "documents": docs,
            "answer": answer
        }

# 使用高级RAG系统
advanced_rag = AdvancedRAGSystem(embeddings, llm)

# 索引文档
advanced_rag.index_document(
    content=documents[0],
    metadata={"source": "python_tutorial.md", "type": "tutorial"}
)

# 查询
result = advanced_rag.query(
    "Python中的函数如何定义?",
    use_compression=True,
    use_parent_retriever=True
)

print("💡 答案:")
print(result["answer"])

性能优化与最佳实践

1. 索引优化

# 批量索引
def batch_index(documents: List[str], batch_size: int = 100):
    """批量索引文档"""
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i+batch_size]
        vectorstore.add_documents(batch)
        print(f"✅ 已索引 {i+batch_size}/{len(documents)}")

# 异步索引
import asyncio

async def async_index(documents: List[str]):
    """异步索引"""
    tasks = [
        vectorstore.aadd_documents([doc])
        for doc in documents
    ]
    await asyncio.gather(*tasks)

2. 块大小选择指南

# 根据文档类型选择块大小

CHUNK_SIZES = {
    "code": {
        "chunk_size": 300,      # 代码片段通常较短
        "chunk_overlap": 50
    },
    "article": {
        "chunk_size": 1000,     # 文章段落较长
        "chunk_overlap": 200
    },
    "qa": {
        "chunk_size": 500,      # QA对中等长度
        "chunk_overlap": 100
    },
    "academic": {
        "chunk_size": 1500,     # 学术论文需要更多上下文
        "chunk_overlap": 300
    }
}

def get_optimal_chunker(doc_type: str):
    """获取最优分块器"""
    config = CHUNK_SIZES.get(doc_type, CHUNK_SIZES["article"])
    
    return RecursiveCharacterTextSplitter(
        chunk_size=config["chunk_size"],
        chunk_overlap=config["chunk_overlap"]
    )

3. 监控和评估

class RAGMetrics:
    """RAG系统指标"""
    
    @staticmethod
    def retrieval_metrics(query: str, retrieved_docs: List[Document], relevant_docs: List[str]):
        """计算检索指标"""
        retrieved_ids = {doc.metadata.get('id') for doc in retrieved_docs}
        relevant_ids = set(relevant_docs)
        
        # 召回率
        recall = len(retrieved_ids & relevant_ids) / len(relevant_ids) if relevant_ids else 0
        
        # 精确率
        precision = len(retrieved_ids & relevant_ids) / len(retrieved_ids) if retrieved_ids else 0
        
        # F1分数
        f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
        
        return {
            "recall": recall,
            "precision": precision,
            "f1": f1
        }
    
    @staticmethod
    def chunk_stats(chunks: List[str]):
        """分块统计"""
        lengths = [len(chunk) for chunk in chunks]
        
        return {
            "total_chunks": len(chunks),
            "avg_length": sum(lengths) / len(lengths) if lengths else 0,
            "min_length": min(lengths) if lengths else 0,
            "max_length": max(lengths) if lengths else 0
        }

总结与建议

🎯 核心要点

  1. 智能分块: 文档质量的基础
  2. 多向量索引: 提升语义表示
  3. 父文档检索: 平衡精度和上下文
  4. 上下文压缩: 节省tokens,提高质量

📊 技术选择

文档索引策略:
├─ 短文档(<1000字)
│  → 直接索引完整文档
├─ 中等文档(1000-5000字)
│  → 智能分块 + 父文档检索
└─ 长文档(>5000字)
   → 多向量索引 + 压缩检索

检索优化:
├─ 需要精确匹配
│  → 小块检索 + 父文档返回
├─ 需要节省tokens
│  → 上下文压缩
└─ 有时效性要求
   → 时间衰减检索

💡 最佳实践

索引阶段:

  • 根据文档类型选择分块策略
  • 添加丰富的元数据
  • 使用批量操作提高效率
  • 建立索引质量监控

检索阶段:

  • 组合多种检索策略
  • 使用压缩减少冗余
  • 考虑文档新鲜度
  • 评估和优化检索质量

恭喜!你已经掌握了RAG系统的高级索引和检索技术。接下来学习检索结果的重排序技术!🚀