Deeptoai RAG系列教程

05 Agent 能力与实践

哪些项目原生支持 Agent?如何为不支持的项目提供 Agent 适配层?给出可落地的 LangGraph/Plan-Execute/自纠正链路范式

范围说明

9 个项目中,原生具备 Agent/工作流能力的主要是 onyx(LangGraph 工作流)与 Self-Corrective-Agentic-RAG(自纠正链路)。其余项目可通过“作为工具(Tool)”的方式挂接到统一 Agent 框架中。

05 Agent 能力与实践

1. Agent 架构模式速览

  • ReAct:思维链 + 工具调用(Thought-Action-Observation 循环)
  • Plan-and-Execute:规划器生成子任务,执行器顺序完成,适合长任务
  • Graph Agent(LangGraph):显式状态机/有向图,节点可回放与重试,适合生产
  • Router-Executor:路由器按意图分配到不同执行器(FAQ/检索/调用业务API)
  • Self-Corrective:低置信度时触发澄清/再检索/改写查询/工具切换
ReAct 回路(示意)
class ReactAgent:
    def __init__(self, tools: dict[str, callable], llm):
        self.tools = tools
        self.llm = llm

    async def run(self, question: str, max_steps: int = 6):
        scratch = []  # 记录思维链
        obs = ""
        for step in range(max_steps):
            prompt = self._build_prompt(question, scratch, obs)
            thought, action, action_input = await self.llm.react_step(prompt)
            scratch.append(thought)
            if action == "finish":
                return action_input
            obs = await self.tools[action](/docs/rag-project-analysis/05-agent-capabilities/action_input)
        return "Reached max steps without conclusion"

2. 项目 ⇄ Agent 能力映射

项目Agent/工作流框架工具/动作观测/回放典型用途
onyx原生工作流LangGraph检索/重排/生成/澄清/审计节点耗时、错误与事件企业搜索、客服、Slack Bot
Self-Corrective-Agentic-RAG原生自纠正自研链路(CRAG)再检索/改写/澄清/多路召回日志/步进高可靠问答
SurfSense无(可适配)-提供“混合检索”ToolSQL/监控可接入混检与轻量重排后端
LightRAG无(可适配)-提供“多策略检索”Tool-Baseline/PoC
ragflow无(可适配)-提供“文档解析”Tool解析日志多模态解析前置
RAG-Anything部分工作流自研解析/检索工具-多模态场景
kotaemon部分自研知识库管理/检索工具-UI 平台接入 Agent
Verba无(可适配)-组合式 Tool-轻量拼装
UltraRAG无(实验)-评测脚手架指标可采集实验对比

不具备原生 Agent 的项目,可作为“工具”接入到 LangGraph/Plan-Execute/或 ReAct 代理中。

3. 统一工具(Tool)接口与注册

tool-registry.ts
export type ToolContext = {
  user?: { id: string; tenant?: string; groups?: string[] };
  traceId?: string;
};

export type Tool<TIn, TOut> = {
  name: string;
  desc: string;
  run: (input: TIn, ctx?: ToolContext) => Promise<TOut>;
};

export class ToolRegistry {
  private tools = new Map<string, Tool<any, any>>();
  register<TIn, TOut>(tool: Tool<TIn, TOut>) {
    this.tools.set(tool.name, tool);
  }
  get(name: string) {
    const t = this.tools.get(name);
    if (!t) throw new Error(`tool not found: ${name}`);
    return t;
  }
}
tools-adapters.ts
// 适配各项目为可调用工具
import { Tool } from './tool-registry';

export const SurfSenseSearch: Tool<{ q: string; topK?: number }, { items: any[] }> = {
  name: 'surfsense.search',
  desc: '向量+BM25 融合检索(RRF)',
  async run({ q, topK = 10 }, ctx) {
    // 调用后端 /search?q=... 或直接走 SQL
    const res = await fetch(process.env.SURFSENSE_API + `/search?q=${encodeURIComponent(q)}&k=${topK}`, {
      headers: { 'x-tenant': ctx?.user?.tenant ?? '' },
    });
    return res.json();
  },
};

export const LightRAGRetrieve: Tool<{ q: string; mode?: 'naive'|'local'|'global'|'hybrid' }, { items: any[] }> = {
  name: 'lightrag.retrieve',
  desc: 'LightRAG 多策略检索',
  async run({ q, mode = 'hybrid' }) {
    const res = await fetch(process.env.LIGHTRAG_API + `/retrieve?mode=${mode}`, {
      method: 'POST', body: JSON.stringify({ q })
    });
    return res.json();
  },
};

export const RagflowParse: Tool<{ url: string }, { chunks: any[] }> = {
  name: 'ragflow.parse',
  desc: 'ragflow 文档解析(版面/表格/公式)',
  async run({ url }) {
    const res = await fetch(process.env.RAGFLOW_API + `/parse?url=${encodeURIComponent(url)}`);
    return res.json();
  },
};

4. LangGraph 工作流示例(企业生产)

langgraph_pipeline.py
from langgraph import StateGraph

# 状态:在上下文里记录工具调用与置信度
state = {
  "question": None,
  "draft": None,
  "ctx": [],
  "confidence": 0.0,
}

g = StateGraph()

@g.node('route')
async def route_node(s):
  # 根据问题选择子链:FAQ/检索/订单API/转人工
  s['route'] = await llm.classify_intent(s['question'])
  return s

@g.node('retrieve')
async def retrieve_node(s):
  items = await tools['surfsense.search'].run({ 'q': s['question'], 'topK': 20 })
  s['ctx'] = items['items']
  return s

@g.node('rerank')
async def rerank_node(s):
  s['ctx'] = await flashrank_rerank(s['question'], s['ctx'])
  return s

@g.node('generate')
async def generate_node(s):
  s['draft'], s['confidence'] = await llm.answer_with_confidence(s['question'], s['ctx'][:5])
  return s

@g.node('self_correct')
async def self_correct_node(s):
  if s['confidence'] < 0.6:
    # 低置信度:尝试改写/多路检索/澄清
    followup = await llm.clarify(s['question'], s['ctx'][:3])
    items = await tools['lightrag.retrieve'].run({ 'q': followup, 'mode': 'hybrid' })
    s['ctx'] = merge_unique(s['ctx'], items['items'])
    s['draft'], s['confidence'] = await llm.answer_with_confidence(s['question'], s['ctx'][:5])
  return s

@g.node('audit')
async def audit_node(s):
  await audit_log('qa', tenant_id, user_id, { 'q': s['question'], 'confidence': s['confidence'] })
  return s

@g.edge('route','retrieve')
@g.edge('retrieve','rerank')
@g.edge('rerank','generate')
@g.edge('generate','self_correct')
@g.edge('self_correct','audit')
async def edges(s):
  return s

要点:

  • 显式状态 + 可回放:每步可观测、可重试、可在生产中回放问题路径
  • 置信度门控:自纠正只在需要时触发,控制成本与延迟
  • 工具去耦合:检索/解析组件可替换(SurfSense/LightRAG/ragflow)

5. 自纠正链(Self-Corrective)参考实现

self_corrective.py
async def self_corrective_qa(q: str):
  # 1) 初检索
  ctx = await search(q, top_k=20)
  draft, conf = await llm.answer_with_confidence(q, ctx[:5])

  # 2) 置信度门控
  if conf >= 0.6:
    return draft, ctx[:5]

  # 3) 纠错策略(按序尝试,早停)
  # a) 查询改写 + 再检索
  q2 = await llm.rewrite(q)
  ctx2 = await search(q2, top_k=20)
  draft2, conf2 = await llm.answer_with_confidence(q, topN(ctx + ctx2, 5))
  if conf2 >= 0.6:
    return draft2, topN(ctx + ctx2, 5)

  # b) 多路召回(BM25/向量/图谱)
  ctx_bm25 = await bm25(q)
  ctx_vec = await vec(q)
  ctx = fuse_rrf([ctx_bm25, ctx_vec])
  draft3, conf3 = await llm.answer_with_confidence(q, ctx[:5])
  if conf3 >= 0.6:
    return draft3, ctx[:5]

  # c) 澄清问题
  cq = await llm.clarify(q, ctx[:3])
  ctx4 = await search(cq)
  return await llm.answer_with_confidence(q, topN(ctx + ctx4, 5))

6. 安全与治理(Guardrails)

guardrails.ts
export type Decision = 'allow' | 'redact' | 'deny';
export function policy(input: { userId: string; tenant: string; tool: string; payload: any }): Decision {
  // 示例:敏感租户禁止外部搜索;匿名用户禁止写操作
  if (input.tool === 'web.search' && input.tenant === 'regulated') return 'deny';
  if (input.tool.endsWith(':delete')) return 'deny';
  return 'allow';
}

7. 观测与回放

  • 埋点:每步(节点)记录输入/输出大小、耗时、错误、选用工具
  • 关联 tracing:traceId 贯穿;将关键中间体(重写查询、澄清问题)纳入日志
  • 回放:用相同输入重放节点;支持 diff 对比不同参数下的结果
telemetry.ts
export async function logStep(step: string, data: Record<string, unknown>) {
  await fetch(process.env.OBS_API + '/events', { method: 'POST', body: JSON.stringify({ step, ...data }) });
}

8. 评估与参数网格(Agent 专用)

agent_param_grid.yaml
agent:
  paradigm: ["react", "langgraph", "plan-execute"]
  max_steps: [4, 6, 8]
  self_correct_threshold: [0.5, 0.6, 0.7]
  tools:
    - ["surfsense.search", "lightrag.retrieve"]
    - ["surfsense.search", "lightrag.retrieve", "ragflow.parse"]
retrieval:
  top_k: [5, 8, 10]
  rrf_k: [30, 60]
rerank:
  enabled: [true, false]
  top_k: [50, 100]

9. 将各项目“接成工具”的建议

  • SurfSense → 检索工具:提供 SQL/HTTP 接口;支持过滤/排序;输出统一字段 schema
  • LightRAG → 多策略检索工具:暴露 mode=naive/local/global/hybrid;返回 chunk 与引用
  • ragflow → 解析工具:输入 URL/文件,输出结构化块(文本/表格/图片片段)
  • onyx → 直接运行工作流:保留其 LangGraph DAG,外部只需调用入口并订阅事件
  • RAG-Anything/kotaemon/Verba → 依据其模块化程度分别映射到 parse/retrieve 工具

10. 部署建议

  • 企业:选 LangGraph DAG + 工具适配;把 SurfSense/LightRAG/ragflow 接入为工具;开启审计与回放
  • 个人/小团队:ReAct + 1-2 个工具(LightRAG + ragflow)即可;保守的 max_steps 与阈值
  • 实验:UltraRAG 做评测脚手架;固化参数网格与数据集