Deeptoai RAG Tutorial Series

Performance Optimization in Practice

End-to-end performance optimization for RAG systems: engineering practice across latency, cost, and throughput

Why performance optimization decides whether RAG ships

Even a functionally perfect RAG system cannot survive in production if it takes more than 5 seconds to answer, costs too much to run, or cannot scale to thousands of concurrent users. Performance optimization is not a nice-to-have; it is a make-or-break engineering problem that directly determines user experience, commercial viability, and system stability.

Performance Optimization in Practice

Background and Optimization Goals

Performance bottlenecks in a RAG system

A typical RAG request path:

User query → [Indexing] → [Retrieval] → [Reranking] → [LLM generation] → Answer
               50ms          200ms         300ms           3000ms         = 3.55s

Key observations

  • LLM generation accounts for 80%+ of total latency
  • Vector retrieval is the second-largest bottleneck (on large corpora)
  • Reranking can be slower than retrieval (with heavy models)
  • Indexing and embedding are the cost sink (when ingesting large document sets)

Three dimensions of optimization

| Dimension | Goal | Key metric | Trade-off |
|---|---|---|---|
| Latency optimization | Better user experience | P50/P95/P99 latency | May increase cost |
| Cost optimization | Lower operating cost | $ per 1,000 requests | May increase latency |
| Throughput optimization | Support high concurrency | QPS (requests per second) | Needs more resources |

Core trade-off: latency ↔ cost ↔ throughput (you cannot optimize all three at once)

Performance approaches across the nine projects

| Project | Caching | Async architecture | Batching | Monitoring | Maturity |
|---|---|---|---|---|---|
| onyx | ✅ Multi-layer cache | ✅ Celery queue | ✅ Advanced | ✅ Prometheus | ⭐⭐⭐⭐⭐ (enterprise) |
| LightRAG | ✅ Incremental cache | ✅ Async I/O | Basic | — | ⭐⭐⭐⭐⭐ |
| ragflow | ✅ Redis | ✅ Distributed | — | — | ⭐⭐⭐⭐⭐ |
| kotaemon | — | — | — | — | ⭐⭐⭐⭐ |
| Verba | Basic | ✅ FastAPI async | Basic | Basic | ⭐⭐⭐⭐ |
| SurfSense | ✅ Browser cache | Basic | Basic | — | ⭐⭐⭐ |
| Self-Corrective-Agentic-RAG | — | — | — | — | ⭐⭐⭐ |
| UltraRAG | Basic | Basic | Basic | — | ⭐⭐ |
| RAG-Anything | ✅ Inherits LightRAG | Basic | — | — | ⭐⭐⭐⭐⭐ |

Key takeaways

  • Most complete: onyx's enterprise-grade stack (multi-layer cache + async queue + full monitoring)
  • Most lightweight: LightRAG's incremental cache (only changed parts are re-cached)
  • Most practical: ragflow's Redis cache + distributed architecture
  • Trend: from single-node tuning toward distributed architectures plus observability

Latency Optimization

1. Caching (the core lever)

A 10-point increase in cache hit rate can translate into a 30%+ reduction in average latency.
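
A back-of-the-envelope expected-latency model makes that claim concrete; the cache and full-pipeline latencies below are illustrative assumptions, not measurements:

def expected_latency(hit_rate: float, cache_ms: float = 10, full_ms: float = 3500) -> float:
    # E[T] = hit_rate * T_cache + (1 - hit_rate) * T_full
    return hit_rate * cache_ms + (1 - hit_rate) * full_ms

print(expected_latency(0.70))  # ~1057 ms
print(expected_latency(0.80))  # ~708 ms, roughly 33% lower than at a 70% hit rate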

Multi-layer cache architecture (the onyx approach)

onyx/multilayer_cache.py
from typing import Optional
from collections import OrderedDict
import hashlib
import json

import redis

class MultiLayerCache:
    """
    onyx-style multi-layer cache.

    Layers (fastest to slowest):
    1. In-memory cache (per-process LRU: fastest, smallest)
    2. Redis cache (shared across processes: fast, medium capacity)
    3. Database cache (persistent: slow, largest capacity)

    Typical hit distribution: L1 (30%) → L2 (50%) → L3 (15%) → miss (5%)
    """

    def __init__(
        self,
        redis_client: redis.Redis,
        l1_size: int = 1000,  # max entries in the in-memory cache
        l2_ttl: int = 3600,   # Redis TTL (seconds)
        l3_ttl: int = 86400,  # database TTL (seconds)
    ):
        self.redis = redis_client
        self.l1_size = l1_size
        self.l2_ttl = l2_ttl
        self.l3_ttl = l3_ttl

        # L1: simple LRU built on OrderedDict (functools.lru_cache cannot be
        # populated manually, so a hand-rolled LRU is used instead)
        self._l1 = OrderedDict()  # cache_key (str) -> cached answer (dict)

        # hit/miss counters
        self.stats = {
            "l1_hits": 0,
            "l2_hits": 0,
            "l3_hits": 0,
            "misses": 0
        }

    def _cache_key(self, query: str, context_hash: str = None) -> str:
        """Build the cache key."""
        key_data = f"{query}:{context_hash or 'global'}"
        return f"rag:cache:{hashlib.md5(key_data.encode()).hexdigest()}"

    def _l1_get(self, cache_key: str) -> Optional[dict]:
        """L1 lookup (fastest, but local to the current process)."""
        if cache_key in self._l1:
            self._l1.move_to_end(cache_key)  # mark as recently used
            return self._l1[cache_key]
        return None
    
    def get(self, query: str, context_hash: str = None) -> Optional[dict]:
        """
        Look the query up through every cache layer.

        Returns:
            {
                "answer": str,
                "sources": list,
                "cache_hit": "l1" / "l2" / "l3" / None
            }
        """
        cache_key = self._cache_key(query, context_hash)

        # 1. Try L1 (in-process memory)
        result = self._l1_get(cache_key)
        if result is not None:
            self.stats["l1_hits"] += 1
            return {"cache_hit": "l1", **result}

        # 2. Try L2 (Redis)
        try:
            raw = self.redis.get(cache_key)
            if raw:
                self.stats["l2_hits"] += 1
                result = json.loads(raw)

                # backfill into L1
                self._l1_set(cache_key, result)

                return {"cache_hit": "l2", **result}
        except Exception as e:
            print(f"L2 cache error: {e}")

        # 3. Try L3 (database; omitted here)
        # result = self.db.query(cache_key)
        # if result:
        #     self.stats["l3_hits"] += 1
        #     return {"cache_hit": "l3", **result}

        # 4. Cache miss
        self.stats["misses"] += 1
        return None
    
    def set(
        self,
        query: str,
        answer: dict,
        context_hash: str = None
    ):
        """
        Write through every cache layer.

        Policy:
        - L1: write immediately
        - L2: write without blocking the caller
        - L3: write in batches (flushed periodically)
        """
        cache_key = self._cache_key(query, context_hash)

        # 1. Write L1 (memory)
        self._l1_set(cache_key, answer)

        # 2. Write L2 (Redis)
        try:
            self.redis.setex(
                cache_key,
                self.l2_ttl,
                json.dumps(answer)
            )
        except Exception as e:
            print(f"L2 cache write error: {e}")

        # 3. Batched write to L3 (database; omitted here)
        # self._schedule_l3_write(cache_key, answer)

    def _l1_set(self, cache_key: str, value: dict):
        """Insert into the L1 LRU, evicting the oldest entry when full."""
        self._l1[cache_key] = value
        self._l1.move_to_end(cache_key)
        if len(self._l1) > self.l1_size:
            self._l1.popitem(last=False)  # drop the least recently used entry
    
    def get_stats(self) -> dict:
        """Return cache hit statistics."""
        total = sum(self.stats.values())
        if total == 0:
            return {"hit_rate": 0.0}

        hit_rate = (total - self.stats["misses"]) / total

        return {
            "total_requests": total,
            "hit_rate": hit_rate,
            "l1_hit_rate": self.stats["l1_hits"] / total,
            "l2_hit_rate": self.stats["l2_hits"] / total,
            "l3_hit_rate": self.stats["l3_hits"] / total,
            "miss_rate": self.stats["misses"] / total
        }

# Usage example
import redis

redis_client = redis.Redis(host="localhost", port=6379, db=0)
cache = MultiLayerCache(redis_client)

# Look up
query = "What is RAG?"
result = cache.get(query=query, context_hash="doc_v1")
if result is None:
    # Cache miss: run the full RAG pipeline
    answer = execute_rag_pipeline(query)

    # Store the answer
    cache.set(query, answer, context_hash="doc_v1")
else:
    # Cache hit
    print(f"Cache hit: {result['cache_hit']}")
    answer = result

# Inspect statistics
stats = cache.get_stats()
print(f"Cache hit rate: {stats['hit_rate']:.2%}")

Impact

  • ✅ L1 hit latency < 1ms
  • ✅ L2 hit latency < 10ms
  • ✅ L3 hit latency < 100ms
  • ✅ Overall cache hit rate can reach 80%+

Semantic Cache (similarity-aware caching)

A conventional cache matches only exact keys; a semantic cache matches on query similarity:

semantic_cache.py
import hashlib
import json
from typing import Optional

import numpy as np

class SemanticCache:
    """
    Semantic cache: similar queries reuse the same cached answer.

    Example:
    - "What is RAG?" → cached
    - "Can you explain RAG?" → hit (similarity > threshold)
    """

    def __init__(
        self,
        embedding_model,
        redis_client,
        similarity_threshold: float = 0.85
    ):
        self.embedding_model = embedding_model
        self.redis = redis_client
        self.similarity_threshold = similarity_threshold
    
    async def get(self, query: str) -> Optional[dict]:
        """Look for a semantically similar cached query."""
        # 1. Embed the query
        query_emb = await self.embedding_model.embed([query])
        query_emb = query_emb[0]

        # 2. Fetch all cached query embeddings from Redis
        cached_queries = self.redis.hgetall("semantic_cache:queries")

        # 3. Compute similarities and keep the best match
        max_similarity = 0.0
        best_match_key = None

        for cache_key, cached_emb_bytes in cached_queries.items():
            cached_emb = np.frombuffer(cached_emb_bytes, dtype=np.float32)

            # cosine similarity
            similarity = np.dot(query_emb, cached_emb) / (
                np.linalg.norm(query_emb) * np.linalg.norm(cached_emb)
            )

            if similarity > max_similarity:
                max_similarity = similarity
                best_match_key = cache_key

        # 4. Decide whether this counts as a hit
        if best_match_key is not None and max_similarity >= self.similarity_threshold:
            # Fetch the cached answer (hgetall returns bytes keys, so decode first)
            key_str = best_match_key.decode()
            answer = self.redis.get(f"semantic_cache:answer:{key_str}")
            if answer:
                return {
                    "answer": json.loads(answer),
                    "cache_hit": True,
                    "similarity": float(max_similarity),
                    "cached_query": key_str
                }

        return None
    
    async def set(self, query: str, answer: dict):
        """Store the query embedding and its answer."""
        # 1. Embed the query
        query_emb = await self.embedding_model.embed([query])
        query_emb = query_emb[0]

        # 2. Store the embedding, keyed by a hash of the query text
        cache_key = hashlib.md5(query.encode()).hexdigest()
        self.redis.hset(
            "semantic_cache:queries",
            cache_key,
            query_emb.tobytes()
        )

        # 3. Store the answer
        self.redis.setex(
            f"semantic_cache:answer:{cache_key}",
            3600,  # expire after 1 hour
            json.dumps(answer)
        )

# Usage example
semantic_cache = SemanticCache(
    embedding_model=my_embedding_model,
    redis_client=redis_client,
    similarity_threshold=0.85
)

# Query 1
result = await semantic_cache.get("What is RAG?")
# Cache miss: run RAG
answer = await execute_rag("What is RAG?")
await semantic_cache.set("What is RAG?", answer)

# Query 2 (a similar query)
result = await semantic_cache.get("Can you explain RAG?")
# Cache hit! similarity = 0.92

Benefits

  • ✅ Hit rate improves by 20-30% (similar queries reuse answers)
  • ✅ Fewer LLM calls (the most expensive part of the pipeline)

Costs

  • ❌ Extra embedding computation per query (still far cheaper than an LLM call)
  • ❌ Slightly more Redis storage
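
One practical note on the lookup above: it scans the cached embeddings one at a time, which gets slow past a few thousand entries. A minimal vectorized variant of step 3 is sketched below; it assumes the cached values are the same float32 buffers that SemanticCache.set stores, and the helper name is illustrative:

import numpy as np

def best_match(query_emb: np.ndarray, cached: dict) -> tuple:
    """Vectorized cosine similarity over all cached query embeddings."""
    keys = list(cached.keys())
    mat = np.stack([np.frombuffer(v, dtype=np.float32) for v in cached.values()])
    sims = mat @ query_emb / (np.linalg.norm(mat, axis=1) * np.linalg.norm(query_emb) + 1e-9)
    i = int(np.argmax(sims))
    return keys[i], float(sims[i])

For very large caches, storing the embeddings in a vector index instead of a Redis hash is the next step.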

2. Parallelism (async/await)

Serial vs. parallel

parallel_optimization.py
import asyncio
import time

# ❌ Serial implementation (slow)
def serial_rag(query: str):
    # 1. Retrieval (200ms)
    docs = retriever.retrieve(query)

    # 2. Reranking (300ms)
    ranked_docs = reranker.rerank(query, docs)

    # 3. LLM generation (3000ms)
    answer = llm.generate(query, ranked_docs)

    return answer  # total: 3500ms

# ✅ Parallel implementation (fast)
async def parallel_rag(query: str):
    # 1. Query several indexes concurrently
    dense_task = asyncio.create_task(dense_retriever.retrieve(query))
    sparse_task = asyncio.create_task(sparse_retriever.retrieve(query))

    dense_docs, sparse_docs = await asyncio.gather(dense_task, sparse_task)
    # time: max(100ms, 120ms) = 120ms (instead of 220ms)

    # 2. Merge and rerank
    all_docs = dense_docs + sparse_docs
    ranked_docs = await reranker.rerank(query, all_docs)
    # time: 300ms

    # 3. LLM generation (cannot be parallelized)
    answer = await llm.generate(query, ranked_docs)
    # time: 3000ms

    return answer  # total: 3420ms (80ms saved)

# A more aggressive variant (risk: answer quality may drop slightly)
async def aggressive_parallel_rag(query: str):
    # Start full retrieval and a fast cached-answer lookup at the same time
    retrieval_task = asyncio.create_task(full_retrieval_pipeline(query))
    fast_answer_task = asyncio.create_task(get_cached_similar_answer(query))

    # If the cache hits with high similarity, return immediately
    fast_answer = await fast_answer_task
    if fast_answer and fast_answer["similarity"] > 0.9:
        retrieval_task.cancel()  # the retrieval is no longer needed
        return fast_answer["answer"]

    # Otherwise wait for the full retrieval
    docs = await retrieval_task
    answer = await llm.generate(query, docs)
    return answer

Impact

  • Parallel retrieval: saves 30-50% of retrieval time
  • Speculative execution: saves 80%+ of total time on a cache hit

3. Streaming Output (lower time-to-first-token)

streaming_optimization.py
# ❌ Non-streaming (the user waits ~3.5s)
def non_streaming_rag(query: str):
    docs = retrieve(query)  # 500ms
    answer = llm.generate(query, docs)  # 3000ms
    return answer  # the user sees nothing until 3500ms have passed

# ✅ Streaming (the user waits ~500ms)
async def streaming_rag(query: str):
    # 1. Fast retrieval
    docs = await retrieve(query)  # 500ms

    # 2. Start streaming generation immediately
    async for token in llm.generate_stream(query, docs):
        yield token  # the user starts seeing output after ~500ms

    # time-to-first-token: 500ms (an 83% reduction)

User-experience impact

  • Time-to-first-token drops from 3.5s → 0.5s
  • Perceived speed improves roughly 7x
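
To actually deliver the stream to the client, the async generator above can be wrapped in a streaming HTTP response. A minimal FastAPI sketch follows; the endpoint path and media type are arbitrary choices, and streaming_rag is the generator defined above:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/query/stream")
async def stream_endpoint(query: str):
    # streaming_rag() yields tokens as the LLM produces them;
    # StreamingResponse flushes each chunk to the client immediately
    return StreamingResponse(streaming_rag(query), media_type="text/plain")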

Cost Optimization

1. Model Tiering

model_tiering.py
import openai

class AdaptiveModelSelector:
    """
    Adaptive model selector.

    Policy:
    - Simple queries → small model (GPT-3.5, $0.002/1K tokens)
    - Complex queries → large model (GPT-4, $0.03/1K tokens)

    Cost savings: 40-60% (most queries can be served by the small model)
    """

    def __init__(self):
        self.simple_model = "gpt-3.5-turbo"
        self.complex_model = "gpt-4"

        # query-complexity classifier (could also be an ML model)
        self.classifier = self._init_classifier()

    def _init_classifier(self):
        """Initialize the query-complexity classifier."""
        # Simplified version: rule-based
        def classify(query: str) -> str:
            query_lower = query.lower()

            # markers of a complex query
            complex_keywords = [
                "compare", "analyze", "explain in detail",
                "pros and cons", "step by step",
                "对比", "分析", "详细说明"  # Chinese equivalents
            ]

            if any(kw in query_lower for kw in complex_keywords):
                return "complex"

            if len(query.split()) > 20:  # long query
                return "complex"

            return "simple"

        return classify

    async def generate(self, query: str, context: list[str]) -> dict:
        """Generate with the model matched to query complexity."""
        complexity = self.classifier(query)

        if complexity == "simple":
            model = self.simple_model
            cost_per_1k_tokens = 0.002
        else:
            model = self.complex_model
            cost_per_1k_tokens = 0.03

        # Generate (legacy openai<1.0 SDK; acreate is the async variant)
        response = await openai.ChatCompletion.acreate(
            model=model,
            messages=[
                {"role": "system", "content": "..."},
                {"role": "user", "content": f"{context}\n\n{query}"}
            ]
        )

        # Cost accounting
        total_tokens = response.usage.total_tokens
        cost = (total_tokens / 1000) * cost_per_1k_tokens

        return {
            "answer": response.choices[0].message.content,
            "model": model,
            "tokens": total_tokens,
            "cost": cost
        }

# Cost comparison
# Assume 1,000 queries of ~500 tokens each: 70% simple, 30% complex

# Option 1: GPT-4 for everything
# cost = 1000 * 500 tokens * $0.03 / 1000 = $15

# Option 2: adaptive model selection
# cost = (700 * 500 * $0.002 + 300 * 500 * $0.03) / 1000 = $5.2
# savings: 65%

2. Context Compression (fewer tokens)

context_compression.py
import re

class TokenOptimizer:
    """Token optimizer (reduces LLM cost)."""

    def compress_context(
        self,
        query: str,
        contexts: list[str],
        max_tokens: int = 2000
    ) -> str:
        """
        Compress the context down to a token budget.

        Strategy:
        1. Extract the sentences most relevant to the query
        2. Drop redundant information
        3. Truncate at the token limit
        """
        # 1. Split into sentences and score them
        sentences_with_scores = []
        for ctx in contexts:
            sentences = self._split_sentences(ctx)
            for sent in sentences:
                score = self._relevance_score(query, sent)
                sentences_with_scores.append((sent, score))

        # 2. Sort by relevance
        sentences_with_scores.sort(key=lambda x: x[1], reverse=True)

        # 3. Greedily take sentences until the token budget is exhausted
        compressed = []
        total_tokens = 0

        for sent, score in sentences_with_scores:
            sent_tokens = self._count_tokens(sent)

            if total_tokens + sent_tokens <= max_tokens:
                compressed.append(sent)
                total_tokens += sent_tokens
            else:
                break

        # 4. Optionally restore the original ordering (keeps the text coherent)
        # compressed = self._reorder_by_original_order(compressed)

        return " ".join(compressed)
    
    def _relevance_score(self, query: str, sentence: str) -> float:
        """Relevance of a sentence to the query (simplified)."""
        query_words = set(query.lower().split())
        sent_words = set(sentence.lower().split())

        # Jaccard similarity
        intersection = query_words & sent_words
        union = query_words | sent_words

        return len(intersection) / len(union) if union else 0

    def _split_sentences(self, text: str) -> list[str]:
        """Naive sentence splitter on ., !, ? and their CJK equivalents."""
        return [s.strip() for s in re.split(r"(?<=[.!?。!?])\s*", text) if s.strip()]

    def _count_tokens(self, text: str) -> int:
        """Rough token estimate (1 token ≈ 0.75 words)."""
        return int(len(text.split()) * 1.33)

# Cost comparison
optimizer = TokenOptimizer()

# Original context: 5000 tokens
original_context = "\n\n".join(long_documents)

# After compression: 2000 tokens
compressed_context = optimizer.compress_context(query, long_documents, max_tokens=2000)

# Cost saving: (5000 - 2000) / 5000 = 60%
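
The 0.75-words-per-token heuristic above is rough, especially for non-English text. If the tiktoken package is available, counting can be made exact for OpenAI-style tokenizers; a drop-in sketch (cl100k_base is the encoding used by the GPT-3.5/GPT-4 family):

import tiktoken

_enc = tiktoken.get_encoding("cl100k_base")

def count_tokens_exact(text: str) -> int:
    # exact token count; a drop-in replacement for TokenOptimizer._count_tokens
    return len(_enc.encode(text))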

3. Batch Processing

batch_processing.py
import asyncio

import numpy as np

class BatchProcessor:
    """Batching helper (fewer API calls)."""

    def __init__(self, batch_size: int = 20, wait_time: float = 0.5):
        self.batch_size = batch_size
        self.wait_time = wait_time
        self.queue = []
        self.processing = False

    async def embed_with_batching(self, texts: list[str]) -> list[np.ndarray]:
        """Batch embedding calls (simplified illustration; embedding_api is an external client)."""
        # Append the texts to the shared queue
        self.queue.extend(texts)

        # Wait briefly so other callers can join the same batch
        await asyncio.sleep(self.wait_time)

        if not self.processing and len(self.queue) >= self.batch_size:
            self.processing = True

            # One API call for the whole batch
            batch = self.queue[:self.batch_size]
            embeddings = await embedding_api.embed(batch)

            self.queue = self.queue[self.batch_size:]
            self.processing = False

            return embeddings

        # Fall back to a direct call (not enough queued texts)
        return await embedding_api.embed(texts)

# Latency comparison
# Assume the embedding API has a fixed overhead (50ms) plus 10ms per text

# Option 1: one call per text (100 texts)
# time = 100 * (50ms + 10ms) = 6000ms

# Option 2: batching (batch_size=20)
# time = 5 * (50ms + 20 * 10ms) = 1250ms
# savings: 79%

Throughput Optimization

1. Async Queue Architecture (onyx approach)

onyx/async_queue.py
from celery import Celery
import redis

# Celery configuration (distributed task queue)
celery_app = Celery(
    'rag_tasks',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/1'
)

@celery_app.task(bind=True)  # bind=True so the task can call self.retry()
def index_document_task(self, doc_id: str, content: str):
    """
    Asynchronous indexing task.

    Benefits:
    - Does not block the API response
    - Workers scale horizontally
    - Failed tasks are retried automatically
    """
    try:
        # 1. Chunk the document
        chunks = chunker.split(content)

        # 2. Generate embeddings
        embeddings = embedding_model.embed([c.text for c in chunks])

        # 3. Store in the vector database
        vector_db.insert(
            ids=[c.id for c in chunks],
            embeddings=embeddings,
            metadata=[c.metadata for c in chunks]
        )

        return {"status": "success", "chunks": len(chunks)}

    except Exception as e:
        # Retry automatically
        raise self.retry(exc=e, countdown=60, max_retries=3)

# FastAPI endpoints (non-blocking)
from fastapi import FastAPI

app = FastAPI()

@app.post("/documents/index")
async def index_document(doc_id: str, content: str):
    """Non-blocking indexing endpoint."""
    # Submit the task to the queue
    task = index_document_task.delay(doc_id, content)

    # Return immediately (do not wait for completion)
    return {
        "task_id": task.id,
        "status": "queued",
        "message": "Document indexing started"
    }

@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
    """Check task status."""
    task = celery_app.AsyncResult(task_id)

    return {
        "task_id": task_id,
        "status": task.state,  # PENDING / STARTED / SUCCESS / FAILURE
        "result": task.result if task.ready() else None
    }

# Deployment architecture
"""
[User] → [API Server 1] ┐
                        ├→ [Redis Queue] → [Worker 1]
[User] → [API Server 2] ┘                  [Worker 2]
                                           [Worker 3]

Ways to scale:
1. Add API servers (handle more concurrent requests)
2. Add workers (process more background tasks)
3. Run Redis as a cluster (avoid a single point of failure)
"""

Throughput impact

  • API response time: from 5s → 50ms (indexing moved off the request path)
  • Concurrency: from 10 QPS → 1000 QPS (horizontal scaling)

2. Connection Pooling

connection_pool.py
from fastapi import FastAPI
from qdrant_client import QdrantClient
from openai import AsyncOpenAI

app = FastAPI()

# ❌ A new connection per request (slow)
def bad_query(query: str):
    client = QdrantClient(host="localhost", port=6333)  # connection setup (~100ms)
    results = client.search(...)
    return results

# ✅ Reuse pooled connections (fast)
class RAGService:
    def __init__(self):
        # Create the clients once, at startup
        self.vector_db = QdrantClient(
            host="localhost",
            port=6333,
            timeout=30,
            # connection tuning
            grpc_options={
                "grpc.max_send_message_length": 100 * 1024 * 1024,
                "grpc.max_receive_message_length": 100 * 1024 * 1024,
                "grpc.keepalive_time_ms": 10000,
            }
        )

        self.llm = AsyncOpenAI(
            max_retries=3,
            timeout=30.0
        )

    async def query(self, query: str):
        # Reuse existing connections (no per-request setup cost)
        results = self.vector_db.search(...)
        answer = await self.llm.chat.completions.create(...)
        return answer

# Global singleton
rag_service = RAGService()

@app.post("/query")
async def handle_query(query: str):
    return await rag_service.query(query)

Monitoring and Observability

A complete monitoring setup (onyx-level)

monitoring.py
from prometheus_client import Counter, Histogram, Gauge
import time

# Prometheus metrics
request_count = Counter(
    'rag_requests_total',
    'Total RAG requests',
    ['status', 'model']
)

request_latency = Histogram(
    'rag_request_duration_seconds',
    'RAG request latency',
    ['stage'],  # retrieval / rerank / generation
    buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)

cache_hit_rate = Gauge(
    'rag_cache_hit_rate',
    'Cache hit rate'
)

cost_per_request = Histogram(
    'rag_cost_per_request_dollars',
    'Cost per request in dollars'
)

class InstrumentedRAGPipeline:
    """RAG pipeline with per-stage instrumentation."""
    
    async def query(self, query: str) -> dict:
        start_time = time.time()
        
        try:
            # 1. Retrieval (timed)
            retrieval_start = time.time()
            docs = await self.retrieve(query)
            request_latency.labels(stage='retrieval').observe(
                time.time() - retrieval_start
            )
            
            # 2. Reranking (timed)
            rerank_start = time.time()
            ranked_docs = await self.rerank(query, docs)
            request_latency.labels(stage='rerank').observe(
                time.time() - rerank_start
            )
            
            # 3. Generation (timed)
            generation_start = time.time()
            answer = await self.generate(query, ranked_docs)
            request_latency.labels(stage='generation').observe(
                time.time() - generation_start
            )
            
            # 4. Record cost
            cost = self._calculate_cost(answer)
            cost_per_request.observe(cost)
            
            # 5. Count the success
            request_count.labels(status='success', model=answer['model']).inc()
            
            return answer
        
        except Exception as e:
            # Count the failure
            request_count.labels(status='error', model='unknown').inc()
            raise
        
        finally:
            # Record total latency
            total_latency = time.time() - start_time
            request_latency.labels(stage='total').observe(total_latency)

# Example Grafana dashboard configuration
"""
Dashboard panels:

1. QPS (requests per second)
   - query: rate(rag_requests_total[1m])

2. P95 latency
   - query: histogram_quantile(0.95, rate(rag_request_duration_seconds_bucket[5m]))

3. Cache hit rate
   - query: rag_cache_hit_rate

4. Per-stage latency distribution
   - query: rag_request_duration_seconds{stage=~"retrieval|rerank|generation"}

5. Cost monitoring (dollars per hour)
   - query: sum(rate(rag_cost_per_request_dollars_sum[1h])) * 3600

6. Error rate
   - query: rate(rag_requests_total{status="error"}[5m]) / rate(rag_requests_total[5m])
"""

Performance Baselines and Optimization Checklist

Latency baseline (targets)

| Stage | Baseline | Optimized | Method |
|---|---|---|---|
| Retrieval | 200ms | 50ms | Vector index tuning + parallel retrieval |
| Reranking | 300ms | 100ms | Batching + model quantization |
| Generation | 3000ms | 500ms (first token) | Streaming output + caching |
| Total | 3500ms | 1000ms | Combined optimizations |

Cost baseline (per 1,000 requests)

| Item | Baseline cost | Optimized | Savings |
|---|---|---|---|
| Embedding | $2 | $0.5 | 75% (caching) |
| LLM generation | $15 | $5 | 67% (model tiering + compression) |
| Vector retrieval | $0.5 | $0.5 | – |
| Total | $17.5 | $6 | 66% |

Optimization checklist

  • Measure first: instrument every stage (retrieval / rerank / generation) before optimizing
  • Add exact-match caching, then semantic caching for near-duplicate queries
  • Parallelize independent steps (e.g. dense + sparse retrieval) and stream LLM output
  • Route simple queries to cheaper models; compress context to the token budget
  • Batch embedding calls and move indexing to an async queue
  • Reuse connections and scale API servers and workers horizontally
  • Track hit rate, P95 latency, cost per request, and error rate on a dashboard

Further Reading

References

  • Qdrant docs - performance tuning and HNSW parameters
  • FAISS wiki/papers - IVF / IVFPQ / scalar quantization
  • Milvus docs - index types and performance tuning
  • Vespa docs - ranking profiles and hybrid search
  • OpenAI/Anthropic best practices - streaming, batching, prompt compression

**Done!** You have now worked through end-to-end RAG performance optimization across latency, cost, and throughput. Next up: Data Processing and Indexing Strategies.