05 加入人工审核与计划执行
2026/3/31大约 4 分钟
本章我们将在 PLANNER(计划生成)与 PLAN_EXECUTION(计划执行)之间,插入一道坚实的人类防火墙:HUMAN_FEEDBACK(人工审核) 节点。
这也就是 AI Agent 领域常说的 HITL(Human-in-the-Loop,人工在环) 机制。
🎯 本文目标
- 防死循环机制:利用 Graph 的状态引擎限制大模型重试次数(最大 3 次)。
- 动态回环图:理解图是如何根据人类意见“打回重做”并跳回上游节点的。
- 执行调度:解析
PlanExecuteNode如何遍历执行计划,为下一步的工具链调用做准备。 - 前端闭环:基于 A2A 协议,在 Vue 前端挂起任务、展示审批表单并回传决策。
1. 核心守门员:HumanFeedbackNode
人工审核节点是一个纯粹的控制节点,它不需要调用大模型,只负责路由决策和状态控制。
package io.github.qifan777.server.agent.nodes
import com.alibaba.cloud.ai.graph.OverAllState
import com.alibaba.cloud.ai.graph.StateGraph
import com.alibaba.cloud.ai.graph.action.NodeAction
import io.github.qifan777.server.agent.DataAgentSpec
import org.springframework.stereotype.Component
@Component
class HumanFeedbackNode : NodeAction {
override fun apply(state: OverAllState): Map<String, Any> {
// 1. 防御机制:限制大模型的最大修复次数为 3 次
val count = state.value(DataAgentSpec.Graph.StateKey.Planning.REPAIR_COUNT, 1)
if (count >= 3) {
return mapOf(DataAgentSpec.Graph.StateKey.HumanReview.NEXT_NODE to StateGraph.END)
}
// 2. 获取用户的前端决策(通过 A2A Metadata 传入)
val approved = state.value(DataAgentSpec.Graph.StateKey.HumanReview.CONFIRMATION_APPROVED, false)
if (approved) {
// 3A. 审核通过:放行至计划执行节点
return mapOf(
DataAgentSpec.Graph.StateKey.HumanReview.NEXT_NODE to DataAgentSpec.Graph.Node.PLAN_EXECUTION,
)
} else {
// 3B. 审核拒绝:提取修改意见,打回 Planner 重做,并增加重试计数
val feedbackContent = state.value(DataAgentSpec.Graph.StateKey.HumanReview.CONFIRMATION_FEEDBACK, "")
return mapOf(
DataAgentSpec.Graph.StateKey.HumanReview.NEXT_NODE to DataAgentSpec.Graph.Node.PLANNER,
DataAgentSpec.Graph.StateKey.Planning.REPAIR_COUNT to count + 1,
DataAgentSpec.Graph.StateKey.HumanReview.CONFIRMATION_FEEDBACK to feedbackContent.ifEmpty { "Plan rejected by user" }
)
}
}
}💡 架构洞察:状态驱动的循环
这里并没有使用代码层面的 while 循环,而是通过修改图状态中的 NEXT_NODE 和 REPAIR_COUNT,利用 Graph 引擎原生的能力实现了带次数限制的反馈回环。
2. 动态路由边:HumanFeedbackEdge
节点负责“思考”去哪,Edge(边)负责真正的“跳转”。将这两者解耦,是 Graph 编排的核心思想。
package io.github.qifan777.server.agent.edges
import com.alibaba.cloud.ai.graph.OverAllState
import com.alibaba.cloud.ai.graph.StateGraph
import com.alibaba.cloud.ai.graph.action.EdgeAction
import io.github.qifan777.server.agent.DataAgentSpec
class HumanFeedbackEdge : EdgeAction {
override fun apply(state: OverAllState): String {
// 读取节点中计算好的下一步方向,默认兜底为 END
return state.value(DataAgentSpec.Graph.StateKey.HumanReview.NEXT_NODE, StateGraph.END)
}
}3. 计划调度器:PlanExecuteNode
当用户审核通过后,流程来到执行节点。这个节点目前负责解析计划,为后续的工具循环调用指明方向。
package io.github.qifan777.server.agent.nodes
import com.alibaba.cloud.ai.graph.OverAllState
import com.alibaba.cloud.ai.graph.StateGraph
import com.alibaba.cloud.ai.graph.action.NodeAction
import io.github.oshai.kotlinlogging.KotlinLogging
import io.github.qifan777.server.agent.DataAgentSpec
import io.github.qifan777.server.agent.model.Plan
import org.springframework.stereotype.Component
private val logger = KotlinLogging.logger {}
@Component
class PlanExecuteNode : NodeAction {
override fun apply(state: OverAllState): Map<String, Any> {
// 1. 反序列化出 Planner 节点生成的执行计划
val plan = Plan.getPlan(state)
val currentStep = state.value(DataAgentSpec.Graph.StateKey.Planning.CURRENT_STEP, 1)
val steps = plan.executionPlan
// 2. 如果当前游标超出了计划总步数,说明执行完毕
if (currentStep > steps.size) {
logger.info { "plan complete, current step is $currentStep, total step is ${steps.size}" }
return mapOf(
DataAgentSpec.Graph.StateKey.Planning.CURRENT_STEP to 1,
DataAgentSpec.Graph.StateKey.Planning.NEXT_NODE to StateGraph.END,
)
}
// 3. 取出当前步骤需要调用的具体工具,指明下一个流转节点
return mapOf(
DataAgentSpec.Graph.StateKey.Planning.NEXT_NODE to steps[currentStep - 1].toolToUse,
)
}
}🚧 预留伏笔
在本章中,我们暂时让执行节点直接连向了 END,但在代码内部,我们已经写好了根据 currentStep 获取 toolToUse 的逻辑。这是为下一章“工具链动态调度”打下的坚实基础。
4. 组装图引擎:GraphConfiguration
要让上面的节点和边生效,需要在配置中注册状态覆盖策略,并绘制图的连线。
// 1. 注册人机交互与执行强相关的状态覆盖策略 (ReplaceStrategy)
val keyStrategyFactory = KeyStrategyFactory {
val map = mutableMapOf<String, KeyStrategy>()
// ... 前置节点配置省略
map[DataAgentSpec.Graph.StateKey.Planning.REPAIR_COUNT] = ReplaceStrategy()
map[DataAgentSpec.Graph.StateKey.HumanReview.CONFIRMATION_APPROVED] = ReplaceStrategy()
map[DataAgentSpec.Graph.StateKey.HumanReview.CONFIRMATION_FEEDBACK] = ReplaceStrategy()
map[DataAgentSpec.Graph.StateKey.HumanReview.NEXT_NODE] = ReplaceStrategy()
map
}
// 2. 图的动态编排
return StateGraph(DataAgentSpec.GRAPH_NAME, keyStrategyFactory, serializer)
// ... 添加节点与前置连线省略
.addEdge(DataAgentSpec.Graph.Node.PLANNER, DataAgentSpec.Graph.Node.HUMAN_FEEDBACK)
// 核心条件边:控制打回重做或继续执行
.addConditionalEdges(
DataAgentSpec.Graph.Node.HUMAN_FEEDBACK, edge_async(HumanFeedbackEdge()),
mapOf(
END to END,
DataAgentSpec.Graph.Node.PLAN_EXECUTION to DataAgentSpec.Graph.Node.PLAN_EXECUTION,
DataAgentSpec.Graph.Node.PLANNER to DataAgentSpec.Graph.Node.PLANNER // 审核拒绝,打回重做
)
)
.addEdge(DataAgentSpec.Graph.Node.PLAN_EXECUTION, END)5. 前端交互:无缝衔接 A2A 协议
后端的图引擎在 HUMAN_FEEDBACK 节点前触发中断(interruptBefore)时,A2A 协议会向前端下发 input-required 状态。Vue 前端只需要监听该状态,渲染表单并回传结果即可。
5.1 展开审批表单
在流式接收事件的循环中,精准拦截状态并控制 UI 的展开/收起:
for await (const event of stream) {
// ... 其他 artifact 解析逻辑
if (event.kind === 'status-update' && event.status.state === 'completed') {
awaitingConfirmation.value = false
markAllPendingStepAsSuccess()
}
// 拦截等待输入状态,展开交互面板
if (event.kind === 'status-update' && event.status.state === 'input-required') {
awaitingConfirmation.value = true
}
}5.2 回传决策与续跑 (Resume)
用户提交表单后,携带特定的 Metadata 唤醒后端的任务上下文,继续跑图:
const submitConfirmation = async () => {
awaitingConfirmation.value = false
const approved = confirmationApproved.value
const feedback = (confirmationFeedback.value || '').trim()
// 发起续跑请求,keepSteps=true 保证历史卡片不被清空
await streamMessage(
feedback || (approved ? '确认继续' : '取消本次执行'),
true,
{
// 这里的 KEY 必须和后端 GraphConfiguration 中注册的 Key 完全对应!
[DATA_AGENT_MESSAGE_METADATA.CONFIRMATION_APPROVED]: approved,
[DATA_AGENT_MESSAGE_METADATA.CONFIRMATION_FEEDBACK]:
feedback || (approved ? '用户确认继续执行' : '用户取消本次执行'),
}
)
}