Deeptoai RAG Tutorial Series
Hands-On RAG Project Analysis (Original) 03: Data Processing and Indexing

Chunking Strategies and Algorithms

A complete chunking engineering guide, from fixed-length to semantic chunking

Why chunking quality determines RAG performance

Chunking is the "granularity control" stage of a RAG system: chunks that are too large cause information overload (retrieval hits the right document but generation quality drops), while chunks that are too small lose context (generation quotes passages out of context). High-quality chunking has to balance semantic completeness, retrieval granularity, and generation context, which makes it a key lever for optimizing RAG performance.

Chunking Strategies and Algorithms

Background and Core Challenges

Why is chunking needed?

Embedding an entire document directly runs into the following problems:

  1. Model limits are exceeded: documents usually exceed the embedding model's token limit (512-8192 tokens); see the sketch after this list
  2. Retrieval granularity is too coarse: user queries usually target a specific passage, not the whole document
  3. Generation context becomes too long: the LLM cannot attend effectively to an overly long context
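
To make point 1 concrete, here is a minimal sketch; the 512-token limit and the cl100k_base encoding are illustrative assumptions, so check your embedding model's documentation for the real values.

check_token_limit.py
import tiktoken

EMBEDDING_TOKEN_LIMIT = 512  # illustrative assumption; the real limit depends on the embedding model

encoding = tiktoken.get_encoding("cl100k_base")

def fits_in_one_embedding(document: str) -> bool:
    """Return True if the whole document fits under the assumed token limit."""
    n_tokens = len(encoding.encode(document))
    print(f"Document has {n_tokens} tokens (assumed limit: {EMBEDDING_TOKEN_LIMIT})")
    return n_tokens <= EMBEDDING_TOKEN_LIMIT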

The Core Trade-offs of Chunking

Dimension | Chunks too small | Chunks too large | Best practice
Semantic completeness | Out-of-context fragments | Information overload | Keep semantic units intact
Retrieval precision | High (exact matches) | Low (more noise) | Medium-sized chunks + reranking
Generation quality | Missing context | Too much distracting content | Moderate size (200-800 tokens)
Storage cost | High (heavy duplication) | Low (little duplication) | Balance against overlap (see the note after the table)
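
A quick sanity check on the storage-cost row: with sliding-window chunking, overlap inflates the stored text by roughly chunk_size / (chunk_size - overlap), so smaller chunks with the same overlap cost proportionally more. This is a rough back-of-the-envelope sketch, not an exact accounting:

# Rough duplication factor introduced by overlap in sliding-window chunking
for chunk_size, overlap in [(500, 50), (200, 50)]:
    factor = chunk_size / (chunk_size - overlap)
    print(f"chunk_size={chunk_size}, overlap={overlap}: ~{(factor - 1) * 100:.0f}% extra storage")
# chunk_size=500 -> ~11% extra; chunk_size=200 -> ~33% extra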

Taxonomy of Chunking Strategies

Chunking approaches across the nine projects

Project | Primary strategy | Special handling | Maturity
LightRAG | Semantic chunking + graph structure | Entities and relations | ⭐⭐⭐⭐⭐
ragflow | Intelligent chunking (in-house) | Tables / formulas | ⭐⭐⭐⭐⭐
onyx | Recursive + semantic | Heading preservation | ⭐⭐⭐⭐⭐
kotaemon | LlamaIndex chunkers | Multiple strategies | ⭐⭐⭐⭐
Verba | Recursive character chunking | Markdown | ⭐⭐⭐⭐
RAG-Anything | Multimodal chunking | Images / video | ⭐⭐⭐⭐⭐
SurfSense | Web-page semantic chunking | HTML structure | ⭐⭐⭐⭐
Self-Corrective-Agentic-RAG | Fixed length | Basic | ⭐⭐⭐
UltraRAG | Fixed length | Basic | ⭐⭐

Key insights

  • Most innovative: LightRAG's graph-structured chunking (entities + relations as an extra index)
  • Most intelligent: ragflow's adaptive chunking (adjusts dynamically to content type)
  • Most practical: onyx's recursive chunking (preserves document structure)
  • Trend: moving from fixed-length chunking toward semantic-aware, structure-preserving chunking

In-Depth Comparison of Core Implementations

1. Fixed-size chunking (baseline)

Design idea: simple and efficient; split on a fixed number of characters or tokens

fixed_size_chunking.py
class FixedSizeChunker:
    """
    Fixed-size chunker.
    
    Pros:
    - Simple and fast (no heavy computation)
    - Predictable (every chunk has roughly the same size)
    - Good for quick prototyping
    
    Cons:
    - May cut across semantic units (sentences, paragraphs)
    - Unaware of document structure
    """
    
    def __init__(
        self,
        chunk_size: int = 500,  # in characters
        chunk_overlap: int = 50,  # overlap in characters
        length_function: callable = len,  # length metric (characters / tokens)
        separator: str = "\n\n"  # preferred split point
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.length_function = length_function
        self.separator = separator
    
    def split_text(self, text: str) -> list[str]:
        """
        Fixed-size chunking.
        
        Algorithm:
        1. Split the text on the separator
        2. Merge pieces until chunk_size is reached
        3. Carry an overlap into the next chunk
        """
        # 1. Split on the separator
        splits = text.split(self.separator)
        
        # 2. Merge pieces into chunks
        chunks = []
        current_chunk = []
        current_length = 0
        
        for split in splits:
            split_length = self.length_function(split)
            
            # The piece itself already exceeds chunk_size
            if split_length > self.chunk_size:
                # Flush the current chunk first
                if current_chunk:
                    chunks.append(self.separator.join(current_chunk))
                    current_chunk = []
                    current_length = 0
                
                # Force-split the oversized piece
                chunks.extend(self._split_long_text(split))
                continue
            
            # Adding this piece would exceed chunk_size
            if current_length + split_length > self.chunk_size:
                # Flush the current chunk
                if current_chunk:
                    chunks.append(self.separator.join(current_chunk))
                
                # Start the next chunk with an overlap (tail of the previous chunk)
                overlap_text = self._get_overlap_text(current_chunk)
                current_chunk = [overlap_text] if overlap_text else []
                current_length = self.length_function(overlap_text) if overlap_text else 0
            
            # Append the current piece
            current_chunk.append(split)
            current_length += split_length
        
        # Flush the last chunk
        if current_chunk:
            chunks.append(self.separator.join(current_chunk))
        
        return chunks
    
    def _split_long_text(self, text: str) -> list[str]:
        """Force-split an oversized piece with a sliding window."""
        chunks = []
        for i in range(0, len(text), self.chunk_size - self.chunk_overlap):
            chunk = text[i:i + self.chunk_size]
            chunks.append(chunk)
        return chunks
    
    def _get_overlap_text(self, chunks: list[str]) -> str:
        """Collect the tail of the previous chunk as the overlap text."""
        if not chunks:
            return ""
        
        # Take the last few pieces as the overlap
        overlap_text = ""
        for chunk in reversed(chunks):
            if self.length_function(overlap_text + chunk) <= self.chunk_overlap:
                overlap_text = chunk + self.separator + overlap_text
            else:
                break
        
        return overlap_text.strip()

# Usage example
chunker = FixedSizeChunker(
    chunk_size=500,
    chunk_overlap=50,
    separator="\n\n"
)

text = """
Long text content...
with multiple paragraphs...
"""

chunks = chunker.split_text(text)
for i, chunk in enumerate(chunks, 1):
    print(f"Chunk {i} ({len(chunk)} chars):")
    print(chunk)
    print("---")

Optimized variant: token-aware chunking

token_aware_chunking.py
import tiktoken

class TokenAwareChunker(FixedSizeChunker):
    """
    Token-aware chunker.
    
    Improvement: measures length in tokens rather than characters.
    Why it matters: both embedding models and LLMs enforce token limits.
    """
    
    def __init__(
        self,
        chunk_size: int = 500,  # in tokens
        chunk_overlap: int = 50,
        model_name: str = "gpt-3.5-turbo"
    ):
        # Load the tokenizer
        self.tokenizer = tiktoken.encoding_for_model(model_name)
        
        # Use the token counter as the length function
        super().__init__(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=self._count_tokens,
            separator="\n\n"
        )
    
    def _count_tokens(self, text: str) -> int:
        """Count tokens."""
        return len(self.tokenizer.encode(text))

# Usage example
token_chunker = TokenAwareChunker(
    chunk_size=500,  # 500 tokens
    chunk_overlap=50,
    model_name="gpt-3.5-turbo"
)

chunks = token_chunker.split_text(text)

2. Recursive chunking (structure-preserving)

Design idea: split recursively along the document structure (paragraphs → sentences → characters)

recursive_chunking.py
class RecursiveCharacterChunker:
    """
    Recursive character chunker (LangChain-style).
    
    Strategy:
    1. Prefer splitting on paragraphs ("\n\n")
    2. If a paragraph is too large, split on sentences ("\n", ". ")
    3. If a sentence is still too large, force-split on characters
    
    Advantages:
    - Preserves semantic units (paragraphs / sentences) as much as possible
    - Adapts to many document formats
    - Works better than fixed-size chunking
    """
    
    def __init__(
        self,
        chunk_size: int = 500,
        chunk_overlap: int = 50,
        separators: list[str] = None
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        
        # Separator priority (coarsest to finest)
        self.separators = separators or [
            "\n\n",  # paragraph
            "\n",    # line break
            ". ",    # sentence (English)
            "。",    # sentence (Chinese)
            " ",     # word
            ""       # character (forced split)
        ]
    
    def split_text(self, text: str) -> list[str]:
        """Recursive chunking."""
        return self._split_recursive(text, self.separators)
    
    def _split_recursive(
        self,
        text: str,
        separators: list[str]
    ) -> list[str]:
        """
        Recursive splitting algorithm.
        
        Flow:
        1. Try to split with the current separator
        2. If a resulting piece is still too large, recurse with the next separator
        3. Stop when every piece is smaller than chunk_size
        """
        final_chunks = []
        
        # Pick the current separator
        separator = separators[0] if separators else ""
        remaining_separators = separators[1:] if len(separators) > 1 else []
        
        # Split with the current separator
        if separator:
            splits = text.split(separator)
        else:
            splits = list(text)  # character-level split
        
        # Merge small pieces
        current_chunk = []
        current_length = 0
        
        for split in splits:
            split_length = len(split)
            
            # The piece itself is too large: recurse
            if split_length > self.chunk_size:
                # Flush the current chunk first
                if current_chunk:
                    final_chunks.append(self._merge_chunks(current_chunk, separator))
                    current_chunk = []
                    current_length = 0
                
                # Recurse with the finer separators
                if remaining_separators:
                    sub_chunks = self._split_recursive(split, remaining_separators)
                    final_chunks.extend(sub_chunks)
                else:
                    # Forced split
                    final_chunks.extend(self._force_split(split))
                continue
            
            # Adding this piece would exceed chunk_size
            if current_length + split_length + len(separator) > self.chunk_size:
                # Flush the current chunk
                if current_chunk:
                    final_chunks.append(self._merge_chunks(current_chunk, separator))
                
                # Carry over the overlap
                overlap_chunks = self._get_overlap_chunks(current_chunk, separator)
                current_chunk = overlap_chunks
                current_length = sum(len(c) for c in current_chunk) + \
                                len(separator) * max(len(current_chunk) - 1, 0)
            
            # Append the current piece
            current_chunk.append(split)
            current_length += split_length + (len(separator) if len(current_chunk) > 1 else 0)
        
        # Flush the last chunk
        if current_chunk:
            final_chunks.append(self._merge_chunks(current_chunk, separator))
        
        return final_chunks
    
    def _merge_chunks(self, chunks: list[str], separator: str) -> str:
        """Join pieces back together."""
        return separator.join(chunks)
    
    def _get_overlap_chunks(self, chunks: list[str], separator: str) -> list[str]:
        """Collect trailing pieces to use as overlap."""
        overlap_chunks = []
        overlap_length = 0
        
        for chunk in reversed(chunks):
            chunk_length = len(chunk) + len(separator)
            if overlap_length + chunk_length <= self.chunk_overlap:
                overlap_chunks.insert(0, chunk)
                overlap_length += chunk_length
            else:
                break
        
        return overlap_chunks
    
    def _force_split(self, text: str) -> list[str]:
        """Force-split on characters with a sliding window."""
        return [
            text[i:i + self.chunk_size]
            for i in range(0, len(text), self.chunk_size - self.chunk_overlap)
        ]

# Usage example
recursive_chunker = RecursiveCharacterChunker(
    chunk_size=500,
    chunk_overlap=50
)

text = """
# Heading

This is the first paragraph. It contains several sentences. First sentence. Second sentence.

This is the second paragraph. It also contains several sentences.
"""

chunks = recursive_chunker.split_text(text)

Specialized variant: Markdown recursive chunking

markdown_chunking.py
class MarkdownChunker(RecursiveCharacterChunker):
    """
    Markdown-specific chunker.
    
    Improvements:
    1. Split by heading level (# → ## → ###)
    2. Keep the current heading as context for each chunk
    3. Handle code blocks and tables
    """
    
    def __init__(self, chunk_size: int = 500, chunk_overlap: int = 50):
        super().__init__(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            separators=[
                "\n## ",    # H2 heading
                "\n### ",   # H3 heading
                "\n#### ",  # H4 heading
                "\n\n",     # paragraph
                "\n",       # line break
                ". ",       # sentence
                " ",        # word
                ""          # character
            ]
        )
    
    def split_text(self, text: str) -> list[dict]:
        """
        Markdown chunking (keeps heading information).
        
        Returns:
            [
                {
                    "content": str,
                    "heading": str,
                    "level": int
                },
                ...
            ]
        """
        # Track heading levels
        import re
        
        # Heading detection
        heading_pattern = r'^(#{1,6})\s+(.+)$'
        current_heading = {"text": "", "level": 0}
        
        chunks = []
        current_text = []
        
        for line in text.split('\n'):
            match = re.match(heading_pattern, line)
            
            if match:
                # Flush the text accumulated under the previous heading
                if current_text:
                    text_content = '\n'.join(current_text)
                    for chunk in super().split_text(text_content):
                        chunks.append({
                            "content": chunk,
                            "heading": current_heading["text"],
                            "level": current_heading["level"]
                        })
                    current_text = []
                
                # Update the current heading
                level = len(match.group(1))
                heading_text = match.group(2)
                current_heading = {"text": heading_text, "level": level}
                
                # The heading itself also becomes a chunk
                chunks.append({
                    "content": line,
                    "heading": heading_text,
                    "level": level
                })
            else:
                current_text.append(line)
        
        # Flush the last block of text
        if current_text:
            text_content = '\n'.join(current_text)
            for chunk in super().split_text(text_content):
                chunks.append({
                    "content": chunk,
                    "heading": current_heading["text"],
                    "level": current_heading["level"]
                })
        
        return chunks

# Usage example
md_chunker = MarkdownChunker(chunk_size=500, chunk_overlap=50)

markdown_text = """
# Main Title

Some introductory content.

## Section 1

Content of the first section. It contains several paragraphs.

### Subsection

Detailed content of the subsection.

## Section 2

Content of the second section.
"""

chunks = md_chunker.split_text(markdown_text)
for chunk in chunks:
    print(f"Level {chunk['level']}: {chunk['heading']}")
    print(f"Content: {chunk['content'][:50]}...")
    print("---")

3. Semantic chunking (the most advanced)

Design idea: split on semantic similarity so that each chunk stays semantically coherent

semantic_chunking.py
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class SemanticChunker:
    """
    Semantic chunker.
    
    Strategy:
    1. Split the text into sentences
    2. Compute the semantic similarity of adjacent sentences
    3. Split where similarity is low (semantic boundaries)
    
    Advantages:
    - Preserves semantic coherence (each chunk covers one topic)
    - Adaptive chunk sizes (driven by content)
    - Best retrieval quality
    
    Disadvantages:
    - Computationally expensive (every sentence must be embedded)
    - Chunk sizes are unpredictable
    """
    
    def __init__(
        self,
        embedding_model,
        buffer_size: int = 1,  # window size when grouping adjacent sentences
        breakpoint_threshold: float = 0.5  # semantic-boundary threshold
    ):
        self.embedding_model = embedding_model
        self.buffer_size = buffer_size
        self.breakpoint_threshold = breakpoint_threshold
    
    def split_text(self, text: str) -> list[str]:
        """
        Semantic chunking.
        
        Algorithm:
        1. Split into sentences
        2. Embed every sentence
        3. Compute similarity between adjacent sentence groups
        4. Split where similarity drops sharply
        """
        # 1. Sentence splitting
        sentences = self._split_sentences(text)
        
        if len(sentences) <= 1:
            return [text]
        
        # 2. Generate embeddings (SentenceTransformer-style encode(); adapt if your model exposes a different API)
        embeddings = np.asarray(self.embedding_model.encode(sentences))
        
        # 3. Similarity between adjacent sentence groups
        similarities = self._calculate_similarity_scores(embeddings)
        
        # 4. Find semantic boundaries (similarity drops)
        breakpoints = self._identify_breakpoints(similarities)
        
        # 5. Split at the boundaries
        chunks = self._split_by_breakpoints(sentences, breakpoints)
        
        return chunks
    
    def _split_sentences(self, text: str) -> list[str]:
        """
        Sentence splitting (handles both Chinese and English).
        """
        import re
        
        # Naive splitting (use spaCy or NLTK in production)
        # English: split on .!?
        # Chinese: split on 。!?
        sentence_delimiters = r'[.!?。!?]+'
        
        sentences = re.split(sentence_delimiters, text)
        sentences = [s.strip() for s in sentences if s.strip()]
        
        return sentences
    
    def _calculate_similarity_scores(
        self,
        embeddings: np.ndarray
    ) -> list[float]:
        """
        Compute the semantic similarity of adjacent sentence groups.
        
        buffer_size controls how many sentences are compared:
        - buffer_size=1: single sentence vs single sentence
        - buffer_size=2: [s1, s2] vs [s3, s4]
        """
        similarities = []
        
        for i in range(len(embeddings) - self.buffer_size):
            # Group 1: the current sentence plus the previous buffer_size-1 sentences
            group1 = embeddings[i:i + self.buffer_size]
            group1_combined = np.mean(group1, axis=0)
            
            # Group 2: the next sentence plus the following buffer_size-1 sentences
            group2 = embeddings[i + self.buffer_size:i + 2 * self.buffer_size]
            if len(group2) < self.buffer_size:
                break
            group2_combined = np.mean(group2, axis=0)
            
            # Cosine similarity
            similarity = cosine_similarity(
                [group1_combined],
                [group2_combined]
            )[0][0]
            
            similarities.append(similarity)
        
        return similarities
    
    def _identify_breakpoints(
        self,
        similarities: list[float]
    ) -> list[int]:
        """
        Identify semantic boundaries.
        
        Strategy: positions where similarity falls below the threshold become split points.
        """
        breakpoints = []
        
        for i, similarity in enumerate(similarities):
            if similarity < self.breakpoint_threshold:
                # Split before sentence i + buffer_size
                breakpoints.append(i + self.buffer_size)
        
        return breakpoints
    
    def _split_by_breakpoints(
        self,
        sentences: list[str],
        breakpoints: list[int]
    ) -> list[str]:
        """Split the sentence list at the boundaries."""
        chunks = []
        
        start = 0
        for breakpoint in breakpoints:
            chunk_sentences = sentences[start:breakpoint]
            chunks.append(" ".join(chunk_sentences))
            start = breakpoint
        
        # Last chunk
        if start < len(sentences):
            chunk_sentences = sentences[start:]
            chunks.append(" ".join(chunk_sentences))
        
        return chunks

# Usage example
from sentence_transformers import SentenceTransformer

embedding_model = SentenceTransformer("all-MiniLM-L6-v2")

semantic_chunker = SemanticChunker(
    embedding_model=embedding_model,
    buffer_size=1,
    breakpoint_threshold=0.5
)

text = """
Machine learning is a subfield of artificial intelligence. It focuses on letting computers learn from data.
Deep learning is a branch of machine learning. It uses multi-layer neural networks to model complex patterns.
Natural language processing is another important area. It helps computers understand human language.
Computer vision lets machines "see" images. It is widely used in autonomous driving and medical diagnosis.
"""

chunks = semantic_chunker.split_text(text)
for i, chunk in enumerate(chunks, 1):
    print(f"Chunk {i}:")
    print(chunk)
    print("---")

Optimized variant: hybrid semantic chunking with size constraints

hybrid_semantic_chunking.py
class HybridSemanticChunker(SemanticChunker):
    """
    Hybrid semantic chunker.
    
    Improvements:
    1. Semantic chunking (preserves semantic completeness)
    2. Size constraints (avoids chunks that are too large or too small)
    3. Best practice: semantic boundaries, with fixed-size splitting as a fallback
    """
    
    def __init__(
        self,
        embedding_model,
        min_chunk_size: int = 100,
        max_chunk_size: int = 1000,
        buffer_size: int = 1,
        breakpoint_threshold: float = 0.5
    ):
        super().__init__(embedding_model, buffer_size, breakpoint_threshold)
        self.min_chunk_size = min_chunk_size
        self.max_chunk_size = max_chunk_size
    
    def split_text(self, text: str) -> list[str]:
        """Hybrid chunking."""
        # 1. Semantic chunking
        semantic_chunks = super().split_text(text)
        
        # 2. Post-processing: merge chunks that are too small, split chunks that are too large
        final_chunks = []
        buffer = []
        buffer_length = 0
        
        for chunk in semantic_chunks:
            chunk_length = len(chunk)
            
            # Chunk too large: force-split
            if chunk_length > self.max_chunk_size:
                # Flush the buffer first
                if buffer:
                    final_chunks.append(" ".join(buffer))
                    buffer = []
                    buffer_length = 0
                
                # Split the oversized chunk
                sub_chunks = self._split_large_chunk(chunk)
                final_chunks.extend(sub_chunks)
                continue
            
            # Chunk too small: accumulate in the buffer
            if buffer_length + chunk_length < self.min_chunk_size:
                buffer.append(chunk)
                buffer_length += chunk_length
                continue
            
            # Chunk size is acceptable
            if buffer:
                final_chunks.append(" ".join(buffer))
                buffer = []
                buffer_length = 0
            
            final_chunks.append(chunk)
        
        # Flush whatever is left in the buffer
        if buffer:
            final_chunks.append(" ".join(buffer))
        
        return final_chunks
    
    def _split_large_chunk(self, chunk: str) -> list[str]:
        """Split an oversized chunk with the fixed-size chunker."""
        fixed_chunker = FixedSizeChunker(
            chunk_size=self.max_chunk_size,
            chunk_overlap=50
        )
        return fixed_chunker.split_text(chunk)

# Usage example
hybrid_chunker = HybridSemanticChunker(
    embedding_model=embedding_model,
    min_chunk_size=100,
    max_chunk_size=1000,
    buffer_size=1,
    breakpoint_threshold=0.5
)

chunks = hybrid_chunker.split_text(text)

Advanced Techniques

1. Context enrichment (parent-child strategy)

context_enrichment.py
class ContextEnrichedChunker:
    """
    Context-enriched chunker.
    
    Strategy:
    - Child chunks: used for retrieval (small, precise matching)
    - Parent chunks: used for generation (large, full context)
    
    Advantages:
    - Accurate retrieval (small chunks match more precisely)
    - High generation quality (large chunks provide full context)
    """
    
    def __init__(
        self,
        child_chunk_size: int = 200,
        parent_chunk_size: int = 800,
        overlap: int = 50
    ):
        self.child_chunk_size = child_chunk_size
        self.parent_chunk_size = parent_chunk_size
        self.overlap = overlap
        
        self.child_chunker = FixedSizeChunker(child_chunk_size, overlap)
        self.parent_chunker = FixedSizeChunker(parent_chunk_size, overlap)
    
    def split_text(self, text: str) -> list[dict]:
        """
        Hierarchical chunking.
        
        Returns:
            [
                {
                    "child": str,
                    "parent": str,
                    "siblings": list[str]
                },
                ...
            ]
        """
        # 1. Generate parent chunks
        parent_chunks = self.parent_chunker.split_text(text)
        
        # 2. Generate child chunks for each parent chunk
        enriched_chunks = []
        
        for parent_chunk in parent_chunks:
            child_chunks = self.child_chunker.split_text(parent_chunk)
            
            for child_chunk in child_chunks:
                enriched_chunks.append({
                    "child": child_chunk,  # used for retrieval
                    "parent": parent_chunk,  # used for generation
                    "siblings": child_chunks  # the other children of the same parent
                })
        
        return enriched_chunks

# Usage example
context_chunker = ContextEnrichedChunker(
    child_chunk_size=200,
    parent_chunk_size=800
)

chunks = context_chunker.split_text(text)

# At retrieval time: match against the child chunk
# At generation time: pass the parent chunk as context
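
To make the child-for-retrieval / parent-for-generation flow concrete, here is a minimal sketch of the retrieval side. It reuses the SentenceTransformer embedding_model defined in the semantic-chunking section together with a plain in-memory cosine search; the helper names are illustrative and not tied to any specific vector store.

parent_child_retrieval.py
import numpy as np

# Index the child chunks, remembering which parent each one came from
child_texts = [c["child"] for c in chunks]
parent_of = [c["parent"] for c in chunks]
child_vectors = np.asarray(embedding_model.encode(child_texts))

def retrieve_parents(query: str, top_k: int = 3) -> list[str]:
    """Match the query against child chunks; return deduplicated parent chunks for generation."""
    q = np.asarray(embedding_model.encode([query]))[0]
    scores = child_vectors @ q / (
        np.linalg.norm(child_vectors, axis=1) * np.linalg.norm(q) + 1e-8
    )
    parents = []
    for i in np.argsort(scores)[::-1][:top_k]:
        if parent_of[i] not in parents:  # several children can share one parent
            parents.append(parent_of[i])
    return parents

# retrieve_parents("What does the second paragraph discuss?") → parent chunks to feed the LLM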

2. Dynamic chunking (the LightRAG approach)

dynamic_chunking.py
class DynamicChunker:
    """
    Dynamic chunker (LightRAG-style).
    
    Strategy:
    1. Adjust chunk size dynamically by content type
    2. Extract entities and relations as an extra index
    3. Build a document-level graph structure
    """
    
    def __init__(self, embedding_model, llm_client):
        self.embedding_model = embedding_model
        self.llm = llm_client
    
    async def split_and_enrich(self, text: str) -> dict:
        """
        Dynamic chunking + entity extraction.
        
        Returns:
            {
                "chunks": list[str],
                "entities": list[dict],
                "relations": list[dict],
                "graph": dict
            }
        """
        # 1. Semantic chunking
        semantic_chunker = SemanticChunker(self.embedding_model)
        chunks = semantic_chunker.split_text(text)
        
        # 2. Extract entities and relations from each chunk
        entities = []
        relations = []
        
        for chunk in chunks:
            chunk_entities, chunk_relations = await self._extract_kg(chunk)
            entities.extend(chunk_entities)
            relations.extend(chunk_relations)
        
        # 3. Build the knowledge graph
        graph = self._build_graph(entities, relations)
        
        return {
            "chunks": chunks,
            "entities": entities,
            "relations": relations,
            "graph": graph
        }
    
    async def _extract_kg(self, text: str) -> tuple[list, list]:
        """Extract a knowledge graph (entities + relations)."""
        prompt = f"""Extract entities and relations from the following text.

Text: {text}

Output JSON format:
{{
  "entities": [
    {{"name": "entity1", "type": "Person"}},
    {{"name": "entity2", "type": "Organization"}}
  ],
  "relations": [
    {{"source": "entity1", "relation": "works_at", "target": "entity2"}}
  ]
}}

JSON:"""
        
        response = await self.llm.chat.completions.create(
            model="gpt-4",
            messages=[{"role": "user", "content": prompt}],
            temperature=0
        )
        
        import json
        result = json.loads(response.choices[0].message.content)
        
        return result["entities"], result["relations"]
    
    def _build_graph(self, entities: list[dict], relations: list[dict]) -> dict:
        """Build the knowledge graph."""
        # Simplified implementation using networkx
        import networkx as nx
        
        G = nx.Graph()
        
        # Add nodes
        for entity in entities:
            G.add_node(entity["name"], type=entity["type"])
        
        # Add edges
        for relation in relations:
            G.add_edge(
                relation["source"],
                relation["target"],
                relation=relation["relation"]
            )
        
        return {
            "nodes": list(G.nodes(data=True)),
            "edges": list(G.edges(data=True))
        }
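
A minimal usage sketch, assuming the openai Python SDK's AsyncOpenAI client and the SentenceTransformer embedding_model from earlier; in practice the extraction prompt may need stricter instructions (or a JSON response format) so that json.loads does not fail.

# Usage sketch (assumes `pip install openai networkx` and OPENAI_API_KEY in the environment)
import asyncio
from openai import AsyncOpenAI

async def main():
    dynamic_chunker = DynamicChunker(
        embedding_model=embedding_model,  # SentenceTransformer defined above
        llm_client=AsyncOpenAI()
    )
    result = await dynamic_chunker.split_and_enrich(text)
    print(f"{len(result['chunks'])} chunks, "
          f"{len(result['entities'])} entities, "
          f"{len(result['relations'])} relations")

asyncio.run(main())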

Evaluating Chunk Quality

Evaluation metrics (a minimal sketch of two of them follows the table)

Metric | How to measure | Target
Semantic completeness | Human review of whether each chunk is self-contained | > 90%
Chunk size distribution | Standard deviation / mean of chunk lengths | < 0.3
Retrieval accuracy | Share of queries whose correct chunk appears in the top-k | > 95% (k=5)
Generation quality | BLEU / ROUGE scores | > 5% above baseline
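
Here is a minimal sketch of two of these metrics (chunk size distribution and top-k retrieval accuracy); the gold labels are assumed to come from your own evaluation set.

chunk_metrics.py
import numpy as np

def size_dispersion(chunks: list[str]) -> float:
    """Std / mean of chunk lengths; lower is more uniform (target < 0.3)."""
    lengths = np.array([len(c) for c in chunks], dtype=float)
    return float(lengths.std() / lengths.mean())

def retrieval_hit_rate(retrieved: list[list[str]], gold: list[str], k: int = 5) -> float:
    """Fraction of queries whose gold chunk appears in the top-k retrieved chunks."""
    hits = sum(1 for topk, g in zip(retrieved, gold) if g in topk[:k])
    return hits / len(gold)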

Common Issues and Solutions

Best Practices

Chunking strategy selection guide

strategy_selector.py
def select_chunking_strategy(
    document_type: str,
    requirements: dict
):
    """
    Pick a chunking strategy based on document type and requirements.
    
    Args:
        document_type: "article" / "code" / "markdown" / "chat" / "table"
        requirements: {
            "accuracy": "high" / "medium" / "low",
            "speed": "fast" / "medium" / "slow",
            "preserve_structure": bool
        }
    
    Note: CodeChunker and embedding_model are assumed to be defined elsewhere.
    """
    # Strategy 1: code documents → code-specific chunker
    if document_type == "code":
        return CodeChunker()
    
    # Strategy 2: Markdown → Markdown-specific chunker
    if document_type == "markdown":
        return MarkdownChunker()
    
    # Strategy 3: high accuracy and speed is not critical → semantic chunking
    if requirements['accuracy'] == 'high' and requirements['speed'] != 'fast':
        return HybridSemanticChunker(embedding_model)
    
    # Strategy 4: structure must be preserved → recursive chunking
    if requirements['preserve_structure']:
        return RecursiveCharacterChunker()
    
    # Strategy 5: default → fixed length (fastest)
    return TokenAwareChunker()
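
A short usage sketch; the document_type and requirements values are illustrative.

# Usage sketch
chunker = select_chunking_strategy(
    document_type="markdown",
    requirements={"accuracy": "medium", "speed": "fast", "preserve_structure": True}
)
chunks = chunker.split_text(markdown_text)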

Further Reading

References

  • LangChain documentation - recursive chunking algorithm
  • LlamaIndex documentation - semantic / window chunking implementations
  • Chonkie - general-purpose chunking library
  • Academic and industry blog posts on chunking best practices

Next step: continue to Vector Index Optimization to learn how to store and retrieve vectors efficiently.