Deeptoai RAG系列教程
RAG项目实战分析(原创)03 数据处理与索引

文档解析技术

从 PDF 到多模态内容的完整文档解析工程方案

为什么文档解析是 RAG 的数据入口

无论检索和生成能力多强,只要文档解析出错(表格错位、公式丢失、图片未识别),整个 RAG 系统都难逃"垃圾进,垃圾出"。高质量的文档解析是构建可靠 RAG 系统的第一步,决定了后续所有环节的质量上限。

背景与核心挑战

文档解析的难点

| 文档类型 | 核心挑战 | 影响 |
| --- | --- | --- |
| PDF | 文字提取、表格识别、版面分析 | 最常见但最复杂 |
| Office (Word/Excel/PPT) | 格式保留、嵌入对象 | 企业文档主流 |
| HTML/Markdown | 结构解析、样式剥离 | Web 内容爬取 |
| 图像 (JPG/PNG) | OCR 准确率、多语言 | 扫描文档、截图 |
| 多模态 (图表/公式) | 视觉理解、语义提取 | 技术文档、论文 |

解析质量的三个维度

  1. 准确性:文字、表格、公式是否正确提取
  2. 完整性:是否保留了所有重要信息(标题、列表、引用)
  3. 结构化:是否保留了文档的层次结构(章节、段落)

九大项目文档解析方案全景

| 项目 | PDF 解析 | 多模态支持 | 表格处理 | OCR | 技术成熟度 |
| --- | --- | --- | --- | --- | --- |
| ragflow | ✅ 深度版面分析 | ✅ 图像/表格理解 | ✅ 高级 | | ⭐⭐⭐⭐⭐ |
| onyx | ✅ 多引擎(Unstructured) | | | | ⭐⭐⭐⭐⭐ |
| kotaemon | ✅ LlamaIndex 集成 | ✅ 图像描述 | | | ⭐⭐⭐⭐ |
| Verba | ✅ Unstructured | | | | ⭐⭐⭐⭐ |
| RAG-Anything | ✅ 多模态优先 | ✅ 图像/视频/音频 | | | ⭐⭐⭐⭐⭐ |
| LightRAG | 基础(文本为主) | 有限 | 基础 | | ⭐⭐⭐ |
| SurfSense | ✅ 网页优化 | ✅ 截图 | 基础 | | ⭐⭐⭐⭐ |
| Self-Corrective-Agentic-RAG | 基础 | | 基础 | | ⭐⭐⭐ |
| UltraRAG | 基础 | | 基础 | | ⭐⭐ |

关键洞察

  • 最强大:ragflow 的深度版面分析(自研算法 + 视觉理解)
  • 最通用:onyx 的 Unstructured 集成(支持 50+ 格式)
  • 最前沿:RAG-Anything 的多模态优先策略(图像/音频/视频)
  • 趋势:从纯文本提取向多模态理解演进

核心实现深度对比

1. ragflow:深度版面分析引擎

设计理念:将 PDF 视为"视觉文档",用 CV + NLP 联合解析

ragflow/deepdoc_parser.py
import numpy as np  # 下方方法签名中的 np.ndarray 类型标注需要此导入

class DeepDocParser:
    """
    ragflow 深度文档解析器
    
    核心能力:
    1. 版面分析(Layout Analysis):识别标题、段落、表格、图像区域
    2. 阅读顺序恢复(Reading Order):重建逻辑阅读顺序
    3. 表格结构识别(Table Structure):识别单元格、合并单元格
    4. 公式识别(Formula OCR):将公式图像转为 LaTeX
    5. 图像理解(Image Captioning):生成图像描述
    """
    
    def __init__(
        self,
        layout_model: str = "layoutlmv3",  # 版面分析模型
        table_model: str = "table-transformer",  # 表格识别模型
        ocr_engine: str = "paddleocr",  # OCR 引擎
        enable_formula_ocr: bool = True,
        enable_image_caption: bool = True
    ):
        self.layout_model = self._load_layout_model(layout_model)
        self.table_model = self._load_table_model(table_model)
        self.ocr_engine = self._load_ocr_engine(ocr_engine)
        self.enable_formula_ocr = enable_formula_ocr
        self.enable_image_caption = enable_image_caption
        
        if enable_formula_ocr:
            self.formula_ocr = self._load_formula_ocr()
        
        if enable_image_caption:
            self.image_captioner = self._load_image_captioner()
    
    def parse_pdf(self, pdf_path: str) -> list[dict]:
        """
        解析 PDF 文档
        
        Returns:
            [
                {
                    "type": "text" / "table" / "image" / "formula",
                    "content": str,
                    "bbox": [x1, y1, x2, y2],
                    "page": int,
                    "reading_order": int,
                    "metadata": {...}
                },
                ...
            ]
        """
        parsed_blocks = []
        
        # 1. 将 PDF 转为图像(每页一张)
        images = self._pdf_to_images(pdf_path)
        
        for page_num, image in enumerate(images, 1):
            # 2. 版面分析(识别区域类型)
            layout_results = self._analyze_layout(image)
            # layout_results = [
            #     {"type": "title", "bbox": [x1, y1, x2, y2], "score": 0.98},
            #     {"type": "text", "bbox": [...], "score": 0.95},
            #     {"type": "table", "bbox": [...], "score": 0.92},
            #     ...
            # ]
            
            # 3. 恢复阅读顺序(从上到下、从左到右,考虑多栏布局)
            layout_results = self._sort_by_reading_order(layout_results)
            
            # 4. 处理每个区域
            for order, region in enumerate(layout_results, 1):
                region_type = region["type"]
                bbox = region["bbox"]
                
                # 裁剪区域图像
                region_image = self._crop_image(image, bbox)
                
                if region_type in ["text", "title", "list"]:
                    # 文本 OCR
                    text = self._ocr_text(region_image)
                    
                    parsed_blocks.append({
                        "type": region_type,
                        "content": text,
                        "bbox": bbox,
                        "page": page_num,
                        "reading_order": order
                    })
                
                elif region_type == "table":
                    # 表格结构识别 + OCR
                    table_data = self._parse_table(region_image)
                    
                    parsed_blocks.append({
                        "type": "table",
                        "content": self._table_to_markdown(table_data),
                        "raw_table": table_data,  # [[cell1, cell2], [cell3, cell4]]
                        "bbox": bbox,
                        "page": page_num,
                        "reading_order": order
                    })
                
                elif region_type == "formula":
                    # 公式 OCR(转为 LaTeX)
                    if self.enable_formula_ocr:
                        latex = self._formula_to_latex(region_image)
                        
                        parsed_blocks.append({
                            "type": "formula",
                            "content": f"$${latex}$$",
                            "bbox": bbox,
                            "page": page_num,
                            "reading_order": order
                        })
                
                elif region_type == "figure":
                    # 图像理解(生成描述)
                    if self.enable_image_caption:
                        caption = self._generate_image_caption(region_image)
                        
                        parsed_blocks.append({
                            "type": "image",
                            "content": f"[Image: {caption}]",
                            "image_data": self._encode_image(region_image),
                            "bbox": bbox,
                            "page": page_num,
                            "reading_order": order
                        })
        
        return parsed_blocks
    
    def _analyze_layout(self, image: np.ndarray) -> list[dict]:
        """
        版面分析(使用 LayoutLMv3 或类似模型)
        
        返回文档中的所有区域及其类型
        """
        # 运行版面检测模型
        results = self.layout_model.detect(image)
        
        # 过滤低置信度区域
        filtered_results = [
            r for r in results
            if r["score"] > 0.5
        ]
        
        return filtered_results
    
    def _sort_by_reading_order(self, regions: list[dict]) -> list[dict]:
        """
        恢复阅读顺序
        
        策略:
        1. 识别多栏布局(单栏/双栏/三栏)
        2. 按栏从左到右,栏内从上到下排序
        3. 特殊处理:标题通常跨越所有栏
        """
        # 简化实现:从上到下、从左到右排序
        sorted_regions = sorted(
            regions,
            key=lambda r: (r["bbox"][1], r["bbox"][0])  # (y1, x1)
        )
        
        # TODO: 实现多栏检测算法
        
        return sorted_regions
    
    def _parse_table(self, table_image: np.ndarray) -> list[list[str]]:
        """
        表格结构识别 + OCR
        
        步骤:
        1. 识别表格结构(行列分割线)
        2. 识别单元格边界
        3. OCR 每个单元格
        4. 处理合并单元格
        """
        # 1. 识别表格结构
        structure = self.table_model.detect_structure(table_image)
        # structure = {
        #     "rows": 5,
        #     "cols": 3,
        #     "cells": [
        #         {"row": 0, "col": 0, "rowspan": 1, "colspan": 1, "bbox": [...]},
        #         ...
        #     ]
        # }
        
        # 2. OCR 每个单元格
        table_data = []
        for cell in structure["cells"]:
            cell_image = self._crop_image(table_image, cell["bbox"])
            cell_text = self._ocr_text(cell_image)
            
            # 处理合并单元格
            row = cell["row"]
            col = cell["col"]
            rowspan = cell["rowspan"]
            colspan = cell["colspan"]
            
            # 确保 table_data 有足够的行
            while len(table_data) <= row + rowspan - 1:
                table_data.append([])
            
            # 填充单元格
            for r in range(row, row + rowspan):
                while len(table_data[r]) <= col + colspan - 1:
                    table_data[r].append("")
                
                for c in range(col, col + colspan):
                    table_data[r][c] = cell_text if (r == row and c == col) else ""
        
        return table_data
    
    def _table_to_markdown(self, table_data: list[list[str]]) -> str:
        """将表格转为 Markdown 格式"""
        if not table_data:
            return ""
        
        # 构建 Markdown 表格
        lines = []
        
        # 表头
        lines.append("| " + " | ".join(table_data[0]) + " |")
        lines.append("| " + " | ".join(["---"] * len(table_data[0])) + " |")
        
        # 表格内容
        for row in table_data[1:]:
            lines.append("| " + " | ".join(row) + " |")
        
        return "\n".join(lines)
    
    def _ocr_text(self, image: np.ndarray) -> str:
        """OCR 文本提取"""
        results = self.ocr_engine.ocr(image)
        
        # 拼接文本(按阅读顺序)
        texts = []
        for line in results:
            texts.append(line["text"])
        
        return "\n".join(texts)
    
    def _formula_to_latex(self, formula_image: np.ndarray) -> str:
        """公式图像转 LaTeX"""
        latex = self.formula_ocr.predict(formula_image)
        return latex
    
    def _generate_image_caption(self, image: np.ndarray) -> str:
        """生成图像描述(使用视觉语言模型)"""
        caption = self.image_captioner.generate_caption(image)
        return caption

# 使用示例
parser = DeepDocParser(
    layout_model="layoutlmv3",
    table_model="table-transformer",
    ocr_engine="paddleocr",
    enable_formula_ocr=True,
    enable_image_caption=True
)

# 解析 PDF
blocks = parser.parse_pdf("research_paper.pdf")

# 输出结构化内容
for block in blocks:
    print(f"Page {block['page']}, Order {block['reading_order']}")
    print(f"Type: {block['type']}")
    print(f"Content: {block['content'][:100]}...")
    print("---")

核心优势

  • ✅ 版面分析准确率 > 95%(自研模型)
  • ✅ 表格识别支持复杂合并单元格
  • ✅ 公式识别支持手写公式
  • ✅ 多语言支持(中英日韩等)

适用场景

  • 学术论文(大量公式和表格)
  • 财报(复杂表格布局)
  • 技术文档(图表密集)

2. onyx:Unstructured 通用解析引擎

设计理念:使用开源 Unstructured 库支持 50+ 文件格式

onyx/unstructured_parser.py
from unstructured.partition.auto import partition
from unstructured.staging.base import elements_to_json
from typing import Optional

class UniversalDocumentParser:
    """
    onyx 通用文档解析器(基于 Unstructured)
    
    支持格式:
    - PDF, DOCX, PPTX, XLSX
    - HTML, XML, Markdown
    - EML (Email), MSG (Outlook)
    - 图像 (PNG, JPG, TIFF)
    - CSV, TSV, JSON
    - 等 50+ 种格式
    """
    
    def __init__(
        self,
        strategy: str = "hi_res",  # fast / hi_res / ocr_only
        extract_images: bool = True,
        infer_table_structure: bool = True,
        include_page_breaks: bool = True
    ):
        self.strategy = strategy
        self.extract_images = extract_images
        self.infer_table_structure = infer_table_structure
        self.include_page_breaks = include_page_breaks
    
    def parse(
        self,
        file_path: str,
        file_type: Optional[str] = None
    ) -> list[dict]:
        """
        自动解析文档
        
        Args:
            file_path: 文档路径
            file_type: 文件类型(可选,自动检测)
        
        Returns:
            [
                {
                    "type": "Title" / "Text" / "Table" / "Image" / "ListItem",
                    "content": str,
                    "metadata": {
                        "page_number": int,
                        "coordinates": {...},
                        "filename": str
                    }
                },
                ...
            ]
        """
        # 使用 Unstructured 自动解析
        elements = partition(
            filename=file_path,
            content_type=file_type,  # MIME 类型(可选);partition 的 file 参数接收的是文件对象而非类型字符串
            strategy=self.strategy,
            extract_images_in_pdf=self.extract_images,
            infer_table_structure=self.infer_table_structure,
            include_page_breaks=self.include_page_breaks,
            # 额外参数
            languages=["eng", "chi_sim"],  # 多语言支持
            detect_language_per_element=True
        )
        
        # 转换为标准格式
        parsed_elements = []
        for elem in elements:
            parsed_elements.append({
                "type": elem.category,  # Title / Text / Table / Image / ListItem
                "content": str(elem),
                "metadata": {
                    "page_number": elem.metadata.page_number,
                    "coordinates": elem.metadata.coordinates,
                    "filename": elem.metadata.filename,
                    "file_type": elem.metadata.filetype,
                    "languages": elem.metadata.languages
                }
            })
        
        return parsed_elements
    
    def parse_with_chunking(
        self,
        file_path: str,
        chunk_size: int = 1000,
        overlap: int = 200
    ) -> list[dict]:
        """
        解析并分块
        
        适用于大文档(自动按语义边界分块)
        """
        from unstructured.chunking.title import chunk_by_title
        
        # 1. 解析文档
        elements = partition(
            filename=file_path,
            strategy=self.strategy
        )
        
        # 2. 按标题分块(保留文档结构)
        chunks = chunk_by_title(
            elements,
            max_characters=chunk_size,
            overlap=overlap,
            combine_text_under_n_chars=100  # 合并过短的段落
        )
        
        # 3. 转换为标准格式
        parsed_chunks = []
        for i, chunk in enumerate(chunks, 1):
            parsed_chunks.append({
                "chunk_id": i,
                "content": str(chunk),
                "metadata": {
                    "page_numbers": self._extract_page_numbers(chunk),
                    "element_types": self._extract_element_types(chunk)
                }
            })
        
        return parsed_chunks
    
    def _extract_page_numbers(self, chunk) -> list[int]:
        """提取 chunk 涉及的页码(分块后的原始元素保存在 metadata.orig_elements 中)"""
        pages = set()
        for elem in (chunk.metadata.orig_elements or []):
            if getattr(elem.metadata, 'page_number', None):
                pages.add(elem.metadata.page_number)
        return sorted(pages)
    
    def _extract_element_types(self, chunk) -> list[str]:
        """提取 chunk 包含的元素类型"""
        types = set()
        for elem in (chunk.metadata.orig_elements or []):
            types.add(elem.category)
        return sorted(types)

# 使用示例

# 示例1:解析 PDF(高精度模式)
parser = UniversalDocumentParser(strategy="hi_res")
elements = parser.parse("report.pdf")

for elem in elements:
    print(f"{elem['type']}: {elem['content'][:100]}...")

# 示例2:解析 Word 文档(快速模式)
parser_fast = UniversalDocumentParser(strategy="fast")
elements = parser_fast.parse("document.docx")

# 示例3:解析并分块(适合 RAG)
parser_chunking = UniversalDocumentParser(strategy="hi_res")
chunks = parser_chunking.parse_with_chunking(
    "long_document.pdf",
    chunk_size=1000,
    overlap=200
)

for chunk in chunks:
    print(f"Chunk {chunk['chunk_id']} (Pages: {chunk['metadata']['page_numbers']})")
    print(chunk['content'])
    print("---")

核心优势

  • ✅ 支持 50+ 文件格式(开箱即用)
  • ✅ 三种解析策略(fast / hi_res / ocr_only)
  • ✅ 自动检测文件类型
  • ✅ 保留文档结构(标题、列表、表格)

性能对比

| 策略 | 速度 | 准确率 | 适用场景 |
| --- | --- | --- | --- |
| fast | 快 (1s/page) | 中 (85%) | 纯文本文档 |
| hi_res | 中 (3s/page) | 高 (95%) | 复杂布局、表格 |
| ocr_only | 慢 (5s/page) | 中 (80%) | 扫描文档、图像 |
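
策略选择的一个常用判据是 PDF 是否带有可抽取的文本层:有文本层走 fast/hi_res,纯扫描件才退回 ocr_only。下面用 pypdf 做一个简单探测的示意(库的选择、抽样页数与字符阈值均为本文假设):

from pypdf import PdfReader

def choose_strategy(pdf_path: str, sample_pages: int = 3, min_chars: int = 50) -> str:
    """抽样前几页,根据可抽取的文本量推测应使用的解析策略"""
    reader = PdfReader(pdf_path)
    chars = 0
    for page in reader.pages[:sample_pages]:
        chars += len((page.extract_text() or "").strip())
    
    if chars < min_chars:
        return "ocr_only"  # 几乎没有文本层 → 大概率是扫描件
    return "hi_res"  # 有文本层,默认走高精度版面解析;追求速度可改用 "fast"

探测结果可以直接传入 UniversalDocumentParser(strategy=choose_strategy("report.pdf"))。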

3. RAG-Anything:多模态优先解析

设计理念:不仅提取文本,更理解视觉内容(图像、视频、音频)

rag_anything/multimodal_parser.py
from transformers import pipeline
from PIL import Image
import whisper

class MultimodalDocumentParser:
    """
    RAG-Anything 多模态文档解析器
    
    能力:
    1. 文本提取(PDF, Office)
    2. 图像理解(生成详细描述 + 回答问题)
    3. 视频解析(关键帧提取 + 字幕识别)
    4. 音频转录(语音转文字)
    """
    
    def __init__(self):
        # 视觉语言模型(图像理解)
        self.vision_model = pipeline(
            "image-to-text",
            model="Salesforce/blip2-opt-2.7b"
        )
        
        # 音频转录模型
        self.audio_model = whisper.load_model("base")
        
        # 基础文本解析器
        self.text_parser = UniversalDocumentParser()
    
    def parse_document(
        self,
        file_path: str,
        parse_images: bool = True,
        parse_videos: bool = True,
        parse_audio: bool = True
    ) -> dict:
        """
        多模态文档解析
        
        Returns:
            {
                "text_blocks": [...],
                "images": [...],
                "videos": [...],
                "audio": [...]
            }
        """
        file_ext = file_path.split('.')[-1].lower()
        
        result = {
            "text_blocks": [],
            "images": [],
            "videos": [],
            "audio": []
        }
        
        # 1. 文本文档(PDF, DOCX, etc.)
        if file_ext in ['pdf', 'docx', 'pptx', 'txt', 'md']:
            text_elements = self.text_parser.parse(file_path)
            
            for elem in text_elements:
                if elem['type'] in ['Title', 'Text', 'Table', 'ListItem']:
                    result['text_blocks'].append(elem)
                
                elif elem['type'] == 'Image' and parse_images:
                    # 提取嵌入的图像并理解
                    image = self._extract_embedded_image(elem)
                    if image:
                        image_data = self.parse_image(image)
                        result['images'].append(image_data)
        
        # 2. 纯图像文件
        elif file_ext in ['jpg', 'jpeg', 'png', 'gif', 'bmp'] and parse_images:
            image = Image.open(file_path)
            image_data = self.parse_image(image, file_path)
            result['images'].append(image_data)
        
        # 3. 视频文件
        elif file_ext in ['mp4', 'avi', 'mov', 'mkv'] and parse_videos:
            video_data = self.parse_video(file_path)
            result['videos'].append(video_data)
        
        # 4. 音频文件
        elif file_ext in ['mp3', 'wav', 'flac', 'm4a'] and parse_audio:
            audio_data = self.parse_audio(file_path)
            result['audio'].append(audio_data)
        
        return result
    
    def parse_image(
        self,
        image: Image.Image,
        image_path: str = None
    ) -> dict:
        """
        图像理解(生成详细描述)
        
        策略:
        1. 生成通用描述(Image Captioning)
        2. 检测图像类型(图表 / 截图 / 照片)
        3. 如果是图表,提取数据(OCR + 结构识别)
        """
        # 1. 生成描述
        caption = self.vision_model(image)[0]['generated_text']
        
        # 2. 检测图像类型
        image_type = self._classify_image_type(image)
        
        # 3. 如果是图表/表格,提取详细信息
        extracted_data = None
        if image_type in ['chart', 'table', 'diagram']:
            extracted_data = self._extract_chart_data(image)
        
        return {
            "type": "image",
            "path": image_path,
            "caption": caption,
            "image_type": image_type,
            "extracted_data": extracted_data,
            "metadata": {
                "width": image.width,
                "height": image.height
            }
        }
    
    def parse_video(self, video_path: str) -> dict:
        """
        视频解析
        
        策略:
        1. 提取关键帧(每 N 秒一帧)
        2. 对每帧生成描述
        3. 提取音频并转录
        4. 合并时间轴
        """
        import cv2
        
        # 1. 打开视频
        video = cv2.VideoCapture(video_path)
        fps = video.get(cv2.CAP_PROP_FPS)
        frame_count = int(video.get(cv2.CAP_PROP_FRAME_COUNT))
        duration = frame_count / fps
        
        # 2. 提取关键帧(每 5 秒)
        keyframe_interval = int(fps * 5)
        keyframes = []
        
        frame_idx = 0
        while True:
            ret, frame = video.read()
            if not ret:
                break
            
            if frame_idx % keyframe_interval == 0:
                # 转为 PIL Image
                frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                image = Image.fromarray(frame_rgb)
                
                # 生成描述
                caption = self.vision_model(image)[0]['generated_text']
                timestamp = frame_idx / fps
                
                keyframes.append({
                    "timestamp": timestamp,
                    "caption": caption
                })
            
            frame_idx += 1
        
        video.release()
        
        # 3. 提取音频并转录
        audio_path = self._extract_audio_from_video(video_path)
        transcript = self.audio_model.transcribe(audio_path)
        
        return {
            "type": "video",
            "path": video_path,
            "duration": duration,
            "fps": fps,
            "keyframes": keyframes,
            "transcript": transcript['text'],
            "transcript_segments": transcript['segments']  # 带时间戳的片段
        }
    
    def parse_audio(self, audio_path: str) -> dict:
        """
        音频转录
        
        使用 Whisper 模型(支持 99 种语言)
        """
        result = self.audio_model.transcribe(
            audio_path,
            language=None,  # 自动检测
            task="transcribe"
        )
        
        return {
            "type": "audio",
            "path": audio_path,
            "transcript": result['text'],
            "language": result['language'],
            "segments": result['segments']  # 带时间戳的片段
        }
    
    def _classify_image_type(self, image: Image.Image) -> str:
        """分类图像类型(简化版)"""
        # TODO: 使用图像分类模型
        # 简化实现:基于宽高比和颜色分布
        width, height = image.size
        aspect_ratio = width / height
        
        if 1.3 < aspect_ratio < 2.0:
            return "chart"  # 图表通常是横向的
        elif aspect_ratio > 2.0:
            return "screenshot"  # 截图通常更宽
        else:
            return "photo"
    
    def _extract_chart_data(self, image: Image.Image) -> dict:
        """从图表中提取数据(简化版)"""
        # TODO: 使用专门的图表识别模型
        # 简化实现:OCR + 启发式规则
        return {
            "chart_type": "bar_chart",  # 占位:实际应由模型判断图表类型
            "data": []  # 占位:实际应返回提取出的数据点
        }
    
    def _extract_audio_from_video(self, video_path: str) -> str:
        """从视频中提取音频"""
        import ffmpeg
        
        audio_path = video_path.rsplit('.', 1)[0] + '_audio.wav'
        
        ffmpeg.input(video_path).output(
            audio_path,
            acodec='pcm_s16le',
            ac=1,
            ar='16k'
        ).run(quiet=True, overwrite_output=True)
        
        return audio_path

# 使用示例

parser = MultimodalDocumentParser()

# 示例1:解析包含图表的 PDF
result = parser.parse_document("financial_report.pdf")
print(f"Text blocks: {len(result['text_blocks'])}")
print(f"Images: {len(result['images'])}")

# 示例2:解析图像(生成详细描述)
image = Image.open("chart.png")
image_data = parser.parse_image(image)
print(f"Caption: {image_data['caption']}")
print(f"Type: {image_data['image_type']}")

# 示例3:解析视频(提取关键帧 + 字幕)
video_data = parser.parse_video("presentation.mp4")
print(f"Duration: {video_data['duration']}s")
print(f"Keyframes: {len(video_data['keyframes'])}")
print(f"Transcript: {video_data['transcript'][:200]}...")

# 示例4:解析音频(转录)
audio_data = parser.parse_audio("podcast.mp3")
print(f"Language: {audio_data['language']}")
print(f"Transcript: {audio_data['transcript']}")

核心优势

  • ✅ 真正的多模态理解(不只是提取文本)
  • ✅ 视频支持(关键帧 + 字幕)
  • ✅ 音频转录(99 种语言)
  • ✅ 图像理解(生成描述 + 图表数据提取)

适用场景

  • 在线课程(视频 + 字幕)
  • 播客/访谈(音频转录)
  • 图表密集的报告(自动提取数据)

解析质量评估

评估指标

| 指标 | 计算方法 | 目标值 |
| --- | --- | --- |
| 文本准确率 | 正确字符数 / 总字符数 | > 98% |
| 表格完整率 | 识别的单元格数 / 实际单元格数 | > 95% |
| 版面保留率 | 保留的结构元素 / 原始结构元素 | > 90% |
| OCR 错误率 | 错误字符数 / 总字符数 | < 2% |
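
上表中的几个比值可以直接落成代码。下面是一个最小化的评估示意:字符准确率用最长公共子序列做粗略估计(需要人工标注的参考文本,difflib 仅作近似,严格评估应使用编辑距离):

from difflib import SequenceMatcher

def text_accuracy(parsed: str, reference: str) -> float:
    """文本准确率(近似):匹配字符数 / 参考文本字符数"""
    matcher = SequenceMatcher(None, parsed, reference)
    matched = sum(block.size for block in matcher.get_matching_blocks())
    return matched / max(len(reference), 1)

def table_completeness(recognized_cells: int, actual_cells: int) -> float:
    """表格完整率 = 识别的单元格数 / 实际单元格数"""
    return recognized_cells / max(actual_cells, 1)

def layout_retention(kept_elements: int, original_elements: int) -> float:
    """版面保留率 = 保留的结构元素 / 原始结构元素"""
    return kept_elements / max(original_elements, 1)

# 示例(参考文本需要人工标注)
print(f"文本准确率: {text_accuracy('RAG 系统依赖高质量解析', 'RAG 系统依赖高质量的解析'):.2%}")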

常见问题与解决方案

最佳实践

1. 选择合适的解析策略

parser_selector.py
def select_parser(file_path: str, requirements: dict):
    """
    根据需求选择解析器(返回上文三种解析器之一)
    
    Args:
        requirements: {
            "accuracy": "high" / "medium" / "low",
            "speed": "fast" / "medium" / "slow",
            "multimodal": bool,
            "preserve_structure": bool
        }
    """
    file_ext = file_path.split('.')[-1].lower()
    
    # 策略1:纯文本文档 + 速度优先 → Unstructured (fast)
    if file_ext in ['txt', 'md', 'docx'] and requirements['speed'] == 'fast':
        return UniversalDocumentParser(strategy="fast")
    
    # 策略2:复杂 PDF + 高准确率 → ragflow DeepDoc
    if file_ext == 'pdf' and requirements['accuracy'] == 'high':
        return DeepDocParser(
            enable_formula_ocr=True,
            enable_image_caption=True
        )
    
    # 策略3:多模态内容 → RAG-Anything
    if requirements['multimodal']:
        return MultimodalDocumentParser()
    
    # 策略4:通用场景 → Unstructured (hi_res)
    return UniversalDocumentParser(strategy="hi_res")
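
调用方式示意如下(requirements 的字段沿用上面的 docstring,文件名仅为示例):

requirements = {
    "accuracy": "high",  # 财报类 PDF:表格多,准确率优先
    "speed": "medium",
    "multimodal": False,
    "preserve_structure": True
}

parser = select_parser("quarterly_report.pdf", requirements)

# 注意:三种解析器的入口方法并不一致(parse_pdf / parse / parse_document),
# 实际工程中建议先统一成一个 parse() 协议,再做路由
if isinstance(parser, DeepDocParser):
    blocks = parser.parse_pdf("quarterly_report.pdf")
else:
    blocks = parser.parse("quarterly_report.pdf")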

2. 后处理优化

post_processing.py
def clean_parsed_text(text: str) -> str:
    """清理解析后的文本"""
    import re
    
    # 1. 移除多余空白
    text = re.sub(r'\s+', ' ', text)
    
    # 2. 修复常见 OCR 混淆(示例性规则;实际应结合上下文或词典判断,避免误替换正常文本)
    text = text.replace('l1', '11')  # 小写 l 误识为数字 1
    text = text.replace('O0', '00')  # 大写 O 误识为数字 0
    
    # 3. 标点归一化(示例:全角转半角,归一方向应按语料语言选择)
    text = text.replace(',', ',')
    text = text.replace('。', '.')
    
    return text.strip()

def merge_fragments(blocks: list[dict]) -> str:
    """合并文档片段(保留结构)"""
    merged = []
    
    for block in blocks:
        if block['type'] == 'Title':
            merged.append(f"\n## {block['content']}\n")
        elif block['type'] == 'Text':
            merged.append(block['content'])
        elif block['type'] == 'Table':
            merged.append(f"\n{block['content']}\n")
        elif block['type'] == 'ListItem':
            merged.append(f"- {block['content']}")
    
    return "\n".join(merged)

延伸阅读

参考文献

本文基于以下研究材料整理:

  • RAGSolutions/document_parsing_analysis.md - 文档解析技术分析
  • RAGSolutions/best_practices_recommendations.md - 最佳实践推荐
  • ragflow 官方文档 - 深度版面分析算法
  • Unstructured 官方文档 - 通用解析库

下一步:进入 分块策略与算法 了解如何将解析后的文本高质量分块。