Deeptoai RAG系列教程

企业知识库 RAG

多租户、权限控制、审计合规的企业级 RAG 完整方案

企业知识库 RAG 的核心挑战

企业级 RAG 不仅是技术问题,更是业务与合规的综合挑战:多部门权限隔离、敏感数据保护、审计追溯、高可用性保证。本文从 onyx 等企业级项目提炼最佳实践,提供从 0 到 1 的完整落地方案。

企业知识库 RAG

企业场景核心需求

典型应用场景

场景核心需求典型用户数据规模
内部知识库权限控制、全文检索全体员工10K-1M 文档
技术文档助手代码/API 检索、精确性工程师1K-100K 文档
客服支持系统快速响应、多语言客服团队10K-500K 文档
合规文档查询审计日志、版本管理法务/合规1K-50K 文档
销售辅助工具实时更新、移动端销售团队5K-100K 文档

企业级 vs 个人级 RAG

维度个人级企业级
用户单用户多租户(部门/团队隔离)
权限RBAC(角色/组/用户三级)
安全基础加密存储 + 传输、审计日志
规模千级文档百万级文档 + 高并发
可用性单机99.9%+ SLA(分布式)
合规GDPR/SOC2/ISO27001
集成独立SSO、AD/LDAP、API

项目映射与选型理由

如何选型

按“合规/权限/可运维”优先级排序:优先 onyx;对检索质量敏感且资源有限,选 SurfSense;需要多模态解析作为输入,搭配 ragflow;希望快速打底或做 PoC,用 LightRAG。

  • onyx(推荐优先)

    • 为何适配:内置多租户/RBAC/审计与可观测性,支持 Slack/Web/HTTP 接入,契合企业知识库权限与合规场景。
    • 关键能力:LangGraph 工作流、Vespa 混合检索(向量+BM25)、两阶段排序、审计留痕、滚动发布策略。
    • 不适用:极端轻量/单机、无权限诉求的个人或小团队场景。
    • 深入阅读:onyx 深度解析
    • 快速落地:
      1. 选定租户隔离级别与 RBAC 策略;2) 部署 Vespa 与 Ranking Profile;3) 按部门/项目侧重调 alpha、time_decay;4) 打通 SSO/Slack。
  • SurfSense

    • 为何适配:在 Postgres 里做向量+BM25 融合(RRF),CPU 级重排(FlashRank)即可达到不错效果,易维护、成本可控。
    • 关键能力:SQL CTE + RRF 融合、可解释的检索逻辑、较低运维复杂度。
    • 不适用:需要强权限/审计或多租户隔离的严格合规环境。
    • 深入阅读:SurfSense 深度解析
    • 快速落地:
      1. 导入文档与 BM25 索引;2) 向量表与文本表建联;3) 调 RRF k、权重与 topN;4) 评估 P95 延迟与 NDCG@10。
  • LightRAG

    • 为何适配:通用基座,参数网格与多检索策略便于做 baseline 与快速验证,后续可迁移到 onyx。
    • 关键能力:Token 感知分块/重叠、多策略检索、灵活后端。
    • 不适用:强合规/强权限需求(需额外补齐中间件)。
    • 深入阅读:LightRAG 深度解析
    • 快速落地:
      1. 选 embedding/重叠/chunk 尺寸;2) 试 naive/local/global/hybrid;3) 固化最佳参数到生产。
  • ragflow(作为解析前置)

    • 为何适配:PDF/扫描件/表格/公式占比高的企业资料库,先用 ragflow 做高质量解析再入库检索。
    • 关键能力:版面/表格/公式识别、阅读顺序恢复、ColPali 融合。
    • 深入阅读:ragflow 深度解析

其他候选:RAG-Anything(多模态解析更强)、kotaemon(文档管理与图谱可视化)、Verba(轻集成)、Self-Corrective-Agentic-RAG(自纠正链路)、UltraRAG(实验/研究加速)。

onyx 企业级架构深度解析

系统架构

核心特性实现

1. 多租户隔离

multi_tenancy.py
from enum import Enum
from dataclasses import dataclass
from typing import List, Optional

class TenantIsolationLevel(Enum):
    """租户隔离级别"""
    SHARED = "shared"  # 共享基础设施
    PARTITIONED = "partitioned"  # 分区(独立 collection)
    DEDICATED = "dedicated"  # 独立实例

@dataclass
class Tenant:
    """租户模型"""
    tenant_id: str
    name: str
    isolation_level: TenantIsolationLevel
    max_documents: int = 100000
    max_users: int = 100
    enabled: bool = True
    
    # 配置
    embedding_model: str = "openai/text-embedding-3-large"
    llm_model: str = "gpt-4"
    custom_prompt: Optional[str] = None

class MultiTenantRAGService:
    """多租户 RAG 服务"""
    
    def __init__(self):
        self.vector_stores = {}  # {tenant_id: VectorStore}
        self.tenants = {}  # {tenant_id: Tenant}
    
    def add_tenant(self, tenant: Tenant):
        """添加租户"""
        # 1. 创建独立的向量集合
        if tenant.isolation_level == TenantIsolationLevel.PARTITIONED:
            collection_name = f"tenant_{tenant.tenant_id}"
            self.vector_stores[tenant.tenant_id] = QdrantClient()
            self.vector_stores[tenant.tenant_id].create_collection(
                collection_name=collection_name,
                vectors_config=VectorParams(size=3072, distance=Distance.COSINE)
            )
        
        self.tenants[tenant.tenant_id] = tenant
    
    async def query(
        self,
        tenant_id: str,
        query: str,
        user_id: str,
        user_groups: List[str]
    ) -> dict:
        """
        多租户查询
        
        安全检查:
        1. 租户隔离
        2. 用户权限
        3. 审计日志
        """
        # 1. 验证租户
        tenant = self.tenants.get(tenant_id)
        if not tenant or not tenant.enabled:
            raise PermissionError("Tenant not found or disabled")
        
        # 2. 获取租户的向量存储
        vector_store = self.vector_stores[tenant_id]
        
        # 3. 检索(带权限过滤)
        results = vector_store.search(
            query_vector=self.embed(query),
            filter={
                "tenant_id": tenant_id,  # 租户隔离
                "$or": [
                    {"access_level": 0},  # PUBLIC
                    {"owner": user_id},
                    {"allowed_groups": {"$in": user_groups}}
                ]
            }
        )
        
        # 4. 生成答案
        answer = await self.generate(
            query=query,
            context=results,
            model=tenant.llm_model,
            system_prompt=tenant.custom_prompt
        )
        
        # 5. 记录审计日志
        await self.log_access(
            tenant_id=tenant_id,
            user_id=user_id,
            query=query,
            doc_ids=[r['id'] for r in results]
        )
        
        return {"answer": answer, "sources": results}

2. 细粒度权限控制(RBAC)

rbac.py
from enum import Enum
from typing import List, Set

class Permission(Enum):
    """权限枚举"""
    READ = "read"
    WRITE = "write"
    DELETE = "delete"
    SHARE = "share"
    ADMIN = "admin"

class Role:
    """角色"""
    def __init__(self, name: str, permissions: Set[Permission]):
        self.name = name
        self.permissions = permissions

# 预定义角色
ROLES = {
    "viewer": Role("Viewer", {Permission.READ}),
    "editor": Role("Editor", {Permission.READ, Permission.WRITE}),
    "admin": Role("Admin", {Permission.READ, Permission.WRITE, 
                           Permission.DELETE, Permission.SHARE, Permission.ADMIN})
}

class RBACManager:
    """权限管理器"""
    
    def __init__(self):
        self.user_roles = {}  # {user_id: [roles]}
        self.user_groups = {}  # {user_id: [groups]}
        self.group_roles = {}  # {group_id: [roles]}
    
    def assign_role(self, user_id: str, role_name: str):
        """分配角色给用户"""
        if user_id not in self.user_roles:
            self.user_roles[user_id] = []
        self.user_roles[user_id].append(role_name)
    
    def add_user_to_group(self, user_id: str, group_id: str):
        """添加用户到组"""
        if user_id not in self.user_groups:
            self.user_groups[user_id] = []
        self.user_groups[user_id].append(group_id)
    
    def has_permission(
        self,
        user_id: str,
        permission: Permission,
        resource_owner: str = None,
        resource_groups: List[str] = None
    ) -> bool:
        """
        检查用户权限
        
        规则:
        1. 资源所有者有所有权限
        2. 用户角色权限
        3. 用户组角色权限
        """
        # 1. Owner check
        if resource_owner and resource_owner == user_id:
            return True
        
        # 2. 用户直接角色
        user_roles = self.user_roles.get(user_id, [])
        for role_name in user_roles:
            role = ROLES.get(role_name)
            if role and permission in role.permissions:
                return True
        
        # 3. 用户组角色
        user_groups = self.user_groups.get(user_id, [])
        if resource_groups:
            # 检查用户是否在资源允许的组中
            if any(g in resource_groups for g in user_groups):
                # 检查组的角色权限
                for group_id in user_groups:
                    group_roles = self.group_roles.get(group_id, [])
                    for role_name in group_roles:
                        role = ROLES.get(role_name)
                        if role and permission in role.permissions:
                            return True
        
        return False

# 使用示例
rbac = RBACManager()

# 配置权限
rbac.assign_role("user_123", "editor")
rbac.add_user_to_group("user_123", "team_ai")
rbac.group_roles["team_ai"] = ["editor"]

# 检查权限
can_read = rbac.has_permission(
    user_id="user_123",
    permission=Permission.READ,
    resource_groups=["team_ai"]
)  # True

can_delete = rbac.has_permission(
    user_id="user_123",
    permission=Permission.DELETE
)  # False (editor 没有 delete 权限)

3. 审计日志与合规

audit_logging.py
from datetime import datetime
from typing import Optional, Dict
import json

class AuditLog:
    """审计日志"""
    
    def __init__(self, log_store):
        self.log_store = log_store  # PostgreSQL / Elasticsearch
    
    async def log_query(
        self,
        tenant_id: str,
        user_id: str,
        query: str,
        results_count: int,
        response_time_ms: float,
        success: bool,
        error: Optional[str] = None
    ):
        """记录查询日志"""
        log_entry = {
            "event_type": "query",
            "timestamp": datetime.utcnow().isoformat(),
            "tenant_id": tenant_id,
            "user_id": user_id,
            "query": query,
            "results_count": results_count,
            "response_time_ms": response_time_ms,
            "success": success,
            "error": error,
            "ip_address": self._get_client_ip(),
            "user_agent": self._get_user_agent()
        }
        
        await self.log_store.insert("audit_logs", log_entry)
    
    async def log_document_access(
        self,
        tenant_id: str,
        user_id: str,
        document_ids: list[str],
        action: str  # view / download / delete
    ):
        """记录文档访问日志(合规要求)"""
        for doc_id in document_ids:
            log_entry = {
                "event_type": "document_access",
                "timestamp": datetime.utcnow().isoformat(),
                "tenant_id": tenant_id,
                "user_id": user_id,
                "document_id": doc_id,
                "action": action,
                "ip_address": self._get_client_ip()
            }
            
            await self.log_store.insert("audit_logs", log_entry)
    
    async def log_permission_change(
        self,
        tenant_id: str,
        admin_user_id: str,
        target_user_id: str,
        old_permissions: Dict,
        new_permissions: Dict
    ):
        """记录权限变更"""
        log_entry = {
            "event_type": "permission_change",
            "timestamp": datetime.utcnow().isoformat(),
            "tenant_id": tenant_id,
            "admin_user_id": admin_user_id,
            "target_user_id": target_user_id,
            "old_permissions": json.dumps(old_permissions),
            "new_permissions": json.dumps(new_permissions)
        }
        
        await self.log_store.insert("audit_logs", log_entry)
    
    async def generate_compliance_report(
        self,
        tenant_id: str,
        start_date: str,
        end_date: str
    ) -> dict:
        """
        生成合规报告(GDPR/SOC2)
        
        包含:
        - 访问统计
        - 权限变更历史
        - 敏感数据访问记录
        """
        report = {
            "tenant_id": tenant_id,
            "period": f"{start_date} to {end_date}",
            "query_count": 0,
            "unique_users": set(),
            "document_accesses": 0,
            "permission_changes": 0,
            "failed_access_attempts": 0
        }
        
        # 统计查询
        logs = await self.log_store.query(
            table="audit_logs",
            filters={
                "tenant_id": tenant_id,
                "timestamp": {"$gte": start_date, "$lte": end_date}
            }
        )
        
        for log in logs:
            if log["event_type"] == "query":
                report["query_count"] += 1
                report["unique_users"].add(log["user_id"])
                if not log["success"]:
                    report["failed_access_attempts"] += 1
            
            elif log["event_type"] == "document_access":
                report["document_accesses"] += 1
            
            elif log["event_type"] == "permission_change":
                report["permission_changes"] += 1
        
        report["unique_users"] = len(report["unique_users"])
        
        return report

实战部署架构

单租户 vs 多租户部署

deployment_patterns.py
# 模式1:单租户(小规模)
"""
[Nginx] → [FastAPI] → [Qdrant] → [PostgreSQL]

    [Celery Worker]
"""

# 模式2:多租户共享(中等规模)
"""
            [Load Balancer]

    ┌─────────────┼─────────────┐
    ↓             ↓             ↓
[API-1]      [API-2]       [API-3]
    ↓             ↓             ↓
    └─────────────┼─────────────┘

         [Qdrant Cluster]
         (分 collection 隔离)

        [PostgreSQL Master]
              ↓       ↓
        [PG Replica] [PG Replica]
"""

# 模式3:多租户独立(大规模)
"""
[Tenant A] → [Dedicated Qdrant A] → [Dedicated DB A]
[Tenant B] → [Dedicated Qdrant B] → [Dedicated DB B]
[Tenant C] → [Shared Qdrant Pool] → [Shared DB Pool]
"""

Docker Compose 配置示例

docker-compose.yml
version: '3.8'

services:
  # API 服务
  api:
    image: my-rag-api:latest
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/rag
      - QDRANT_URL=http://qdrant:6333
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - qdrant
      - redis
    deploy:
      replicas: 3  # 3个实例负载均衡
  
  # 向量数据库
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
    volumes:
      - qdrant_data:/qdrant/storage
  
  # 关系数据库
  postgres:
    image: postgres:15
    environment:
      - POSTGRES_DB=rag
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
    volumes:
      - postgres_data:/var/lib/postgresql/data
  
  # 缓存/队列
  redis:
    image: redis:7
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data
  
  # 后台任务
  celery_worker:
    image: my-rag-api:latest
    command: celery -A tasks worker -l info
    environment:
      - DATABASE_URL=postgresql://user:pass@postgres:5432/rag
      - REDIS_URL=redis://redis:6379
    depends_on:
      - postgres
      - redis
    deploy:
      replicas: 2

volumes:
  qdrant_data:
  postgres_data:
  redis_data:

企业集成

SSO 集成(SAML/OAuth)

sso_integration.py
from fastapi import FastAPI, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer
import jwt

app = FastAPI()
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

class SSOAuthenticator:
    """SSO 认证器"""
    
    def __init__(self, saml_settings: dict):
        self.saml_settings = saml_settings
    
    async def authenticate(self, token: str) -> dict:
        """
        验证 SSO token
        
        支持:
        - SAML 2.0
        - OAuth 2.0 / OIDC
        - LDAP/AD
        """
        try:
            # 解码 JWT token
            payload = jwt.decode(
                token,
                self.saml_settings["public_key"],
                algorithms=["RS256"]
            )
            
            # 提取用户信息
            user_info = {
                "user_id": payload["sub"],
                "email": payload["email"],
                "name": payload["name"],
                "groups": payload.get("groups", []),
                "tenant_id": payload["tenant_id"]
            }
            
            return user_info
        
        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Token expired")
        except jwt.InvalidTokenError:
            raise HTTPException(status_code=401, detail="Invalid token")

# FastAPI 依赖注入
async def get_current_user(token: str = Depends(oauth2_scheme)) -> dict:
    """获取当前用户(从 SSO)"""
    authenticator = SSOAuthenticator(SAML_SETTINGS)
    return await authenticator.authenticate(token)

@app.post("/query")
async def query(
    query: str,
    current_user: dict = Depends(get_current_user)
):
    """受保护的查询端点"""
    return await rag_service.query(
        tenant_id=current_user["tenant_id"],
        user_id=current_user["user_id"],
        user_groups=current_user["groups"],
        query=query
    )

性能与成本优化

企业级性能指标

指标目标实现方法
查询延迟 P95< 2s缓存 + HNSW + 流式输出
并发 QPS> 100水平扩展 + 负载均衡
可用性99.9%多副本 + 健康检查
数据一致性最终一致异步索引 + 版本管理

成本优化策略

cost_optimization.py
# 1. 分层存储(热/温/冷数据)
class TieredStorage:
    """分层存储"""
    
    def route_query(self, query: str, user_context: dict) -> str:
        """根据查询路由到不同存储层"""
        
        # 热数据(最近3个月,高频访问)
        if self.is_recent_query(query):
            return "hot_tier"  # Qdrant in-memory
        
        # 温数据(3-12个月)
        elif self.is_medium_age(query):
            return "warm_tier"  # Qdrant on-disk
        
        # 冷数据(>12个月,低频)
        else:
            return "cold_tier"  # S3 + on-demand index

# 2. 模型降级(简单查询用小模型)
class ModelTiering:
    """模型分层"""
    
    def select_model(self, query: str) -> str:
        complexity = self.assess_complexity(query)
        
        if complexity < 0.3:
            return "gpt-3.5-turbo"  # $0.002/1K
        elif complexity < 0.7:
            return "gpt-4"  # $0.03/1K
        else:
            return "gpt-4-turbo"  # $0.01/1K

# 3. 缓存策略(语义缓存)
class EnterpriseCache:
    """企业级缓存"""
    
    def get_cached(self, query: str, tenant_id: str) -> Optional[dict]:
        """带租户隔离的语义缓存"""
        cache_key = f"{tenant_id}:{self.semantic_hash(query)}"
        return redis.get(cache_key)

最佳实践清单

实操清单

  • 权限与租户
    • 设计角色矩阵(viewer/editor/admin),按部门/项目建立组;开启最小权限原则
    • 确认隔离级别:shared/partitioned/dedicated;为每个 tenant 建独立前缀/collection
  • 解析与索引
    • 统一解析链:OCR/版面/表格/公式(若有)→ 分块(300-500 tokens, 10-20% overlap)→ embedding
    • 增量索引与回填:定时任务 + 变更捕获;失败重试与死信队列
  • 检索与排序
    • 启用混合检索(向量+BM25);设定 first-phase 命中数与 RRF/加权策略
    • 重排器选择与上限 topK;按业务近因加入时间衰减/部门加权
  • 安全与合规
    • 审计表分区与归档策略(≥12 个月);敏感字段脱敏;数据驻留域配置
    • 接入 SSO(OIDC/SAML/LDAP);API 鉴权策略
  • 观测与 SLO
    • 指标:Recall@5/NDCG@10/P95/P99/错误率;按 tenant 维度打点
    • 告警:延迟涨幅、QPS 异常、索引积压、权限拒绝突增

参数网格模板

enterprise_param_grid.yaml
retrieval:
  chunk_size: [300, 400, 500]
  overlap_ratio: [0.1, 0.2]
  embedding_model: ["bge-m3", "text-embedding-3-large"]
  top_k: [5, 8, 10]
  hybrid:
    alpha: [0.3, 0.5, 0.7]      # 向量:BM25 权重
    rrf_k: [30, 60]             # RRF 平滑参数
    first_phase_hits: [100, 200]
    time_decay: [0.5, 1.0, 2.0] # 指数系数
rerank:
  enabled: [true, false]
  model: ["flashrank-bge-large", "cohere-rerank-v3"]
  re_rank_top_k: [50, 100]
indexing:
  batch_size: [64, 128]
  concurrency: [2, 4, 8]
security:
  audit_retention_days: [180, 365]
  pii_redaction: ["none", "mask", "remove"]

延伸阅读

参考文献

  • onyx 官方文档 - 企业级架构
  • RAGSolutions/enterprise_deployment.md
  • Microsoft RAG Best Practices

下一步:了解 个人知识管理 RAG 的轻量化实现。