性能优化实战
RAG 系统端到端性能优化:延迟、成本、吞吐量的全方位工程实践
为什么性能优化是 RAG 落地的关键
即使 RAG 系统在功能上完美,如果延迟超过 5 秒、成本过高或无法扩展到数千并发用户,它就无法在生产环境中生存。性能优化不是锦上添花,而是生死攸关的工程问题,直接决定了用户体验、商业可行性和系统稳定性。
背景与优化目标
RAG 系统的性能瓶颈
一个典型的 RAG 请求链路:
用户查询 → [索引 50ms] → [检索 200ms] → [重排序 300ms] → [LLM生成 3000ms] → 返回答案(合计约 3.55s)
关键发现:
- LLM 生成占据 80%+ 延迟
- 向量检索是第二大瓶颈(大规模数据集)
- 重排序可能比检索还慢(复杂模型)
- 索引和嵌入是成本黑洞(大量文档)
优化的三个维度
| 维度 | 目标 | 关键指标 | 代价 |
|---|---|---|---|
| 延迟优化 | 提升用户体验 | P50/P95/P99 延迟 | 可能增加成本 |
| 成本优化 | 降低运营成本 | $/1000 请求 | 可能增加延迟 |
| 吞吐优化 | 支持高并发 | QPS(每秒请求数) | 需要更多资源 |
核心权衡:延迟 ↔ 成本 ↔ 吞吐(无法同时最优)
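上表中的 P50/P95/P99 指延迟分布的分位数(50%、95%、99% 的请求快于该值)。下面是一个计算分位延迟的最小示例(示意代码,latencies_ms 为假设的实测样本):
import numpy as np

latencies_ms = [120, 350, 800, 3200, 450, 5100, 900]   # 假设的单次请求延迟样本(毫秒)
p50, p95, p99 = np.percentile(latencies_ms, [50, 95, 99])
print(f"P50={p50:.0f}ms  P95={p95:.0f}ms  P99={p99:.0f}ms")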
九大项目性能方案全景
| 项目 | 缓存策略 | 异步架构 | 批处理 | 监控 | 技术成熟度 |
|---|---|---|---|---|---|
| onyx | ✅ 多层缓存 | ✅ Celery 队列 | ✅ 高级 | ✅ Prometheus | ⭐⭐⭐⭐⭐(企业) |
| LightRAG | ✅ 增量缓存 | ✅ 异步 I/O | ✅ | 基础 | ⭐⭐⭐⭐⭐ |
| ragflow | ✅ Redis | ✅ 分布式 | ✅ | ✅ | ⭐⭐⭐⭐⭐ |
| kotaemon | ✅ | ✅ | ✅ | 中 | ⭐⭐⭐⭐ |
| Verba | 基础 | ✅ FastAPI async | 基础 | 基础 | ⭐⭐⭐⭐ |
| SurfSense | ✅ 浏览器缓存 | ✅ | 基础 | 基础 | ⭐⭐⭐ |
| Self-Corrective-Agentic-RAG | 无 | ✅ | 无 | 无 | ⭐⭐⭐ |
| UltraRAG | 基础 | 基础 | 基础 | 无 | ⭐⭐ |
| RAG-Anything | ✅ 继承 LightRAG | ✅ | ✅ | 基础 | ⭐⭐⭐⭐⭐ |
关键洞察
- 最全面:onyx 的企业级方案(多层缓存 + 异步队列 + 完整监控)
- 最轻量:LightRAG 的增量缓存(只缓存变化部分)
- 最实用:ragflow 的 Redis 缓存 + 分布式架构
- 趋势:从单机优化向分布式 + 可观测性演进
延迟优化
1. 缓存策略(核心)
缓存命中率提升 10% = 延迟降低 30%+
多层缓存架构(onyx 方案)
from typing import Optional
from collections import OrderedDict
import hashlib
import json

import redis
class MultiLayerCache:
"""
onyx 多层缓存架构
层级(由快到慢):
1. 内存缓存(进程内 LRU,最快,容量小)
2. Redis 缓存(进程间共享,快,容量中)
3. 数据库缓存(持久化,慢,容量大)
典型命中率:L1 (30%) → L2 (50%) → L3 (15%) → Miss (5%)
"""
def __init__(
self,
redis_client: redis.Redis,
l1_size: int = 1000, # 内存缓存大小
l2_ttl: int = 3600, # Redis 过期时间(秒)
l3_ttl: int = 86400, # 数据库过期时间(秒)
):
        self.redis = redis_client
        self.l1_size = l1_size
        self.l2_ttl = l2_ttl
        self.l3_ttl = l3_ttl
        self._l1: OrderedDict = OrderedDict()  # L1:进程内 LRU 缓存
# 统计指标
self.stats = {
"l1_hits": 0,
"l2_hits": 0,
"l3_hits": 0,
"misses": 0
}
def _cache_key(self, query: str, context_hash: str = None) -> str:
"""生成缓存键"""
key_data = f"{query}:{context_hash or 'global'}"
return f"rag:cache:{hashlib.md5(key_data.encode()).hexdigest()}"
    def _l1_get(self, cache_key: str) -> Optional[dict]:
        """L1 缓存(最快,但仅限当前进程);用 OrderedDict 实现简单 LRU"""
        if cache_key in self._l1:
            self._l1.move_to_end(cache_key)  # 命中后移到队尾,标记为最近使用
            return self._l1[cache_key]
        return None
def get(self, query: str, context_hash: str = None) -> Optional[dict]:
"""
多层缓存查询
Returns:
{
"answer": str,
"sources": list,
"cache_hit": "l1" / "l2" / "l3" / None
}
"""
cache_key = self._cache_key(query, context_hash)
# 1. 尝试 L1(内存)
        result = self._l1_get(cache_key)
        if result:
            self.stats["l1_hits"] += 1
            return {"cache_hit": "l1", **result}
# 2. 尝试 L2(Redis)
try:
result = self.redis.get(cache_key)
if result:
self.stats["l2_hits"] += 1
result = json.loads(result)
# 回填到 L1
self._l1_set(cache_key, result)
return {"cache_hit": "l2", **result}
except Exception as e:
print(f"L2 cache error: {e}")
# 3. 尝试 L3(数据库,略)
# result = self.db.query(cache_key)
# if result:
# self.stats["l3_hits"] += 1
# return {"cache_hit": "l3", **result}
# 4. 缓存未命中
self.stats["misses"] += 1
return None
def set(
self,
query: str,
answer: dict,
context_hash: str = None
):
"""
写入多层缓存
策略:
- L1: 立即写入
- L2: 异步写入(不阻塞)
- L3: 批量写入(定期刷新)
"""
cache_key = self._cache_key(query, context_hash)
# 1. 写入 L1(内存)
self._l1_set(cache_key, answer)
# 2. 异步写入 L2(Redis)
try:
self.redis.setex(
cache_key,
self.l2_ttl,
json.dumps(answer)
)
except Exception as e:
print(f"L2 cache write error: {e}")
# 3. 批量写入 L3(数据库,略)
# self._schedule_l3_write(cache_key, answer)
    def _l1_set(self, cache_key: str, value: dict):
        """更新 L1 缓存(OrderedDict 实现的 LRU,超出容量时淘汰最久未使用的条目)"""
        self._l1[cache_key] = value
        self._l1.move_to_end(cache_key)
        if len(self._l1) > self.l1_size:
            self._l1.popitem(last=False)  # 淘汰最旧条目
def get_stats(self) -> dict:
"""获取缓存统计"""
total = sum(self.stats.values())
if total == 0:
return {"hit_rate": 0.0}
hit_rate = (total - self.stats["misses"]) / total
return {
"total_requests": total,
"hit_rate": hit_rate,
"l1_hit_rate": self.stats["l1_hits"] / total,
"l2_hit_rate": self.stats["l2_hits"] / total,
"l3_hit_rate": self.stats["l3_hits"] / total,
"miss_rate": self.stats["misses"] / total
}
# 使用示例
import redis
redis_client = redis.Redis(host="localhost", port=6379, db=0)
cache = MultiLayerCache(redis_client)
# 查询
query = "What is RAG?"
result = cache.get(query=query, context_hash="doc_v1")
if result is None:
    # 缓存未命中,执行 RAG
    answer = execute_rag_pipeline(query)
    # 写入缓存
    cache.set(query, answer, context_hash="doc_v1")
else:
# 缓存命中
print(f"Cache hit: {result['cache_hit']}")
answer = result
# 查看统计
stats = cache.get_stats()
print(f"Cache hit rate: {stats['hit_rate']:.2%}")优化效果:
- ✅ L1 命中延迟 < 1ms
- ✅ L2 命中延迟 < 10ms
- ✅ L3 命中延迟 < 100ms
- ✅ 整体缓存命中率可达 80%+
语义缓存(智能缓存)
传统缓存基于精确匹配,语义缓存基于相似度匹配:
import numpy as np

class SemanticCache:
"""
语义缓存:相似查询复用缓存
示例:
- "What is RAG?" → 缓存
- "Can you explain RAG?" → 命中(相似度 > 阈值)
"""
def __init__(
self,
embedding_model,
redis_client,
similarity_threshold: float = 0.85
):
self.embedding_model = embedding_model
self.redis = redis_client
self.similarity_threshold = similarity_threshold
async def get(self, query: str) -> Optional[dict]:
"""查找语义相似的缓存"""
# 1. 生成查询 embedding
query_emb = await self.embedding_model.embed([query])
query_emb = query_emb[0]
# 2. 从 Redis 获取所有缓存的查询 embedding
cached_queries = self.redis.hgetall("semantic_cache:queries")
# 3. 计算相似度,找到最相似的
max_similarity = 0
best_match_key = None
for cache_key, cached_emb_bytes in cached_queries.items():
cached_emb = np.frombuffer(cached_emb_bytes, dtype=np.float32)
# 余弦相似度
similarity = np.dot(query_emb, cached_emb) / (
np.linalg.norm(query_emb) * np.linalg.norm(cached_emb)
)
if similarity > max_similarity:
max_similarity = similarity
best_match_key = cache_key
# 4. 判断是否命中
if max_similarity >= self.similarity_threshold:
# 从 Redis 获取缓存的答案
answer = self.redis.get(f"semantic_cache:answer:{best_match_key}")
if answer:
return {
"answer": json.loads(answer),
"cache_hit": True,
"similarity": max_similarity,
"cached_query": best_match_key.decode()
}
return None
async def set(self, query: str, answer: dict):
"""存储查询和答案"""
# 1. 生成查询 embedding
query_emb = await self.embedding_model.embed([query])
query_emb = query_emb[0]
# 2. 存储 embedding
cache_key = hashlib.md5(query.encode()).hexdigest()
self.redis.hset(
"semantic_cache:queries",
cache_key,
query_emb.tobytes()
)
# 3. 存储答案
self.redis.setex(
f"semantic_cache:answer:{cache_key}",
3600, # 1小时过期
json.dumps(answer)
)
# 使用示例
semantic_cache = SemanticCache(
embedding_model=my_embedding_model,
redis_client=redis_client,
similarity_threshold=0.85
)
# 查询1
result = await semantic_cache.get("What is RAG?")
# 缓存未命中,执行 RAG
answer = await execute_rag("What is RAG?")
await semantic_cache.set("What is RAG?", answer)
# 查询2(相似查询)
result = await semantic_cache.get("Can you explain RAG?")
# 缓存命中!similarity = 0.92
优势:
- ✅ 命中率提升 20-30%(相似查询复用)
- ✅ 减少 LLM 调用(最昂贵的部分)
成本:
- ❌ 需要额外 embedding 计算(但比 LLM 便宜得多)
- ❌ Redis 存储成本略增
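当缓存的查询数量变大时,上面逐条计算余弦相似度的线性扫描会成为新的瓶颈。一个可选的改进思路(示意代码,假设缓存查询的 embedding 已做 L2 归一化、维度为 768)是把这些向量放进一个轻量向量索引,例如 FAISS:
import faiss
import numpy as np

dim = 768                         # 假设的 embedding 维度
index = faiss.IndexFlatIP(dim)    # 内积索引:向量归一化后等价于余弦相似度
# index.add(cached_query_embs)    # cached_query_embs:形状为 (N, dim) 的 float32 数组
# scores, ids = index.search(query_emb.reshape(1, -1).astype("float32"), k=1)
# if scores[0][0] >= similarity_threshold: 命中缓存,按 ids[0][0] 取回对应答案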
2. 并行化(Async/Await)
串行 vs 并行:
import asyncio
import time
# ❌ 串行实现(慢)
def serial_rag(query: str):
# 1. 检索(200ms)
docs = retriever.retrieve(query)
# 2. 重排序(300ms)
ranked_docs = reranker.rerank(query, docs)
# 3. LLM 生成(3000ms)
answer = llm.generate(query, ranked_docs)
return answer # 总计:3500ms
# ✅ 并行实现(快)
async def parallel_rag(query: str):
# 1. 并行检索多个索引
dense_task = asyncio.create_task(dense_retriever.retrieve(query))
sparse_task = asyncio.create_task(sparse_retriever.retrieve(query))
dense_docs, sparse_docs = await asyncio.gather(dense_task, sparse_task)
# 时间:max(100ms, 120ms) = 120ms(而非 220ms)
# 2. 合并并重排序
all_docs = dense_docs + sparse_docs
ranked_docs = await reranker.rerank(query, all_docs)
# 时间:300ms
# 3. LLM 生成(无法并行)
answer = await llm.generate(query, ranked_docs)
# 时间:3000ms
return answer # 总计:3420ms(节省 80ms)
# 更激进的并行化(风险:质量可能略降)
async def aggressive_parallel_rag(query: str):
# 同时检索 + 生成初步回答(基于缓存的相似查询)
retrieval_task = asyncio.create_task(full_retrieval_pipeline(query))
fast_answer_task = asyncio.create_task(get_cached_similar_answer(query))
# 如果缓存命中且相似度高,直接返回
fast_answer = await fast_answer_task
if fast_answer and fast_answer["similarity"] > 0.9:
retrieval_task.cancel() # 取消检索任务
return fast_answer["answer"]
# 否则等待完整检索
docs = await retrieval_task
answer = await llm.generate(query, docs)
    return answer
优化效果:
- 并行检索:节省 30-50% 检索时间
- 推测执行:命中缓存时节省 80%+ 总时间
3. 流式输出(降低首字延迟)
# ❌ 非流式(用户等待 3s)
def non_streaming_rag(query: str):
docs = retrieve(query) # 500ms
answer = llm.generate(query, docs) # 3000ms
return answer # 用户等待 3500ms 后才看到第一个字
# ✅ 流式(用户等待 500ms)
async def streaming_rag(query: str):
# 1. 快速检索
docs = await retrieve(query) # 500ms
# 2. 立即开始流式生成
async for token in llm.generate_stream(query, docs):
yield token # 用户在 500ms 后就开始看到输出
# 首字延迟:约 500ms(相比非流式的 3500ms 降低约 86%)
用户体验提升:
- 首字延迟从 3.5s → 0.5s
- 感知速度提升 7x(服务端接入方式见下方示例)
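下面是把上述流式生成器接入服务端的一个最小示例(示意代码,假设 streaming_rag 如上定义,用 FastAPI 的 StreamingResponse 以文本流返回):
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/query/stream")
async def stream_query(q: str):
    # 异步生成器直接作为响应体,边生成边推送给客户端
    return StreamingResponse(streaming_rag(q), media_type="text/plain")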
成本优化
1. 模型降级策略
from openai import AsyncOpenAI

class AdaptiveModelSelector:
"""
自适应模型选择器
策略:
- 简单查询 → 小模型(GPT-3.5, $0.002/1K tokens)
- 复杂查询 → 大模型(GPT-4, $0.03/1K tokens)
成本节省:40-60%(大部分查询用小模型)
"""
    def __init__(self):
        self.simple_model = "gpt-3.5-turbo"
        self.complex_model = "gpt-4"
        self.client = AsyncOpenAI()  # 复用异步客户端
        # 复杂度分类器(可用 ML 模型)
        self.classifier = self._init_classifier()
def _init_classifier(self):
"""初始化查询复杂度分类器"""
# 简化版:基于规则
def classify(query: str) -> str:
query_lower = query.lower()
# 复杂查询特征
complex_keywords = [
"compare", "analyze", "explain in detail",
"pros and cons", "step by step",
"对比", "分析", "详细说明"
]
if any(kw in query_lower for kw in complex_keywords):
return "complex"
if len(query.split()) > 20: # 长查询
return "complex"
return "simple"
return classify
async def generate(self, query: str, context: list[str]) -> dict:
"""自适应生成"""
complexity = self.classifier(query)
if complexity == "simple":
model = self.simple_model
cost_per_1k_tokens = 0.002
else:
model = self.complex_model
cost_per_1k_tokens = 0.03
# 生成
        response = await self.client.chat.completions.create(
model=model,
messages=[
{"role": "system", "content": "..."},
{"role": "user", "content": f"{context}\n\n{query}"}
]
)
# 计算成本
total_tokens = response.usage.total_tokens
cost = (total_tokens / 1000) * cost_per_1k_tokens
return {
"answer": response.choices[0].message.content,
"model": model,
"tokens": total_tokens,
"cost": cost
}
# 成本对比
# 假设 1000 个查询,70% 简单,30% 复杂
# 方案1:全部用 GPT-4
# 成本 = 1000 * 500 tokens * $0.03 / 1000 = $15
# 方案2:自适应模型选择
# 成本 = (700 * 500 * $0.002 + 300 * 500 * $0.03) / 1000 = $5.2
# 节省:65%
2. 上下文压缩(减少 Token)
class TokenOptimizer:
"""Token 优化器(降低 LLM 成本)"""
def compress_context(
self,
query: str,
contexts: list[str],
max_tokens: int = 2000
) -> str:
"""
压缩上下文到指定 token 限制
策略:
1. 提取与查询最相关的句子
2. 去除冗余信息
3. 截断到 token 限制
"""
# 1. 分句并打分
sentences_with_scores = []
for ctx in contexts:
sentences = self._split_sentences(ctx)
for sent in sentences:
score = self._relevance_score(query, sent)
sentences_with_scores.append((sent, score))
# 2. 按相关性排序
sentences_with_scores.sort(key=lambda x: x[1], reverse=True)
# 3. 贪心选择句子直到达到 token 限制
compressed = []
total_tokens = 0
for sent, score in sentences_with_scores:
sent_tokens = self._count_tokens(sent)
if total_tokens + sent_tokens <= max_tokens:
compressed.append(sent)
total_tokens += sent_tokens
else:
break
# 4. 重新排序(保持逻辑顺序)
# compressed = self._reorder_by_original_order(compressed)
return " ".join(compressed)
def _relevance_score(self, query: str, sentence: str) -> float:
"""计算句子与查询的相关性(简化版)"""
query_words = set(query.lower().split())
sent_words = set(sentence.lower().split())
# Jaccard 相似度
intersection = query_words & sent_words
union = query_words | sent_words
return len(intersection) / len(union) if union else 0
    def _count_tokens(self, text: str) -> int:
        """估算 token 数(粗略:1 word ≈ 1.33 tokens)"""
        return int(len(text.split()) * 1.33)

    def _split_sentences(self, text: str) -> list[str]:
        """简单分句(按中英文句末标点切分,简化实现)"""
        import re
        return [s.strip() for s in re.split(r"[。!?.!?]+", text) if s.strip()]
# 成本对比
optimizer = TokenOptimizer()
# 原始上下文:5000 tokens
original_context = "\n\n".join(long_documents)
# 压缩后:2000 tokens
compressed_context = optimizer.compress_context(query, long_documents, max_tokens=2000)
# 成本节省:(5000 - 2000) / 5000 = 60%
3. 批处理(Batch Processing)
class BatchProcessor:
    """批处理优化器(合并并发 embedding 请求,降低 API 调用次数)"""
    def __init__(self, batch_size: int = 20, wait_time: float = 0.5):
        self.batch_size = batch_size
        self.wait_time = wait_time                     # 凑批的最长等待时间(秒)
        self.queue: list[tuple[str, asyncio.Future]] = []
        self._flush_task: asyncio.Task | None = None

    async def embed_with_batching(self, texts: list[str]) -> list:
        """批处理 embedding:请求先进入队列,凑满一批或等待超时后统一调用 API"""
        loop = asyncio.get_running_loop()
        futures = [loop.create_future() for _ in texts]
        self.queue.extend(zip(texts, futures))
        if len(self.queue) >= self.batch_size:
            await self._flush()                        # 已凑满一批:立即刷新
        elif self._flush_task is None or self._flush_task.done():
            self._flush_task = asyncio.create_task(self._delayed_flush())
        return [await f for f in futures]              # 等待本次请求对应的结果

    async def _delayed_flush(self):
        await asyncio.sleep(self.wait_time)            # 等待窗口内继续积攒请求
        await self._flush()

    async def _flush(self):
        """把队列中积攒的请求合并成一次 embedding API 调用"""
        if not self.queue:
            return
        batch, self.queue = self.queue, []
        embeddings = await embedding_api.embed([t for t, _ in batch])
        for (_, fut), emb in zip(batch, embeddings):
            if not fut.done():
                fut.set_result(emb)
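# 使用示例(示意):100 个并发 embedding 请求会被自动合并成少量批量 API 调用
processor = BatchProcessor(batch_size=20, wait_time=0.5)
results = await asyncio.gather(
    *[processor.embed_with_batching([f"text {i}"]) for i in range(100)]
)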
# 成本对比
# 假设 embedding API 有固定延迟(50ms)+ 可变延迟(10ms/text)
# 方案1:逐个调用(100 个文本)
# 时间 = 100 * (50ms + 10ms) = 6000ms
# 方案2:批处理(batch_size=20)
# 时间 = 5 * (50ms + 20 * 10ms) = 1250ms
# 节省:79%
吞吐优化
1. 异步队列架构(onyx 方案)
from celery import Celery
import redis
# Celery 配置(分布式任务队列)
celery_app = Celery(
'rag_tasks',
broker='redis://localhost:6379/0',
backend='redis://localhost:6379/1'
)
@celery_app.task(bind=True, max_retries=3)
def index_document_task(self, doc_id: str, content: str):
"""
异步索引任务
优势:
- 不阻塞 API 响应
- 可水平扩展 worker
- 自动重试失败任务
"""
try:
# 1. 分块
chunks = chunker.split(content)
# 2. 生成 embedding
embeddings = embedding_model.embed([c.text for c in chunks])
# 3. 存储到向量数据库
vector_db.insert(
ids=[c.id for c in chunks],
embeddings=embeddings,
metadata=[c.metadata for c in chunks]
)
return {"status": "success", "chunks": len(chunks)}
    except Exception as e:
        # 自动重试(bind=True 后才能通过 self.retry 触发)
        raise self.retry(exc=e, countdown=60)
# FastAPI 端点(非阻塞)
from fastapi import FastAPI
app = FastAPI()
@app.post("/documents/index")
async def index_document(doc_id: str, content: str):
"""非阻塞索引端点"""
# 提交任务到队列
task = index_document_task.delay(doc_id, content)
# 立即返回(不等待完成)
return {
"task_id": task.id,
"status": "queued",
"message": "Document indexing started"
}
@app.get("/tasks/{task_id}")
async def get_task_status(task_id: str):
"""查询任务状态"""
task = celery_app.AsyncResult(task_id)
return {
"task_id": task_id,
"status": task.state, # PENDING / STARTED / SUCCESS / FAILURE
"result": task.result if task.ready() else None
}
# 部署架构
"""
[用户] → [API Server 1] ┐
├→ [Redis Queue] → [Worker 1]
[用户] → [API Server 2] ┘ [Worker 2]
[Worker 3]
扩展方式:
1. 增加 API Server(处理更多并发请求)
2. 增加 Worker(处理更多后台任务)
3. Redis 集群(避免单点故障)
"""吞吐提升:
- API 响应时间:从 5s → 50ms(异步化)
- 支持并发:从 10 QPS → 1000 QPS(水平扩展)
2. 连接池优化
from qdrant_client import QdrantClient
from openai import AsyncOpenAI
# ❌ 每次请求新建连接(慢)
def bad_query(query: str):
client = QdrantClient(host="localhost", port=6333) # 新建连接(100ms)
results = client.search(...)
return results
# ✅ 连接池复用(快)
class RAGService:
def __init__(self):
# 初始化时创建连接池
self.vector_db = QdrantClient(
host="localhost",
port=6333,
timeout=30,
# 连接池配置
grpc_options={
"grpc.max_send_message_length": 100 * 1024 * 1024,
"grpc.max_receive_message_length": 100 * 1024 * 1024,
"grpc.keepalive_time_ms": 10000,
}
)
self.llm = AsyncOpenAI(
max_retries=3,
timeout=30.0
)
async def query(self, query: str):
# 复用连接(无额外延迟)
results = self.vector_db.search(...)
answer = await self.llm.chat.completions.create(...)
return answer
# 全局单例
rag_service = RAGService()
@app.post("/query")
async def handle_query(query: str):
    return await rag_service.query(query)
监控与可观测性
完整监控方案(onyx 级别)
from prometheus_client import Counter, Histogram, Gauge
import time
# Prometheus 指标
request_count = Counter(
'rag_requests_total',
'Total RAG requests',
['status', 'model']
)
request_latency = Histogram(
'rag_request_duration_seconds',
'RAG request latency',
['stage'], # retrieval / rerank / generation
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0]
)
cache_hit_rate = Gauge(
'rag_cache_hit_rate',
'Cache hit rate'
)
cost_per_request = Histogram(
'rag_cost_per_request_dollars',
'Cost per request in dollars'
)
class InstrumentedRAGPipeline:
"""带监控的 RAG Pipeline"""
async def query(self, query: str) -> dict:
start_time = time.time()
try:
# 1. 检索(监控)
retrieval_start = time.time()
docs = await self.retrieve(query)
request_latency.labels(stage='retrieval').observe(
time.time() - retrieval_start
)
# 2. 重排序(监控)
rerank_start = time.time()
ranked_docs = await self.rerank(query, docs)
request_latency.labels(stage='rerank').observe(
time.time() - rerank_start
)
# 3. 生成(监控)
generation_start = time.time()
answer = await self.generate(query, ranked_docs)
request_latency.labels(stage='generation').observe(
time.time() - generation_start
)
# 4. 记录成本
cost = self._calculate_cost(answer)
cost_per_request.observe(cost)
# 5. 记录成功
request_count.labels(status='success', model=answer['model']).inc()
return answer
except Exception as e:
# 记录失败
request_count.labels(status='error', model='unknown').inc()
raise
finally:
# 记录总延迟
total_latency = time.time() - start_time
request_latency.labels(stage='total').observe(total_latency)
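# 指标定义好后还需要暴露给 Prometheus 抓取;下面是一个最小示例(假设沿用上文的 FastAPI 实例 app,
# 直接挂载 prometheus_client 自带的 ASGI 应用作为 /metrics 端点)
from prometheus_client import make_asgi_app
app.mount("/metrics", make_asgi_app())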
# Grafana Dashboard 配置示例
"""
仪表盘面板:
1. QPS(每秒请求数)
- 查询:rate(rag_requests_total[1m])
2. P95 延迟
- 查询:histogram_quantile(0.95, rate(rag_request_duration_seconds_bucket[5m]))
3. 缓存命中率
- 查询:rag_cache_hit_rate
4. 每个阶段延迟分布
- 查询:rag_request_duration_seconds{stage=~"retrieval|rerank|generation"}
5. 成本监控
- 查询:sum(rate(rag_cost_per_request_dollars_sum[1h])) * 3600
6. 错误率
- 查询:rate(rag_requests_total{status="error"}[5m]) / rate(rag_requests_total[5m])
"""性能基准与优化清单
延迟基准(目标)
| 阶段 | 基准 | 优化后 | 优化方法 |
|---|---|---|---|
| 检索 | 200ms | 50ms | 向量索引优化 + 并行检索 |
| 重排序 | 300ms | 100ms | 批处理 + 模型量化 |
| 生成 | 3000ms | 500ms (首字) | 流式输出 + 缓存 |
| 总延迟 | 3500ms | 1000ms | 综合优化 |
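表中“向量索引优化”的常见做法之一是调节 HNSW 参数,在召回率与检索延迟之间取舍。下面是一个基于 Qdrant 的示意(参数取值为假设,需按数据规模实测调整;query_embedding 为已有的查询向量):
from qdrant_client import QdrantClient
from qdrant_client.models import Distance, HnswConfigDiff, SearchParams, VectorParams

client = QdrantClient(host="localhost", port=6333)

# 建索引:m / ef_construct 越大召回越高,但构建更慢、内存占用更大
client.create_collection(
    collection_name="docs",
    vectors_config=VectorParams(size=768, distance=Distance.COSINE),
    hnsw_config=HnswConfigDiff(m=16, ef_construct=200),
)

# 查询:hnsw_ef 越大越准但越慢,可按 P95 延迟目标往下调
hits = client.search(
    collection_name="docs",
    query_vector=query_embedding,
    limit=10,
    search_params=SearchParams(hnsw_ef=64),
)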
成本基准(每 1000 请求)
| 项目 | 基准成本 | 优化后 | 节省 |
|---|---|---|---|
| Embedding | $2 | $0.5 | 75% (缓存) |
| LLM 生成 | $15 | $5 | 67% (模型降级 + 压缩) |
| 向量检索 | $0.5 | $0.5 | - |
| 总成本 | $17.5 | $6 | 66% |
优化清单
延迟优化
- ✅ 实现多层缓存(L1 内存 + L2 Redis)
- ✅ 启用流式输出(降低首字延迟)
- ✅ 并行化检索和重排序
- ✅ 使用语义缓存(提升命中率)
成本优化
- ✅ 实现模型降级策略(简单查询用小模型)
- ✅ 压缩上下文(减少 token)
- ✅ 批处理 embedding(减少 API 调用)
- ✅ 监控并优化昂贵查询
吞吐优化
- ✅ 异步队列架构(Celery + Redis)
- ✅ 连接池复用(避免重复连接)
- ✅ 水平扩展(多 Worker)
- ✅ 负载均衡(Nginx / HAProxy)
监控
- ✅ Prometheus + Grafana 仪表盘
- ✅ 分阶段延迟追踪
- ✅ 成本监控与报警
- ✅ 错误率与缓存命中率
延伸阅读
- 如何提高 RAG 性能 - 包含性能优化章节
- 构建高质量的 RAG 系统 - 质量与性能平衡
参考文献
- Qdrant Docs - Performance tuning and HNSW parameters
- FAISS Wiki/Papers - IVF/IVFPQ/Scalar Quantization
- Milvus Docs - Index types and performance tuning
- Vespa Docs - Ranking profiles and hybrid search
- OpenAI/Anthropic best practices - Streaming, batching, prompt compression
**完成!**你已掌握 RAG 系统端到端性能优化的实战方法。下一步进入 数据处理与索引策略。