RAG高级索引与检索策略:提升检索质量的关键
深入探讨RAG系统中的高级索引技术和检索策略,包括文档分块、多向量索引、父文档检索和上下文压缩
RAG高级索引与检索策略:提升检索质量的关键
在前面的章节中,我们学习了查询优化和路由技术。但是,检索质量不仅取决于查询,还取决于如何组织和索引文档。本章将深入探讨高级索引和检索策略。
为什么需要高级索引?
基础索引的局限性
# 基础索引方法的问题
问题1: 文档过长
→ 一个10000字的文档被整体嵌入
→ 语义信息过于粗糙
→ 检索不精确
问题2: 上下文丢失
→ 将文档分成小块
→ 每块独立检索
→ 丢失了块与块之间的关系
→ 无法理解完整上下文
问题3: 多语义内容
→ 一个文档包含多个主题
→ 单一向量无法表示所有语义
→ 相关内容可能被遗漏
问题4: 检索冗余
→ 检索到大量文档
→ 很多内容重复或不相关
→ 影响最终答案质量本章技术概览
| 技术 | 核心目标 | 适用场景 | 复杂度 |
|---|---|---|---|
| 智能分块 | 优化块大小和边界 | 所有RAG系统 | ⭐ |
| 多向量索引 | 一个文档多个向量 | 多主题文档 | ⭐⭐⭐ |
| 父文档检索 | 检索小块返回大块 | 保持上下文 | ⭐⭐ |
| 上下文压缩 | 压缩检索结果 | 减少冗余 | ⭐⭐⭐ |
| 时间衰减检索 | 考虑文档新鲜度 | 时效性内容 | ⭐⭐ |
Part 1: 智能文档分块策略
核心概念
文档分块(Chunking)是将长文档切分成更小片段的过程。好的分块策略能显著提升检索质量。
分块方法对比
# 方法1: 固定长度分块
def fixed_length_split(text: str, chunk_size: int = 500) -> List[str]:
"""简单但可能切断语义"""
return [text[i:i+chunk_size] for i in range(0, len(text), chunk_size)]
# 方法2: 句子级分块
def sentence_split(text: str, max_sentences: int = 5) -> List[str]:
"""保持语义完整但长度不均"""
sentences = text.split('。')
chunks = []
current_chunk = []
for sent in sentences:
current_chunk.append(sent)
if len(current_chunk) >= max_sentences:
chunks.append('。'.join(current_chunk))
current_chunk = []
if current_chunk:
chunks.append('。'.join(current_chunk))
return chunks
# 方法3: 语义分块 ✅ 推荐
def semantic_split(text: str, embeddings) -> List[str]:
"""基于语义相似度分块"""
# 将在下面实现
pass实现:RecursiveCharacterTextSplitter
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 创建文本分割器
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=1000, # 每块的目标大小
chunk_overlap=200, # 块之间的重叠
length_function=len, # 长度计算函数
separators=[ # 分隔符优先级
"\n\n", # 段落
"\n", # 行
" ", # 单词
"" # 字符
]
)
# 示例文档
document = """
# Python编程基础
## 变量和数据类型
Python是一种动态类型语言,这意味着你不需要声明变量的类型。
Python支持多种数据类型,包括整数、浮点数、字符串等。
## 控制流
Python使用缩进来定义代码块。
if语句用于条件判断,for循环用于迭代。
## 函数
函数是可重用的代码块。
使用def关键字定义函数。
"""
# 分块
chunks = text_splitter.split_text(document)
print(f"文档被分成 {len(chunks)} 块")
for i, chunk in enumerate(chunks):
print(f"\n块 {i+1}:")
print(chunk[:100] + "...")实现:语义分块
from langchain_openai import OpenAIEmbeddings
import numpy as np
from typing import List, Tuple
class SemanticChunker:
"""基于语义相似度的分块器"""
def __init__(self, embeddings, similarity_threshold: float = 0.75):
self.embeddings = embeddings
self.similarity_threshold = similarity_threshold
def cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
"""计算余弦相似度"""
return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
def split_text(self, text: str) -> List[str]:
"""基于语义相似度分块"""
# 1. 按句子分割
sentences = [s.strip() + '。' for s in text.split('。') if s.strip()]
if len(sentences) <= 1:
return [text]
# 2. 计算每个句子的嵌入
embeddings = [
np.array(self.embeddings.embed_query(sent))
for sent in sentences
]
# 3. 计算相邻句子的相似度
similarities = []
for i in range(len(embeddings) - 1):
sim = self.cosine_similarity(embeddings[i], embeddings[i+1])
similarities.append(sim)
# 4. 在相似度低的地方切分
chunks = []
current_chunk = [sentences[0]]
for i, sim in enumerate(similarities):
if sim < self.similarity_threshold:
# 相似度低,开始新块
chunks.append(''.join(current_chunk))
current_chunk = [sentences[i+1]]
else:
# 相似度高,添加到当前块
current_chunk.append(sentences[i+1])
# 添加最后一块
if current_chunk:
chunks.append(''.join(current_chunk))
return chunks
# 使用示例
embeddings = OpenAIEmbeddings()
semantic_chunker = SemanticChunker(embeddings, similarity_threshold=0.7)
text = """
机器学习是人工智能的一个分支。它使计算机能够从数据中学习。
深度学习是机器学习的一个子领域。它使用神经网络来建模复杂模式。
Python是一种流行的编程语言。它广泛用于数据科学和机器学习。
Python有丰富的库生态系统。NumPy和Pandas是常用的数据处理库。
"""
chunks = semantic_chunker.split_text(text)
print(f"语义分块结果: {len(chunks)} 块")
for i, chunk in enumerate(chunks):
print(f"\n块 {i+1}:\n{chunk}")分块最佳实践
class SmartChunker:
"""智能分块器 - 综合多种策略"""
def __init__(
self,
chunk_size: int = 1000,
chunk_overlap: int = 200,
min_chunk_size: int = 100,
max_chunk_size: int = 2000
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.min_chunk_size = min_chunk_size
self.max_chunk_size = max_chunk_size
def split_by_structure(self, text: str) -> List[str]:
"""按文档结构分块(标题、段落等)"""
chunks = []
# 按标题分割
sections = text.split('\n# ')
for section in sections:
if not section.strip():
continue
# 如果章节太长,进一步分割
if len(section) > self.max_chunk_size:
sub_chunks = self._split_long_section(section)
chunks.extend(sub_chunks)
elif len(section) >= self.min_chunk_size:
chunks.append(section)
return chunks
def _split_long_section(self, text: str) -> List[str]:
"""分割过长的章节"""
# 按段落分割
paragraphs = text.split('\n\n')
chunks = []
current_chunk = []
current_length = 0
for para in paragraphs:
para_length = len(para)
if current_length + para_length > self.chunk_size and current_chunk:
# 当前块已满,保存并开始新块
chunks.append('\n\n'.join(current_chunk))
# 重叠:包含上一块的最后一个段落
if len(current_chunk) > 1:
current_chunk = [current_chunk[-1], para]
current_length = len(current_chunk[-1]) + para_length
else:
current_chunk = [para]
current_length = para_length
else:
current_chunk.append(para)
current_length += para_length
if current_chunk:
chunks.append('\n\n'.join(current_chunk))
return chunks
def add_metadata_to_chunks(
self,
chunks: List[str],
doc_metadata: dict
) -> List[dict]:
"""为每个块添加元数据"""
chunk_docs = []
for i, chunk in enumerate(chunks):
chunk_docs.append({
'content': chunk,
'metadata': {
**doc_metadata,
'chunk_id': i,
'chunk_total': len(chunks),
'chunk_size': len(chunk)
}
})
return chunk_docs
# 使用示例
smart_chunker = SmartChunker(
chunk_size=800,
chunk_overlap=150,
min_chunk_size=200
)
# 按结构分块
chunks = smart_chunker.split_by_structure(document)
# 添加元数据
chunk_docs = smart_chunker.add_metadata_to_chunks(
chunks,
doc_metadata={'source': 'python_tutorial.md', 'author': '张三'}
)
for doc in chunk_docs:
print(f"\n块 {doc['metadata']['chunk_id'] + 1}/{doc['metadata']['chunk_total']}")
print(f"内容: {doc['content'][:100]}...")Part 2: 多向量索引 - Multi-Vector Indexing
核心概念
多向量索引为单个文档生成多个向量,每个向量代表文档的不同方面或部分。这样可以更全面地表示文档的语义。
为什么需要多向量?
# 场景:一篇包含多个主题的文档
文档内容:
"""
本文介绍Python编程基础。
第一部分:变量和数据类型
Python支持多种数据类型...
第二部分:函数和模块
函数是可重用的代码块...
第三部分:面向对象编程
类是对象的蓝图...
"""
# 问题:
用户查询: "Python中的类是什么?"
# 单向量索引:
→ 整个文档被表示为一个向量
→ 向量混合了所有主题的语义
→ 可能无法精确匹配"类"的相关内容
# 多向量索引:✅
→ 为每个部分生成独立向量
→ "面向对象编程"部分的向量更匹配查询
→ 检索更精确实现:多向量检索器
from langchain.retrievers.multi_vector import MultiVectorRetriever
from langchain.storage import InMemoryStore
from langchain_openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.schema import Document
import uuid
class MultiVectorIndexer:
"""多向量索引器"""
def __init__(self, embeddings):
self.embeddings = embeddings
# 向量存储:存储小块的向量
self.vectorstore = Chroma(
collection_name="multi_vector",
embedding_function=embeddings
)
# 文档存储:存储完整文档
self.docstore = InMemoryStore()
# 多向量检索器
self.retriever = MultiVectorRetriever(
vectorstore=self.vectorstore,
docstore=self.docstore,
id_key="doc_id" # 用于关联向量和文档的键
)
def index_documents(self, documents: List[str], doc_ids: List[str] = None):
"""索引文档(为每个文档创建多个向量)"""
if doc_ids is None:
doc_ids = [str(uuid.uuid4()) for _ in documents]
# 1. 存储完整文档
for doc_id, doc in zip(doc_ids, documents):
self.docstore.mset([(doc_id, doc)])
# 2. 将每个文档分成小块
text_splitter = RecursiveCharacterTextSplitter(
chunk_size=400,
chunk_overlap=50
)
sub_docs = []
for doc_id, doc in zip(doc_ids, documents):
chunks = text_splitter.split_text(doc)
# 为每个小块创建Document对象,关联到父文档ID
for chunk in chunks:
sub_docs.append(
Document(
page_content=chunk,
metadata={"doc_id": doc_id}
)
)
# 3. 将小块向量化并存储
self.vectorstore.add_documents(sub_docs)
print(f"✅ 索引了 {len(documents)} 个文档,生成 {len(sub_docs)} 个向量")
def retrieve(self, query: str, k: int = 4):
"""检索文档"""
# 检索小块,返回完整文档
docs = self.retriever.get_relevant_documents(query, k=k)
return docs
# 使用示例
embeddings = OpenAIEmbeddings()
multi_indexer = MultiVectorIndexer(embeddings)
# 准备文档
documents = [
"""
Python编程基础教程
第一章:变量和数据类型
Python是动态类型语言,支持整数、浮点数、字符串等多种数据类型。
第二章:控制流
Python使用if、for、while等关键字进行流程控制。
第三章:函数
函数是可重用的代码块,使用def关键字定义。
""",
"""
机器学习入门
监督学习:使用标记数据训练模型。
无监督学习:从无标记数据中发现模式。
强化学习:通过奖励机制学习最优策略。
"""
]
# 索引文档
multi_indexer.index_documents(documents)
# 检索
results = multi_indexer.retrieve("什么是函数?", k=2)
print("\n检索结果:")
for i, doc in enumerate(results):
print(f"\n文档 {i+1}:")
print(doc.page_content[:200] + "...")实现:摘要向量 + 原文
from langchain.chains.summarize import load_summarize_chain
from langchain_openai import ChatOpenAI
class SummaryMultiVectorIndexer:
"""使用摘要作为向量,但返回原文"""
def __init__(self, embeddings, llm):
self.embeddings = embeddings
self.llm = llm
self.vectorstore = Chroma(
collection_name="summary_vectors",
embedding_function=embeddings
)
self.docstore = InMemoryStore()
self.retriever = MultiVectorRetriever(
vectorstore=self.vectorstore,
docstore=self.docstore,
id_key="doc_id"
)
def _generate_summary(self, text: str) -> str:
"""生成文档摘要"""
from langchain.chains import LLMChain
from langchain_core.prompts import PromptTemplate
prompt = PromptTemplate(
input_variables=["text"],
template="请用一段话总结以下内容:\n\n{text}\n\n摘要:"
)
chain = LLMChain(llm=self.llm, prompt=prompt)
summary = chain.run(text=text)
return summary.strip()
def index_documents(self, documents: List[str]):
"""索引文档:生成摘要向量,保存原文"""
doc_ids = [str(uuid.uuid4()) for _ in documents]
# 1. 存储完整文档
for doc_id, doc in zip(doc_ids, documents):
self.docstore.mset([(doc_id, doc)])
# 2. 生成摘要并创建向量
summaries = []
for doc_id, doc in zip(doc_ids, documents):
summary = self._generate_summary(doc)
summaries.append(
Document(
page_content=summary,
metadata={"doc_id": doc_id}
)
)
print(f"📝 文档摘要: {summary[:100]}...")
# 3. 存储摘要向量
self.vectorstore.add_documents(summaries)
print(f"✅ 索引了 {len(documents)} 个文档")
def retrieve(self, query: str, k: int = 3):
"""检索:用摘要向量搜索,返回原文"""
docs = self.retriever.get_relevant_documents(query, k=k)
return docs
# 使用示例
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
summary_indexer = SummaryMultiVectorIndexer(embeddings, llm)
# 索引(会生成摘要)
summary_indexer.index_documents(documents)
# 检索(搜索摘要,返回原文)
results = summary_indexer.retrieve("Python的控制流语句", k=1)
print("\n检索到的完整文档:")
print(results[0].page_content)多向量索引的优势
优点 ✅:
- 更精确的语义表示
- 可以检索到文档的特定部分
- 提高相关性得分
- 灵活的检索策略
缺点 ❌:
- 存储成本增加
- 索引时间更长
- 实现复杂度高
Part 3: 父文档检索器 - Parent Document Retriever
核心概念
父文档检索器的策略是:
- 将文档分成小块进行索引和检索(精确匹配)
- 返回大块或完整文档给LLM(保持上下文)
工作原理
索引阶段:
大文档
↓ 分块
小块1、小块2、小块3
↓ 向量化
存储在向量数据库
检索阶段:
用户查询
↓ 检索
匹配小块2
↓ 查找父文档
返回:包含小块2的大块/完整文档实现:父文档检索器
from langchain.retrievers import ParentDocumentRetriever
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.storage import InMemoryStore
class ParentDocRetriever:
"""父文档检索器"""
def __init__(self, embeddings):
self.embeddings = embeddings
# 向量存储
self.vectorstore = Chroma(
collection_name="parent_doc",
embedding_function=embeddings
)
# 文档存储
self.docstore = InMemoryStore()
# 子文档分割器(用于检索)
self.child_splitter = RecursiveCharacterTextSplitter(
chunk_size=400, # 小块:精确检索
chunk_overlap=50
)
# 父文档分割器(用于返回)
self.parent_splitter = RecursiveCharacterTextSplitter(
chunk_size=2000, # 大块:保持上下文
chunk_overlap=200
)
# 创建父文档检索器
self.retriever = ParentDocumentRetriever(
vectorstore=self.vectorstore,
docstore=self.docstore,
child_splitter=self.child_splitter,
parent_splitter=self.parent_splitter
)
def add_documents(self, documents: List[Document]):
"""添加文档"""
self.retriever.add_documents(documents)
print(f"✅ 添加了 {len(documents)} 个文档")
def retrieve(self, query: str, k: int = 2):
"""检索父文档"""
docs = self.retriever.get_relevant_documents(query, k=k)
return docs
# 使用示例
embeddings = OpenAIEmbeddings()
parent_retriever = ParentDocRetriever(embeddings)
# 准备文档
docs = [
Document(
page_content="""
Python编程语言完整指南
第一部分:基础语法
Python使用缩进来定义代码块。变量不需要声明类型。
支持多种数据类型,包括整数、浮点数、字符串、列表、字典等。
第二部分:控制流
if语句用于条件判断:
if condition:
do_something()
for循环用于迭代:
for item in items:
process(item)
while循环用于重复执行:
while condition:
do_something()
第三部分:函数
使用def关键字定义函数:
def function_name(parameters):
# function body
return result
函数可以有默认参数、可变参数等高级特性。
Lambda表达式用于创建匿名函数。
""",
metadata={"source": "python_guide.md"}
)
]
# 添加文档
parent_retriever.add_documents(docs)
# 检索:查询具体的内容,但返回更大的上下文
query = "Python中for循环怎么用?"
results = parent_retriever.retrieve(query, k=1)
print("\n检索结果:")
print(f"匹配到的小块可能只包含for循环")
print(f"但返回的是包含完整上下文的大块:\n")
print(results[0].page_content)实现:完整文档检索
class FullDocumentRetriever:
"""检索小块,返回完整文档"""
def __init__(self, embeddings):
self.embeddings = embeddings
self.vectorstore = Chroma(
collection_name="full_doc",
embedding_function=embeddings
)
self.docstore = InMemoryStore()
# 只使用子文档分割器
self.child_splitter = RecursiveCharacterTextSplitter(
chunk_size=400,
chunk_overlap=50
)
# 父文档分割器为None,表示返回完整文档
self.retriever = ParentDocumentRetriever(
vectorstore=self.vectorstore,
docstore=self.docstore,
child_splitter=self.child_splitter,
parent_splitter=None # 返回完整文档
)
def add_documents(self, documents: List[Document]):
"""添加文档"""
self.retriever.add_documents(documents)
def retrieve(self, query: str, k: int = 1):
"""检索完整文档"""
docs = self.retriever.get_relevant_documents(query, k=k)
return docs
# 使用
full_doc_retriever = FullDocumentRetriever(embeddings)
full_doc_retriever.add_documents(docs)
results = full_doc_retriever.retrieve("for循环", k=1)
print("\n返回完整文档:")
print(results[0].page_content)父文档检索的优势
最佳场景:
- ✅ 需要精确匹配 + 完整上下文
- ✅ 文档有明确的层次结构
- ✅ 答案需要周围的解释
- ✅ 避免上下文丢失
配置建议:
# 技术文档
child_chunk_size = 300 # 小块检索代码片段
parent_chunk_size = 1500 # 大块保持完整示例
# 学术论文
child_chunk_size = 500 # 小块检索具体论述
parent_chunk_size = 3000 # 大块保持完整论证
# 问答知识库
child_chunk_size = 200 # 小块检索具体答案
parent_chunk_size = None # 返回完整QA对Part 4: 上下文压缩检索 - Contextual Compression
核心概念
上下文压缩检索在检索后对文档进行过滤和压缩,只保留与查询最相关的内容。
为什么需要压缩?
# 问题:检索冗余
用户查询: "Python中的列表推导式是什么?"
检索到的文档:
"""
Python高级特性完整指南
1. 列表推导式
列表推导式是创建列表的简洁方式...
2. 生成器表达式
生成器用于惰性计算...
3. 装饰器
装饰器用于修改函数行为...
4. 上下文管理器
with语句用于资源管理...
"""
# 问题:
→ 只有"列表推导式"部分相关
→ 其他部分是噪声
→ 浪费LLM的上下文窗口
→ 可能影响答案质量
# 解决方案:上下文压缩 ✅
→ 只提取相关部分:"列表推导式是创建列表的简洁方式..."
→ 节省tokens
→ 提高答案精度实现:LLM过滤器
from langchain.retrievers import ContextualCompressionRetriever
from langchain.retrievers.document_compressors import LLMChainExtractor
from langchain_openai import ChatOpenAI
class CompressedRetriever:
"""压缩检索器"""
def __init__(self, base_retriever, llm):
# 创建LLM提取器
compressor = LLMChainExtractor.from_llm(llm)
# 创建压缩检索器
self.retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=base_retriever
)
def retrieve(self, query: str):
"""检索并压缩"""
compressed_docs = self.retriever.get_relevant_documents(query)
return compressed_docs
# 使用示例
# 1. 创建基础检索器
vectorstore = Chroma(
collection_name="docs",
embedding_function=embeddings
)
# 添加文档
docs = [
Document(
page_content="""
Python高级特性
列表推导式:
列表推导式是Python中创建列表的简洁方式。
语法:[expression for item in iterable if condition]
示例:squares = [x**2 for x in range(10)]
生成器表达式:
生成器用于惰性计算,节省内存。
语法:(expression for item in iterable)
装饰器:
装饰器用于修改函数行为,不改变原函数代码。
使用@符号应用装饰器。
""",
metadata={"source": "python_advanced.md"}
)
]
vectorstore.add_documents(docs)
base_retriever = vectorstore.as_retriever()
# 2. 创建压缩检索器
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0)
compressed_retriever = CompressedRetriever(base_retriever, llm)
# 3. 检索
query = "什么是列表推导式?"
print("🔍 普通检索:")
normal_docs = base_retriever.get_relevant_documents(query)
print(f"长度: {len(normal_docs[0].page_content)} 字符")
print(normal_docs[0].page_content)
print("\n✂️ 压缩检索:")
compressed_docs = compressed_retriever.retrieve(query)
print(f"长度: {len(compressed_docs[0].page_content)} 字符")
print(compressed_docs[0].page_content)实现:嵌入过滤器
from langchain.retrievers.document_compressors import EmbeddingsFilter
class EmbeddingFilterRetriever:
"""基于嵌入相似度的过滤器"""
def __init__(self, base_retriever, embeddings, similarity_threshold: float = 0.75):
# 创建嵌入过滤器
compressor = EmbeddingsFilter(
embeddings=embeddings,
similarity_threshold=similarity_threshold
)
self.retriever = ContextualCompressionRetriever(
base_compressor=compressor,
base_retriever=base_retriever
)
def retrieve(self, query: str, k: int = 5):
"""检索并过滤"""
docs = self.retriever.get_relevant_documents(query, k=k)
return docs
# 使用示例
embedding_filter = EmbeddingFilterRetriever(
base_retriever=base_retriever,
embeddings=embeddings,
similarity_threshold=0.7
)
# 检索:只返回相似度 > 0.7 的文档
filtered_docs = embedding_filter.retrieve(query, k=5)
print(f"\n过滤后保留 {len(filtered_docs)} 个文档")实现:文档分割过滤器
from langchain.retrievers.document_compressors import DocumentCompressorPipeline
from langchain.text_splitter import CharacterTextSplitter
class PipelineCompressor:
"""组合多个压缩器"""
def __init__(self, base_retriever, embeddings, llm):
# 1. 分割器:将文档分成小段
splitter = CharacterTextSplitter(
chunk_size=300,
chunk_overlap=0,
separator=". "
)
# 2. 嵌入过滤器:过滤不相关的段
embedding_filter = EmbeddingsFilter(
embeddings=embeddings,
similarity_threshold=0.7
)
# 3. LLM提取器:进一步提取相关内容
llm_extractor = LLMChainExtractor.from_llm(llm)
# 创建管道
pipeline = DocumentCompressorPipeline(
transformers=[splitter, embedding_filter, llm_extractor]
)
self.retriever = ContextualCompressionRetriever(
base_compressor=pipeline,
base_retriever=base_retriever
)
def retrieve(self, query: str):
"""执行管道压缩"""
docs = self.retriever.get_relevant_documents(query)
return docs
# 使用
pipeline_compressor = PipelineCompressor(base_retriever, embeddings, llm)
# 执行多级压缩
final_docs = pipeline_compressor.retrieve(query)
print("\n📦 管道压缩结果:")
for doc in final_docs:
print(f"\n{doc.page_content}")压缩策略对比
| 压缩器 | 方法 | 速度 | 质量 | 成本 |
|---|---|---|---|---|
| LLM提取器 | LLM提取相关内容 | 慢 | 高 | 高 |
| 嵌入过滤器 | 相似度过滤 | 快 | 中 | 低 |
| 管道压缩 | 组合多种方法 | 中 | 高 | 中 |
Part 5: 时间衰减检索 - Time-Weighted Retrieval
核心概念
时间衰减检索考虑文档的新鲜度,给予新文档更高的权重。
实现:时间加权检索器
from langchain.retrievers import TimeWeightedVectorStoreRetriever
import datetime
class TimeSensitiveRetriever:
"""时间敏感检索器"""
def __init__(self, vectorstore, decay_rate: float = 0.01):
"""
Args:
decay_rate: 衰减率,越大则时间影响越大
"""
self.retriever = TimeWeightedVectorStoreRetriever(
vectorstore=vectorstore,
decay_rate=decay_rate,
k=4
)
def add_documents(self, documents: List[Document]):
"""添加文档(会自动记录时间)"""
self.retriever.add_documents(documents)
def retrieve(self, query: str):
"""检索(考虑时间因素)"""
docs = self.retriever.get_relevant_documents(query)
return docs
# 使用示例
vectorstore = Chroma(
collection_name="news",
embedding_function=embeddings
)
time_retriever = TimeSensitiveRetriever(vectorstore, decay_rate=0.01)
# 添加不同时间的文档
old_doc = Document(
page_content="2023年1月:Python 3.11发布",
metadata={"date": "2023-01-01"}
)
recent_doc = Document(
page_content="2024年10月:Python 3.13发布,性能提升显著",
metadata={"date": "2024-10-01"}
)
time_retriever.add_documents([old_doc, recent_doc])
# 检索:新文档会获得更高权重
results = time_retriever.retrieve("Python最新版本")
print("\n检索结果(按时间加权):")
for doc in results:
print(f"\n{doc.metadata['date']}: {doc.page_content}")综合实战:构建高级RAG系统
让我们将所有技术整合到一个完整的系统中:
class AdvancedRAGSystem:
"""高级RAG系统"""
def __init__(self, embeddings, llm):
self.embeddings = embeddings
self.llm = llm
# 向量存储
self.vectorstore = Chroma(
collection_name="advanced_rag",
embedding_function=embeddings
)
# 文档存储
self.docstore = InMemoryStore()
# 智能分块器
self.chunker = SmartChunker(chunk_size=800, chunk_overlap=150)
# 父文档检索器
self.parent_retriever = self._create_parent_retriever()
# 压缩检索器
self.compressed_retriever = self._create_compressed_retriever()
def _create_parent_retriever(self):
"""创建父文档检索器"""
child_splitter = RecursiveCharacterTextSplitter(
chunk_size=400,
chunk_overlap=50
)
parent_splitter = RecursiveCharacterTextSplitter(
chunk_size=2000,
chunk_overlap=200
)
return ParentDocumentRetriever(
vectorstore=self.vectorstore,
docstore=self.docstore,
child_splitter=child_splitter,
parent_splitter=parent_splitter
)
def _create_compressed_retriever(self):
"""创建压缩检索器"""
base_retriever = self.vectorstore.as_retriever(search_kwargs={"k": 5})
# 创建压缩管道
splitter = CharacterTextSplitter(chunk_size=300, chunk_overlap=0)
embedding_filter = EmbeddingsFilter(
embeddings=self.embeddings,
similarity_threshold=0.7
)
llm_extractor = LLMChainExtractor.from_llm(self.llm)
pipeline = DocumentCompressorPipeline(
transformers=[splitter, embedding_filter, llm_extractor]
)
return ContextualCompressionRetriever(
base_compressor=pipeline,
base_retriever=base_retriever
)
def index_document(self, content: str, metadata: dict = None):
"""索引单个文档"""
doc = Document(
page_content=content,
metadata=metadata or {}
)
self.parent_retriever.add_documents([doc])
print(f"✅ 已索引文档")
def query(
self,
question: str,
use_compression: bool = True,
use_parent_retriever: bool = True
):
"""执行高级检索"""
print(f"❓ 查询: {question}\n")
# 选择检索器
if use_parent_retriever:
print("📚 使用父文档检索器")
docs = self.parent_retriever.get_relevant_documents(question)
elif use_compression:
print("✂️ 使用压缩检索器")
docs = self.compressed_retriever.get_relevant_documents(question)
else:
print("🔍 使用标准检索器")
docs = self.vectorstore.similarity_search(question, k=3)
print(f"📄 检索到 {len(docs)} 个文档\n")
# 生成答案
context = "\n\n".join([doc.page_content for doc in docs])
answer_prompt = ChatPromptTemplate.from_messages([
("system", "基于以下文档回答问题。"),
("human", "文档:\n{context}\n\n问题: {question}")
])
answer_chain = answer_prompt | self.llm | StrOutputParser()
answer = answer_chain.invoke({
"context": context,
"question": question
})
return {
"question": question,
"documents": docs,
"answer": answer
}
# 使用高级RAG系统
advanced_rag = AdvancedRAGSystem(embeddings, llm)
# 索引文档
advanced_rag.index_document(
content=documents[0],
metadata={"source": "python_tutorial.md", "type": "tutorial"}
)
# 查询
result = advanced_rag.query(
"Python中的函数如何定义?",
use_compression=True,
use_parent_retriever=True
)
print("💡 答案:")
print(result["answer"])性能优化与最佳实践
1. 索引优化
# 批量索引
def batch_index(documents: List[str], batch_size: int = 100):
"""批量索引文档"""
for i in range(0, len(documents), batch_size):
batch = documents[i:i+batch_size]
vectorstore.add_documents(batch)
print(f"✅ 已索引 {i+batch_size}/{len(documents)}")
# 异步索引
import asyncio
async def async_index(documents: List[str]):
"""异步索引"""
tasks = [
vectorstore.aadd_documents([doc])
for doc in documents
]
await asyncio.gather(*tasks)2. 块大小选择指南
# 根据文档类型选择块大小
CHUNK_SIZES = {
"code": {
"chunk_size": 300, # 代码片段通常较短
"chunk_overlap": 50
},
"article": {
"chunk_size": 1000, # 文章段落较长
"chunk_overlap": 200
},
"qa": {
"chunk_size": 500, # QA对中等长度
"chunk_overlap": 100
},
"academic": {
"chunk_size": 1500, # 学术论文需要更多上下文
"chunk_overlap": 300
}
}
def get_optimal_chunker(doc_type: str):
"""获取最优分块器"""
config = CHUNK_SIZES.get(doc_type, CHUNK_SIZES["article"])
return RecursiveCharacterTextSplitter(
chunk_size=config["chunk_size"],
chunk_overlap=config["chunk_overlap"]
)3. 监控和评估
class RAGMetrics:
"""RAG系统指标"""
@staticmethod
def retrieval_metrics(query: str, retrieved_docs: List[Document], relevant_docs: List[str]):
"""计算检索指标"""
retrieved_ids = {doc.metadata.get('id') for doc in retrieved_docs}
relevant_ids = set(relevant_docs)
# 召回率
recall = len(retrieved_ids & relevant_ids) / len(relevant_ids) if relevant_ids else 0
# 精确率
precision = len(retrieved_ids & relevant_ids) / len(retrieved_ids) if retrieved_ids else 0
# F1分数
f1 = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0
return {
"recall": recall,
"precision": precision,
"f1": f1
}
@staticmethod
def chunk_stats(chunks: List[str]):
"""分块统计"""
lengths = [len(chunk) for chunk in chunks]
return {
"total_chunks": len(chunks),
"avg_length": sum(lengths) / len(lengths) if lengths else 0,
"min_length": min(lengths) if lengths else 0,
"max_length": max(lengths) if lengths else 0
}总结与建议
🎯 核心要点
- 智能分块: 文档质量的基础
- 多向量索引: 提升语义表示
- 父文档检索: 平衡精度和上下文
- 上下文压缩: 节省tokens,提高质量
📊 技术选择
文档索引策略:
├─ 短文档(<1000字)
│ → 直接索引完整文档
├─ 中等文档(1000-5000字)
│ → 智能分块 + 父文档检索
└─ 长文档(>5000字)
→ 多向量索引 + 压缩检索
检索优化:
├─ 需要精确匹配
│ → 小块检索 + 父文档返回
├─ 需要节省tokens
│ → 上下文压缩
└─ 有时效性要求
→ 时间衰减检索💡 最佳实践
✅ 索引阶段:
- 根据文档类型选择分块策略
- 添加丰富的元数据
- 使用批量操作提高效率
- 建立索引质量监控
✅ 检索阶段:
- 组合多种检索策略
- 使用压缩减少冗余
- 考虑文档新鲜度
- 评估和优化检索质量
恭喜!你已经掌握了RAG系统的高级索引和检索技术。接下来学习检索结果的重排序技术!🚀