05 Agent 能力与实践
哪些项目原生支持 Agent?如何为不支持的项目提供 Agent 适配层?给出可落地的 LangGraph/Plan-Execute/自纠正链路范式
范围说明
9 个项目中,原生具备 Agent/工作流能力的主要是 onyx(LangGraph 工作流)与 Self-Corrective-Agentic-RAG(自纠正链路)。其余项目可通过“作为工具(Tool)”的方式挂接到统一 Agent 框架中。
05 Agent 能力与实践
1. Agent 架构模式速览
- ReAct:思维链 + 工具调用(Thought-Action-Observation 循环)
- Plan-and-Execute:规划器生成子任务,执行器顺序完成,适合长任务
- Graph Agent(LangGraph):显式状态机/有向图,节点可回放与重试,适合生产
- Router-Executor:路由器按意图分配到不同执行器(FAQ/检索/调用业务API)
- Self-Corrective:低置信度时触发澄清/再检索/改写查询/工具切换
class ReactAgent:
def __init__(self, tools: dict[str, callable], llm):
self.tools = tools
self.llm = llm
async def run(self, question: str, max_steps: int = 6):
scratch = [] # 记录思维链
obs = ""
for step in range(max_steps):
prompt = self._build_prompt(question, scratch, obs)
thought, action, action_input = await self.llm.react_step(prompt)
scratch.append(thought)
if action == "finish":
return action_input
obs = await self.tools[action](/docs/rag-project-analysis/05-agent-capabilities/action_input)
return "Reached max steps without conclusion"2. 项目 ⇄ Agent 能力映射
| 项目 | Agent/工作流 | 框架 | 工具/动作 | 观测/回放 | 典型用途 |
|---|---|---|---|---|---|
| onyx | 原生工作流 | LangGraph | 检索/重排/生成/澄清/审计 | 节点耗时、错误与事件 | 企业搜索、客服、Slack Bot |
| Self-Corrective-Agentic-RAG | 原生自纠正 | 自研链路(CRAG) | 再检索/改写/澄清/多路召回 | 日志/步进 | 高可靠问答 |
| SurfSense | 无(可适配) | - | 提供“混合检索”Tool | SQL/监控可接入 | 混检与轻量重排后端 |
| LightRAG | 无(可适配) | - | 提供“多策略检索”Tool | - | Baseline/PoC |
| ragflow | 无(可适配) | - | 提供“文档解析”Tool | 解析日志 | 多模态解析前置 |
| RAG-Anything | 部分工作流 | 自研 | 解析/检索工具 | - | 多模态场景 |
| kotaemon | 部分 | 自研 | 知识库管理/检索工具 | - | UI 平台接入 Agent |
| Verba | 无(可适配) | - | 组合式 Tool | - | 轻量拼装 |
| UltraRAG | 无(实验) | - | 评测脚手架 | 指标可采集 | 实验对比 |
不具备原生 Agent 的项目,可作为“工具”接入到 LangGraph/Plan-Execute/或 ReAct 代理中。
3. 统一工具(Tool)接口与注册
export type ToolContext = {
user?: { id: string; tenant?: string; groups?: string[] };
traceId?: string;
};
export type Tool<TIn, TOut> = {
name: string;
desc: string;
run: (input: TIn, ctx?: ToolContext) => Promise<TOut>;
};
export class ToolRegistry {
private tools = new Map<string, Tool<any, any>>();
register<TIn, TOut>(tool: Tool<TIn, TOut>) {
this.tools.set(tool.name, tool);
}
get(name: string) {
const t = this.tools.get(name);
if (!t) throw new Error(`tool not found: ${name}`);
return t;
}
}// 适配各项目为可调用工具
import { Tool } from './tool-registry';
export const SurfSenseSearch: Tool<{ q: string; topK?: number }, { items: any[] }> = {
name: 'surfsense.search',
desc: '向量+BM25 融合检索(RRF)',
async run({ q, topK = 10 }, ctx) {
// 调用后端 /search?q=... 或直接走 SQL
const res = await fetch(process.env.SURFSENSE_API + `/search?q=${encodeURIComponent(q)}&k=${topK}`, {
headers: { 'x-tenant': ctx?.user?.tenant ?? '' },
});
return res.json();
},
};
export const LightRAGRetrieve: Tool<{ q: string; mode?: 'naive'|'local'|'global'|'hybrid' }, { items: any[] }> = {
name: 'lightrag.retrieve',
desc: 'LightRAG 多策略检索',
async run({ q, mode = 'hybrid' }) {
const res = await fetch(process.env.LIGHTRAG_API + `/retrieve?mode=${mode}`, {
method: 'POST', body: JSON.stringify({ q })
});
return res.json();
},
};
export const RagflowParse: Tool<{ url: string }, { chunks: any[] }> = {
name: 'ragflow.parse',
desc: 'ragflow 文档解析(版面/表格/公式)',
async run({ url }) {
const res = await fetch(process.env.RAGFLOW_API + `/parse?url=${encodeURIComponent(url)}`);
return res.json();
},
};4. LangGraph 工作流示例(企业生产)
from langgraph import StateGraph
# 状态:在上下文里记录工具调用与置信度
state = {
"question": None,
"draft": None,
"ctx": [],
"confidence": 0.0,
}
g = StateGraph()
@g.node('route')
async def route_node(s):
# 根据问题选择子链:FAQ/检索/订单API/转人工
s['route'] = await llm.classify_intent(s['question'])
return s
@g.node('retrieve')
async def retrieve_node(s):
items = await tools['surfsense.search'].run({ 'q': s['question'], 'topK': 20 })
s['ctx'] = items['items']
return s
@g.node('rerank')
async def rerank_node(s):
s['ctx'] = await flashrank_rerank(s['question'], s['ctx'])
return s
@g.node('generate')
async def generate_node(s):
s['draft'], s['confidence'] = await llm.answer_with_confidence(s['question'], s['ctx'][:5])
return s
@g.node('self_correct')
async def self_correct_node(s):
if s['confidence'] < 0.6:
# 低置信度:尝试改写/多路检索/澄清
followup = await llm.clarify(s['question'], s['ctx'][:3])
items = await tools['lightrag.retrieve'].run({ 'q': followup, 'mode': 'hybrid' })
s['ctx'] = merge_unique(s['ctx'], items['items'])
s['draft'], s['confidence'] = await llm.answer_with_confidence(s['question'], s['ctx'][:5])
return s
@g.node('audit')
async def audit_node(s):
await audit_log('qa', tenant_id, user_id, { 'q': s['question'], 'confidence': s['confidence'] })
return s
@g.edge('route','retrieve')
@g.edge('retrieve','rerank')
@g.edge('rerank','generate')
@g.edge('generate','self_correct')
@g.edge('self_correct','audit')
async def edges(s):
return s要点:
- 显式状态 + 可回放:每步可观测、可重试、可在生产中回放问题路径
- 置信度门控:自纠正只在需要时触发,控制成本与延迟
- 工具去耦合:检索/解析组件可替换(SurfSense/LightRAG/ragflow)
5. 自纠正链(Self-Corrective)参考实现
async def self_corrective_qa(q: str):
# 1) 初检索
ctx = await search(q, top_k=20)
draft, conf = await llm.answer_with_confidence(q, ctx[:5])
# 2) 置信度门控
if conf >= 0.6:
return draft, ctx[:5]
# 3) 纠错策略(按序尝试,早停)
# a) 查询改写 + 再检索
q2 = await llm.rewrite(q)
ctx2 = await search(q2, top_k=20)
draft2, conf2 = await llm.answer_with_confidence(q, topN(ctx + ctx2, 5))
if conf2 >= 0.6:
return draft2, topN(ctx + ctx2, 5)
# b) 多路召回(BM25/向量/图谱)
ctx_bm25 = await bm25(q)
ctx_vec = await vec(q)
ctx = fuse_rrf([ctx_bm25, ctx_vec])
draft3, conf3 = await llm.answer_with_confidence(q, ctx[:5])
if conf3 >= 0.6:
return draft3, ctx[:5]
# c) 澄清问题
cq = await llm.clarify(q, ctx[:3])
ctx4 = await search(cq)
return await llm.answer_with_confidence(q, topN(ctx + ctx4, 5))6. 安全与治理(Guardrails)
export type Decision = 'allow' | 'redact' | 'deny';
export function policy(input: { userId: string; tenant: string; tool: string; payload: any }): Decision {
// 示例:敏感租户禁止外部搜索;匿名用户禁止写操作
if (input.tool === 'web.search' && input.tenant === 'regulated') return 'deny';
if (input.tool.endsWith(':delete')) return 'deny';
return 'allow';
}7. 观测与回放
- 埋点:每步(节点)记录输入/输出大小、耗时、错误、选用工具
- 关联 tracing:traceId 贯穿;将关键中间体(重写查询、澄清问题)纳入日志
- 回放:用相同输入重放节点;支持 diff 对比不同参数下的结果
export async function logStep(step: string, data: Record<string, unknown>) {
await fetch(process.env.OBS_API + '/events', { method: 'POST', body: JSON.stringify({ step, ...data }) });
}8. 评估与参数网格(Agent 专用)
agent:
paradigm: ["react", "langgraph", "plan-execute"]
max_steps: [4, 6, 8]
self_correct_threshold: [0.5, 0.6, 0.7]
tools:
- ["surfsense.search", "lightrag.retrieve"]
- ["surfsense.search", "lightrag.retrieve", "ragflow.parse"]
retrieval:
top_k: [5, 8, 10]
rrf_k: [30, 60]
rerank:
enabled: [true, false]
top_k: [50, 100]9. 将各项目“接成工具”的建议
- SurfSense → 检索工具:提供 SQL/HTTP 接口;支持过滤/排序;输出统一字段 schema
- LightRAG → 多策略检索工具:暴露 mode=naive/local/global/hybrid;返回 chunk 与引用
- ragflow → 解析工具:输入 URL/文件,输出结构化块(文本/表格/图片片段)
- onyx → 直接运行工作流:保留其 LangGraph DAG,外部只需调用入口并订阅事件
- RAG-Anything/kotaemon/Verba → 依据其模块化程度分别映射到 parse/retrieve 工具
10. 部署建议
- 企业:选 LangGraph DAG + 工具适配;把 SurfSense/LightRAG/ragflow 接入为工具;开启审计与回放
- 个人/小团队:ReAct + 1-2 个工具(LightRAG + ragflow)即可;保守的 max_steps 与阈值
- 实验:UltraRAG 做评测脚手架;固化参数网格与数据集