RAG项目实战分析(原创)03 数据处理与索引
文档解析技术
从 PDF 到多模态内容的完整文档解析工程方案
为什么文档解析是 RAG 的数据入口
再强大的检索和生成能力,如果文档解析出错(表格错位、公式丢失、图片未识别),RAG 系统就是"垃圾进,垃圾出"。高质量的文档解析是构建可靠 RAG 系统的第一步,决定了后续所有环节的质量上限。
文档解析技术
背景与核心挑战
文档解析的难点
| 文档类型 | 核心挑战 | 影响 |
|---|---|---|
| 文字提取、表格识别、版面分析 | 最常见但最复杂 | |
| Office (Word/Excel/PPT) | 格式保留、嵌入对象 | 企业文档主流 |
| HTML/Markdown | 结构解析、样式剥离 | Web 内容爬取 |
| 图像 (JPG/PNG) | OCR 准确率、多语言 | 扫描文档、截图 |
| 多模态 (图表/公式) | 视觉理解、语义提取 | 技术文档、论文 |
解析质量的三个维度
- 准确性:文字、表格、公式是否正确提取
- 完整性:是否保留了所有重要信息(标题、列表、引用)
- 结构化:是否保留了文档的层次结构(章节、段落)
九大项目文档解析方案全景
| 项目 | PDF 解析 | 多模态支持 | 表格处理 | OCR | 技术成熟度 |
|---|---|---|---|---|---|
| ragflow | ✅ 深度版面分析 | ✅ 图像/表格理解 | ✅ 高级 | ✅ | ⭐⭐⭐⭐⭐ |
| onyx | ✅ 多引擎(Unstructured) | ✅ | ✅ | ✅ | ⭐⭐⭐⭐⭐ |
| kotaemon | ✅ LlamaIndex 集成 | ✅ 图像描述 | ✅ | ✅ | ⭐⭐⭐⭐ |
| Verba | ✅ Unstructured | ✅ | ✅ | ✅ | ⭐⭐⭐⭐ |
| RAG-Anything | ✅ 多模态优先 | ✅ 图像/视频/音频 | ✅ | ✅ | ⭐⭐⭐⭐⭐ |
| LightRAG | 基础(文本为主) | 有限 | 基础 | 无 | ⭐⭐⭐ |
| SurfSense | ✅ 网页优化 | ✅ 截图 | 基础 | ✅ | ⭐⭐⭐⭐ |
| Self-Corrective-Agentic-RAG | 基础 | 无 | 基础 | 无 | ⭐⭐⭐ |
| UltraRAG | 基础 | 无 | 基础 | 无 | ⭐⭐ |
关键洞察
- 最强大:ragflow 的深度版面分析(自研算法 + 视觉理解)
- 最通用:onyx 的 Unstructured 集成(支持 50+ 格式)
- 最前沿:RAG-Anything 的多模态优先策略(图像/音频/视频)
- 趋势:从纯文本提取向多模态理解演进
核心实现深度对比
1. ragflow:深度版面分析引擎
设计理念:将 PDF 视为"视觉文档",用 CV + NLP 联合解析
class DeepDocParser:
"""
ragflow 深度文档解析器
核心能力:
1. 版面分析(Layout Analysis):识别标题、段落、表格、图像区域
2. 阅读顺序恢复(Reading Order):重建逻辑阅读顺序
3. 表格结构识别(Table Structure):识别单元格、合并单元格
4. 公式识别(Formula OCR):将公式图像转为 LaTeX
5. 图像理解(Image Captioning):生成图像描述
"""
def __init__(
self,
layout_model: str = "layoutlmv3", # 版面分析模型
table_model: str = "table-transformer", # 表格识别模型
ocr_engine: str = "paddleocr", # OCR 引擎
enable_formula_ocr: bool = True,
enable_image_caption: bool = True
):
self.layout_model = self._load_layout_model(layout_model)
self.table_model = self._load_table_model(table_model)
self.ocr_engine = self._load_ocr_engine(ocr_engine)
self.enable_formula_ocr = enable_formula_ocr
self.enable_image_caption = enable_image_caption
if enable_formula_ocr:
self.formula_ocr = self._load_formula_ocr()
if enable_image_caption:
self.image_captioner = self._load_image_captioner()
def parse_pdf(self, pdf_path: str) -> list[dict]:
"""
解析 PDF 文档
Returns:
[
{
"type": "text" / "table" / "image" / "formula",
"content": str,
"bbox": [x1, y1, x2, y2],
"page": int,
"reading_order": int,
"metadata": {...}
},
...
]
"""
parsed_blocks = []
# 1. 将 PDF 转为图像(每页一张)
images = self._pdf_to_images(pdf_path)
for page_num, image in enumerate(images, 1):
# 2. 版面分析(识别区域类型)
layout_results = self._analyze_layout(image)
# layout_results = [
# {"type": "title", "bbox": [x1, y1, x2, y2], "score": 0.98},
# {"type": "text", "bbox": [...], "score": 0.95},
# {"type": "table", "bbox": [...], "score": 0.92},
# ...
# ]
# 3. 恢复阅读顺序(从上到下、从左到右,考虑多栏布局)
layout_results = self._sort_by_reading_order(layout_results)
# 4. 处理每个区域
for order, region in enumerate(layout_results, 1):
region_type = region["type"]
bbox = region["bbox"]
# 裁剪区域图像
region_image = self._crop_image(image, bbox)
if region_type in ["text", "title", "list"]:
# 文本 OCR
text = self._ocr_text(region_image)
parsed_blocks.append({
"type": region_type,
"content": text,
"bbox": bbox,
"page": page_num,
"reading_order": order
})
elif region_type == "table":
# 表格结构识别 + OCR
table_data = self._parse_table(region_image)
parsed_blocks.append({
"type": "table",
"content": self._table_to_markdown(table_data),
"raw_table": table_data, # [[cell1, cell2], [cell3, cell4]]
"bbox": bbox,
"page": page_num,
"reading_order": order
})
elif region_type == "formula":
# 公式 OCR(转为 LaTeX)
if self.enable_formula_ocr:
latex = self._formula_to_latex(region_image)
parsed_blocks.append({
"type": "formula",
"content": f"$${latex}$$",
"bbox": bbox,
"page": page_num,
"reading_order": order
})
elif region_type == "figure":
# 图像理解(生成描述)
if self.enable_image_caption:
caption = self._generate_image_caption(region_image)
parsed_blocks.append({
"type": "image",
"content": f"[Image: {caption}]",
"image_data": self._encode_image(region_image),
"bbox": bbox,
"page": page_num,
"reading_order": order
})
return parsed_blocks
def _analyze_layout(self, image: np.ndarray) -> list[dict]:
"""
版面分析(使用 LayoutLMv3 或类似模型)
返回文档中的所有区域及其类型
"""
# 运行版面检测模型
results = self.layout_model.detect(image)
# 过滤低置信度区域
filtered_results = [
r for r in results
if r["score"] > 0.5
]
return filtered_results
def _sort_by_reading_order(self, regions: list[dict]) -> list[dict]:
"""
恢复阅读顺序
策略:
1. 识别多栏布局(单栏/双栏/三栏)
2. 按栏从左到右,栏内从上到下排序
3. 特殊处理:标题通常跨越所有栏
"""
# 简化实现:从上到下、从左到右排序
sorted_regions = sorted(
regions,
key=lambda r: (r["bbox"][1], r["bbox"][0]) # (y1, x1)
)
# TODO: 实现多栏检测算法
return sorted_regions
def _parse_table(self, table_image: np.ndarray) -> list[list[str]]:
"""
表格结构识别 + OCR
步骤:
1. 识别表格结构(行列分割线)
2. 识别单元格边界
3. OCR 每个单元格
4. 处理合并单元格
"""
# 1. 识别表格结构
structure = self.table_model.detect_structure(table_image)
# structure = {
# "rows": 5,
# "cols": 3,
# "cells": [
# {"row": 0, "col": 0, "rowspan": 1, "colspan": 1, "bbox": [...]},
# ...
# ]
# }
# 2. OCR 每个单元格
table_data = []
for cell in structure["cells"]:
cell_image = self._crop_image(table_image, cell["bbox"])
cell_text = self._ocr_text(cell_image)
# 处理合并单元格
row = cell["row"]
col = cell["col"]
rowspan = cell["rowspan"]
colspan = cell["colspan"]
# 确保 table_data 有足够的行
while len(table_data) <= row + rowspan - 1:
table_data.append([])
# 填充单元格
for r in range(row, row + rowspan):
while len(table_data[r]) <= col + colspan - 1:
table_data[r].append("")
for c in range(col, col + colspan):
table_data[r][c] = cell_text if (r == row and c == col) else ""
return table_data
def _table_to_markdown(self, table_data: list[list[str]]) -> str:
"""将表格转为 Markdown 格式"""
if not table_data:
return ""
# 构建 Markdown 表格
lines = []
# 表头
lines.append("| " + " | ".join(table_data[0]) + " |")
lines.append("| " + " | ".join(["---"] * len(table_data[0])) + " |")
# 表格内容
for row in table_data[1:]:
lines.append("| " + " | ".join(row) + " |")
return "\n".join(lines)
def _ocr_text(self, image: np.ndarray) -> str:
"""OCR 文本提取"""
results = self.ocr_engine.ocr(image)
# 拼接文本(按阅读顺序)
texts = []
for line in results:
texts.append(line["text"])
return "\n".join(texts)
def _formula_to_latex(self, formula_image: np.ndarray) -> str:
"""公式图像转 LaTeX"""
latex = self.formula_ocr.predict(formula_image)
return latex
def _generate_image_caption(self, image: np.ndarray) -> str:
"""生成图像描述(使用视觉语言模型)"""
caption = self.image_captioner.generate_caption(image)
return caption
# 使用示例
parser = DeepDocParser(
layout_model="layoutlmv3",
table_model="table-transformer",
ocr_engine="paddleocr",
enable_formula_ocr=True,
enable_image_caption=True
)
# 解析 PDF
blocks = parser.parse_pdf("research_paper.pdf")
# 输出结构化内容
for block in blocks:
print(f"Page {block['page']}, Order {block['reading_order']}")
print(f"Type: {block['type']}")
print(f"Content: {block['content'][:100]}...")
print("---")核心优势:
- ✅ 版面分析准确率 > 95%(自研模型)
- ✅ 表格识别支持复杂合并单元格
- ✅ 公式识别支持手写公式
- ✅ 多语言支持(中英日韩等)
适用场景:
- 学术论文(大量公式和表格)
- 财报(复杂表格布局)
- 技术文档(图表密集)
2. onyx:Unstructured 通用解析引擎
设计理念:使用开源 Unstructured 库支持 50+ 文件格式
from unstructured.partition.auto import partition
from unstructured.staging.base import elements_to_json
from typing import Optional
class UniversalDocumentParser:
"""
onyx 通用文档解析器(基于 Unstructured)
支持格式:
- PDF, DOCX, PPTX, XLSX
- HTML, XML, Markdown
- EML (Email), MSG (Outlook)
- 图像 (PNG, JPG, TIFF)
- CSV, TSV, JSON
- 等 50+ 种格式
"""
def __init__(
self,
strategy: str = "hi_res", # fast / hi_res / ocr_only
extract_images: bool = True,
infer_table_structure: bool = True,
include_page_breaks: bool = True
):
self.strategy = strategy
self.extract_images = extract_images
self.infer_table_structure = infer_table_structure
self.include_page_breaks = include_page_breaks
def parse(
self,
file_path: str,
file_type: Optional[str] = None
) -> list[dict]:
"""
自动解析文档
Args:
file_path: 文档路径
file_type: 文件类型(可选,自动检测)
Returns:
[
{
"type": "Title" / "Text" / "Table" / "Image" / "ListItem",
"content": str,
"metadata": {
"page_number": int,
"coordinates": {...},
"filename": str
}
},
...
]
"""
# 使用 Unstructured 自动解析
elements = partition(
filename=file_path,
file=file_type,
strategy=self.strategy,
extract_images_in_pdf=self.extract_images,
infer_table_structure=self.infer_table_structure,
include_page_breaks=self.include_page_breaks,
# 额外参数
languages=["eng", "chi_sim"], # 多语言支持
detect_language_per_element=True
)
# 转换为标准格式
parsed_elements = []
for elem in elements:
parsed_elements.append({
"type": elem.category, # Title / Text / Table / Image / ListItem
"content": str(elem),
"metadata": {
"page_number": elem.metadata.page_number,
"coordinates": elem.metadata.coordinates,
"filename": elem.metadata.filename,
"file_type": elem.metadata.filetype,
"languages": elem.metadata.languages
}
})
return parsed_elements
def parse_with_chunking(
self,
file_path: str,
chunk_size: int = 1000,
overlap: int = 200
) -> list[dict]:
"""
解析并分块
适用于大文档(自动按语义边界分块)
"""
from unstructured.chunking.title import chunk_by_title
# 1. 解析文档
elements = partition(
filename=file_path,
strategy=self.strategy
)
# 2. 按标题分块(保留文档结构)
chunks = chunk_by_title(
elements,
max_characters=chunk_size,
overlap=overlap,
combine_text_under_n_chars=100 # 合并过短的段落
)
# 3. 转换为标准格式
parsed_chunks = []
for i, chunk in enumerate(chunks, 1):
parsed_chunks.append({
"chunk_id": i,
"content": str(chunk),
"metadata": {
"page_numbers": self._extract_page_numbers(chunk),
"element_types": self._extract_element_types(chunk)
}
})
return parsed_chunks
def _extract_page_numbers(self, chunk) -> list[int]:
"""提取 chunk 涉及的页码"""
pages = set()
for elem in chunk.elements:
if hasattr(elem.metadata, 'page_number') and elem.metadata.page_number:
pages.add(elem.metadata.page_number)
return sorted(pages)
def _extract_element_types(self, chunk) -> list[str]:
"""提取 chunk 包含的元素类型"""
types = set()
for elem in chunk.elements:
types.add(elem.category)
return sorted(types)
# 使用示例
# 示例1:解析 PDF(高精度模式)
parser = UniversalDocumentParser(strategy="hi_res")
elements = parser.parse("report.pdf")
for elem in elements:
print(f"{elem['type']}: {elem['content'][:100]}...")
# 示例2:解析 Word 文档(快速模式)
parser_fast = UniversalDocumentParser(strategy="fast")
elements = parser_fast.parse("document.docx")
# 示例3:解析并分块(适合 RAG)
parser_chunking = UniversalDocumentParser(strategy="hi_res")
chunks = parser_chunking.parse_with_chunking(
"long_document.pdf",
chunk_size=1000,
overlap=200
)
for chunk in chunks:
print(f"Chunk {chunk['chunk_id']} (Pages: {chunk['metadata']['page_numbers']})")
print(chunk['content'])
print("---")核心优势:
- ✅ 支持 50+ 文件格式(开箱即用)
- ✅ 三种解析策略(fast / hi_res / ocr_only)
- ✅ 自动检测文件类型
- ✅ 保留文档结构(标题、列表、表格)
性能对比:
| 策略 | 速度 | 准确率 | 适用场景 |
|---|---|---|---|
| fast | 快 (1s/page) | 中 (85%) | 纯文本文档 |
| hi_res | 中 (3s/page) | 高 (95%) | 复杂布局、表格 |
| ocr_only | 慢 (5s/page) | 中 (80%) | 扫描文档、图像 |
3. RAG-Anything:多模态优先解析
设计理念:不仅提取文本,更理解视觉内容(图像、视频、音频)
from transformers import pipeline
from PIL import Image
import whisper
class MultimodalDocumentParser:
"""
RAG-Anything 多模态文档解析器
能力:
1. 文本提取(PDF, Office)
2. 图像理解(生成详细描述 + 回答问题)
3. 视频解析(关键帧提取 + 字幕识别)
4. 音频转录(语音转文字)
"""
def __init__(self):
# 视觉语言模型(图像理解)
self.vision_model = pipeline(
"image-to-text",
model="Salesforce/blip2-opt-2.7b"
)
# 音频转录模型
self.audio_model = whisper.load_model("base")
# 基础文本解析器
self.text_parser = UniversalDocumentParser()
def parse_document(
self,
file_path: str,
parse_images: bool = True,
parse_videos: bool = True,
parse_audio: bool = True
) -> dict:
"""
多模态文档解析
Returns:
{
"text_blocks": [...],
"images": [...],
"videos": [...],
"audio": [...]
}
"""
file_ext = file_path.split('.')[-1].lower()
result = {
"text_blocks": [],
"images": [],
"videos": [],
"audio": []
}
# 1. 文本文档(PDF, DOCX, etc.)
if file_ext in ['pdf', 'docx', 'pptx', 'txt', 'md']:
text_elements = self.text_parser.parse(file_path)
for elem in text_elements:
if elem['type'] in ['Title', 'Text', 'Table', 'ListItem']:
result['text_blocks'].append(elem)
elif elem['type'] == 'Image' and parse_images:
# 提取嵌入的图像并理解
image = self._extract_embedded_image(elem)
if image:
image_data = self.parse_image(image)
result['images'].append(image_data)
# 2. 纯图像文件
elif file_ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp'] and parse_images:
image = Image.open(file_path)
image_data = self.parse_image(image, file_path)
result['images'].append(image_data)
# 3. 视频文件
elif file_ext in ['mp4', 'avi', 'mov', 'mkv'] and parse_videos:
video_data = self.parse_video(file_path)
result['videos'].append(video_data)
# 4. 音频文件
elif file_ext in ['mp3', 'wav', 'flac', 'm4a'] and parse_audio:
audio_data = self.parse_audio(file_path)
result['audio'].append(audio_data)
return result
def parse_image(
self,
image: Image.Image,
image_path: str = None
) -> dict:
"""
图像理解(生成详细描述)
策略:
1. 生成通用描述(Image Captioning)
2. 检测图像类型(图表 / 截图 / 照片)
3. 如果是图表,提取数据(OCR + 结构识别)
"""
# 1. 生成描述
caption = self.vision_model(image)[0]['generated_text']
# 2. 检测图像类型
image_type = self._classify_image_type(image)
# 3. 如果是图表/表格,提取详细信息
extracted_data = None
if image_type in ['chart', 'table', 'diagram']:
extracted_data = self._extract_chart_data(image)
return {
"type": "image",
"path": image_path,
"caption": caption,
"image_type": image_type,
"extracted_data": extracted_data,
"metadata": {
"width": image.width,
"height": image.height
}
}
def parse_video(self, video_path: str) -> dict:
"""
视频解析
策略:
1. 提取关键帧(每 N 秒一帧)
2. 对每帧生成描述
3. 提取音频并转录
4. 合并时间轴
"""
import cv2
# 1. 打开视频
video = cv2.VideoCapture(video_path)
fps = video.get(cv2.CAP_PROP_FPS)
frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
duration = frame_count / fps
# 2. 提取关键帧(每 5 秒)
keyframe_interval = int(fps * 5)
keyframes = []
frame_idx = 0
while True:
ret, frame = video.read()
if not ret:
break
if frame_idx % keyframe_interval == 0:
# 转为 PIL Image
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
image = Image.fromarray(frame_rgb)
# 生成描述
caption = self.vision_model(image)[0]['generated_text']
timestamp = frame_idx / fps
keyframes.append({
"timestamp": timestamp,
"caption": caption
})
frame_idx += 1
video.release()
# 3. 提取音频并转录
audio_path = self._extract_audio_from_video(video_path)
transcript = self.audio_model.transcribe(audio_path)
return {
"type": "video",
"path": video_path,
"duration": duration,
"fps": fps,
"keyframes": keyframes,
"transcript": transcript['text'],
"transcript_segments": transcript['segments'] # 带时间戳的片段
}
def parse_audio(self, audio_path: str) -> dict:
"""
音频转录
使用 Whisper 模型(支持 99 种语言)
"""
result = self.audio_model.transcribe(
audio_path,
language=None, # 自动检测
task="transcribe"
)
return {
"type": "audio",
"path": audio_path,
"transcript": result['text'],
"language": result['language'],
"segments": result['segments'] # 带时间戳的片段
}
def _classify_image_type(self, image: Image.Image) -> str:
"""分类图像类型(简化版)"""
# TODO: 使用图像分类模型
# 简化实现:基于宽高比和颜色分布
width, height = image.size
aspect_ratio = width / height
if 1.3 < aspect_ratio < 2.0:
return "chart" # 图表通常是横向的
elif aspect_ratio > 2.0:
return "screenshot" # 截图通常更宽
else:
return "photo"
def _extract_chart_data(self, image: Image.Image) -> dict:
"""从图表中提取数据(简化版)"""
# TODO: 使用专门的图表识别模型
# 简化实现:OCR + 启发式规则
return {
"chart_type": "bar_chart",
"data": [...] # 提取的数据点
}
def _extract_audio_from_video(self, video_path: str) -> str:
"""从视频中提取音频"""
import ffmpeg
audio_path = video_path.rsplit('.', 1)[0] + '_audio.wav'
ffmpeg.input(video_path).output(
audio_path,
acodec='pcm_s16le',
ac=1,
ar='16k'
).run(quiet=True, overwrite_output=True)
return audio_path
# 使用示例
parser = MultimodalDocumentParser()
# 示例1:解析包含图表的 PDF
result = parser.parse_document("financial_report.pdf")
print(f"Text blocks: {len(result['text_blocks'])}")
print(f"Images: {len(result['images'])}")
# 示例2:解析图像(生成详细描述)
image = Image.open("chart.png")
image_data = parser.parse_image(image)
print(f"Caption: {image_data['caption']}")
print(f"Type: {image_data['image_type']}")
# 示例3:解析视频(提取关键帧 + 字幕)
video_data = parser.parse_video("presentation.mp4")
print(f"Duration: {video_data['duration']}s")
print(f"Keyframes: {len(video_data['keyframes'])}")
print(f"Transcript: {video_data['transcript'][:200]}...")
# 示例4:解析音频(转录)
audio_data = parser.parse_audio("podcast.mp3")
print(f"Language: {audio_data['language']}")
print(f"Transcript: {audio_data['transcript']}")核心优势:
- ✅ 真正的多模态理解(不只是提取文本)
- ✅ 视频支持(关键帧 + 字幕)
- ✅ 音频转录(99 种语言)
- ✅ 图像理解(生成描述 + 图表数据提取)
适用场景:
- 在线课程(视频 + 字幕)
- 播客/访谈(音频转录)
- 图表密集的报告(自动提取数据)
解析质量评估
评估指标
| 指标 | 计算方法 | 目标值 |
|---|---|---|
| 文本准确率 | 正确字符数 / 总字符数 | > 98% |
| 表格完整率 | 识别的单元格数 / 实际单元格数 | > 95% |
| 版面保留率 | 保留的结构元素 / 原始结构元素 | > 90% |
| OCR 错误率 | 错误字符数 / 总字符数 | < 2% |
常见问题与解决方案
PDF 文字乱码
原因:字体嵌入问题、编码错误
解决:使用 OCR 模式(hi_res / ocr_only)而非文本提取
表格识别失败
原因:无边框表格、复杂合并单元格
解决:使用深度学习表格识别(table-transformer)
公式丢失
原因:公式为图像格式
解决:启用公式 OCR(Pix2Tex, LaTeX-OCR)
图像描述不准确
原因:视觉模型能力限制
解决:使用更强大的 VLM(GPT-4V, Claude 3)
最佳实践
1. 选择合适的解析策略
def select_parser(file_path: str, requirements: dict) -> DocumentParser:
"""
根据需求选择解析器
Args:
requirements: {
"accuracy": "high" / "medium" / "low",
"speed": "fast" / "medium" / "slow",
"multimodal": bool,
"preserve_structure": bool
}
"""
file_ext = file_path.split('.')[-1].lower()
# 策略1:纯文本文档 + 速度优先 → Unstructured (fast)
if file_ext in ['txt', 'md', 'docx'] and requirements['speed'] == 'fast':
return UniversalDocumentParser(strategy="fast")
# 策略2:复杂 PDF + 高准确率 → ragflow DeepDoc
if file_ext == 'pdf' and requirements['accuracy'] == 'high':
return DeepDocParser(
enable_formula_ocr=True,
enable_image_caption=True
)
# 策略3:多模态内容 → RAG-Anything
if requirements['multimodal']:
return MultimodalDocumentParser()
# 策略4:通用场景 → Unstructured (hi_res)
return UniversalDocumentParser(strategy="hi_res")2. 后处理优化
def clean_parsed_text(text: str) -> str:
"""清理解析后的文本"""
import re
# 1. 移除多余空白
text = re.sub(r'\s+', ' ', text)
# 2. 修复常见 OCR 错误
text = text.replace('l1', '11') # OCR 常见错误:l1 → 11
text = text.replace('O0', '00') # O0 → 00
# 3. 规范化标点
text = text.replace(',', ',')
text = text.replace('。', '.')
return text.strip()
def merge_fragments(blocks: list[dict]) -> str:
"""合并文档片段(保留结构)"""
merged = []
for block in blocks:
if block['type'] == 'Title':
merged.append(f"\n## {block['content']}\n")
elif block['type'] == 'Text':
merged.append(block['content'])
elif block['type'] == 'Table':
merged.append(f"\n{block['content']}\n")
elif block['type'] == 'ListItem':
merged.append(f"- {block['content']}")
return "\n".join(merged)延伸阅读
参考文献
本文基于以下研究材料整理:
- RAGSolutions/document_parsing_analysis.md - 文档解析技术分析
- RAGSolutions/best_practices_recommendations.md - 最佳实践推荐
- ragflow 官方文档 - 深度版面分析算法
- Unstructured 官方文档 - 通用解析库
下一步:进入 分块策略与算法 了解如何将解析后的文本高质量分块。