06 SQL 生成与执行——从计划指令到结构化结果回填
上一篇我们已经把 HUMAN_FEEDBACK 和 PLAN_EXECUTION 串起来,Agent 终于具备了“人工审批后再执行”的企业级安全感。
这一篇,我们进入真正“干活”的核心阶段:把计划步骤翻译成 SQL,并在执行器里跑出结构化结果。
🎯 本文目标
- 理解
PLAN_EXECUTION -> SQL_GENERATION -> SQL_EXECUTION的图流转逻辑。 - 掌握
SqlGeneratorNode如何把计划步骤转成可执行 SQL。 - 掌握
SqlExecuteNode如何执行 SQL 并回填统一结果结构。 - 理解前端 A2UI 如何消费这两个节点的产物并可视化展示。
1. 从计划到 SQL:执行链路全貌
在当前工程中,SQL 生成与执行的主干链路是:
PLANNER
-> HUMAN_FEEDBACK
-> PLAN_EXECUTION
-> SQL_GENERATION
-> SQL_EXECUTION
-> END关键节点常量定义在 DataAgentSpec:
object Node {
const val PLAN_EXECUTION = "PLAN_EXECUTE_NODE"
const val SQL_GENERATION = "SQL_GENERATE_NODE"
const val SQL_EXECUTION = "SQL_EXECUTE_NODE"
}对应的核心状态键:
object Planning {
const val PLAN = "PLANNER_NODE_OUTPUT"
const val NEXT_NODE = "PLAN_NEXT_NODE"
const val CURRENT_STEP = "PLAN_CURRENT_STEP"
const val EXECUTION_OUTPUT = "PLAN_EXECUTE_NODE_OUTPUT"
}
object Execution {
const val SQL_GENERATION_RESULT = "SQL_GENERATE_OUTPUT"
const val SQL_EXECUTION_RESULT = "SQL_EXECUTE_OUTPUT"
}2. 计划调度器:PlanExecuteNode + PlanExecutorEdge
PlanExecuteNode 的职责很纯粹:读取 Planner 产出的 Plan,根据 CURRENT_STEP 计算下一跳节点。
@Component
class PlanExecuteNode : NodeAction {
override fun apply(state: OverAllState): Map<String, Any> {
val plan = Plan.getPlan(state)
val currentStep = state.value(DataAgentSpec.Graph.StateKey.Planning.CURRENT_STEP, 1)
val steps = plan.executionPlan
if (currentStep > steps.size) {
return mapOf(
DataAgentSpec.Graph.StateKey.Planning.CURRENT_STEP to 1,
DataAgentSpec.Graph.StateKey.Planning.NEXT_NODE to StateGraph.END,
)
}
return mapOf(
DataAgentSpec.Graph.StateKey.Planning.NEXT_NODE to steps[currentStep - 1].toolToUse,
)
}
}PlanExecutorEdge 只做一件事:读 PLAN_NEXT_NODE,然后真正跳转。
class PlanExecutorEdge : EdgeAction {
override fun apply(state: OverAllState): String {
val nextNode = state.value(DataAgentSpec.Graph.StateKey.Planning.NEXT_NODE, END)
if (nextNode == END) return END
return nextNode
}
}设计价值
节点负责“计算目标”,边负责“执行跳转”,这种分层让路由逻辑可测试、可替换,也更容易做多工具扩展。
3. SQL 生成节点:SqlGeneratorNode
SqlGeneratorNode 会把“当前计划步骤”转成 SQL。它的输入来源有三块:
- 当前步骤指令:
Plan.getCurrentStep(state).toolParameters.instruction - 结构化 Schema:
TABLE_RELATION_OUTPUT - 业务证据与改写问题:
EVIDENCE+REWRITE_QUERY
核心代码:
@Component
class SqlGeneratorNode(private val chatModel: ChatModel, private val promptManager: PromptManager) : NodeAction {
override fun apply(state: OverAllState): Map<String, Any> {
val step = Plan.getCurrentStep(state)
val instruction = step.toolParameters.instruction ?: throw RuntimeException("sql 生成步骤 instruction为空")
val schemeDto = JsonUtil.fromJson(
state.value(DataAgentSpec.Graph.StateKey.Recall.TABLE_RELATION, String::class.java).orElseThrow(),
Schema::class.java
)!!
val rewriteQuery = state.value(DataAgentSpec.Graph.StateKey.Recall.REWRITE_QUERY, "")
val evidence = state.value(DataAgentSpec.Graph.StateKey.Recall.EVIDENCE, "")
val dialect = "mysql"
val sqlPrompt = promptManager.newSqlGeneratorPromptTemplate.render(
mapOf(
"dialect" to dialect,
"question" to rewriteQuery,
"schema_info" to schemeDto.buildSchemePrompt(),
"evidence" to evidence,
"execution_description" to instruction
)
)
val sql = ChatClient.create(chatModel)
.prompt()
.system(sqlPrompt)
.options(OpenAiChatOptions.builder().extraBody(mapOf("enable_thinking" to false)).build())
.call()
.content()!!
return mapOf(
DataAgentSpec.Graph.StateKey.Execution.SQL_GENERATION_RESULT to MarkdownParserUtil.extractRawText(sql)
)
}
}3.1 为什么要 extractRawText
虽然 Prompt 已要求“只输出 SQL,不要 Markdown 标记”,但真实模型输出仍可能包一层 sql ... ``` 。MarkdownParserUtil.extractRawText 作为兜底,把代码块壳剥掉,避免执行层直接爆语法错误。
3.2 Prompt 约束的关键点
new-sql-generate.st 里有几个非常实用的硬约束:
- 禁止
SELECT * - 要求按方言语法输出
- 要求表名和列名必须存在于 Schema
- 输出只允许 SQL 本身
这类“硬约束 + 结构化上下文”组合,是 SQL 质量稳定的关键。
4. SQL 执行节点:SqlExecuteNode
SqlExecuteNode 的工作是:取 SQL_GENERATE_OUTPUT,连接目标数据源执行,再把结果包装成统一结构。
@Component
class SqlExecuteNode : NodeAction {
override fun apply(state: OverAllState): Map<String, Any> {
val currentStep = state.value(DataAgentSpec.Graph.StateKey.Planning.CURRENT_STEP, 1)
val sql = state.value(DataAgentSpec.Graph.StateKey.Execution.SQL_GENERATION_RESULT, "")
val databaseId = state.value(DataAgentSpec.Graph.StateKey.Input.DATABASE_ID, "")
val resultSetWrapper = SqliteSchemaDataSourceProvider.get(databaseId).connection.use {
ResultSetBuilder.buildFrom(it.createStatement().executeQuery(sql))
}
val displaySpec = buildDisplaySpec(resultSetWrapper)
Plan.getCurrentStep(state).toolParameters.sqlQuery = sql
return mapOf(
DataAgentSpec.Graph.StateKey.Planning.CURRENT_STEP to currentStep + 1,
DataAgentSpec.Graph.StateKey.Execution.SQL_EXECUTION_RESULT to Result(
resultSet = resultSetWrapper,
display = displaySpec,
),
DataAgentSpec.Graph.StateKey.Planning.EXECUTION_OUTPUT to
mutableMapOf("step_$currentStep" to JsonUtil.toJson(resultSetWrapper))
)
}
}ResultSetBuilder 会做两件工程化处理:
- 最多读取 1000 行(防止结果无限膨胀)
- 清理列名中的引号与反引号,统一前端展示
4.1 执行结果的数据契约
SQL_EXECUTE_OUTPUT 最终是一个对象:
{
"resultSet": {
"column": ["col1", "col2"],
"data": [{"col1": "v1", "col2": "v2"}],
"errorMsg": null
},
"display": {
"type": "table",
"title": "SQL执行结果",
"x": "col1",
"y": ["col2"]
}
}这个结构后续不仅可渲染表格,也能直接转图表(bar/line/pie)。
5. 图编排配置:GraphConfiguration
SQL 阶段生效的关键连线如下:
.addConditionalEdges(
DataAgentSpec.Graph.Node.PLAN_EXECUTION, edge_async(PlanExecutorEdge()),
mapOf(
DataAgentSpec.Graph.Node.SQL_GENERATION to DataAgentSpec.Graph.Node.SQL_GENERATION,
END to END,
)
)
.addEdge(DataAgentSpec.Graph.Node.SQL_GENERATION, DataAgentSpec.Graph.Node.SQL_EXECUTION)
.addEdge(DataAgentSpec.Graph.Node.SQL_EXECUTION, END)并且状态策略里显式注册了:
map[DataAgentSpec.Graph.StateKey.Execution.SQL_GENERATION_RESULT] = ReplaceStrategy()
map[DataAgentSpec.Graph.StateKey.Execution.SQL_EXECUTION_RESULT] = ReplaceStrategy()这保证每轮执行都会覆盖旧值,不会把历史 SQL 结果混进新任务。
6. 前端 A2UI:SQL 节点如何展示
前端在 App.vue 里监听 artifact-update,根据 artifact.name 和 outputType 更新步骤状态:
if (artifactName) {
const outputType = String(artifact.metadata?.outputType ?? '')
const status: GraphStep['status'] =
outputType === DATA_AGENT_ARTIFACT_OUTPUT.GRAPH_NODE_FINISHED ? 'success' : 'pending'
upsertStep(artifactName, text, status, data)
}sql-generation-node-card.vue 读取 SQL_GENERATE_OUTPUT 展示 SQL 文本;sql-execution-node-card.vue 读取 SQL_EXECUTE_OUTPUT.resultSet 渲染表格。
这种设计的优点是:后端只管产出标准状态,前端按节点协议自动呈现,新增节点也容易平滑接入。
7. 工程注意事项(建议你现在就修)
方言与执行引擎不一致
当前 SQL 生成默认dialect = "mysql",但执行器是 SQLite 数据源。
建议改为根据databaseId动态推断方言,避免函数/引号规则冲突。异常兜底不足
SqlExecuteNode当前直接执行executeQuery(sql),遇到 SQL 错误会抛异常中断。
建议把异常写入errorMsg,并路由到后续“SQL 错误修复节点(sql-error-fixer)”。执行输出覆盖策略
目前PLAN_EXECUTE_NODE_OUTPUT每次回填mutableMapOf("step_$currentStep" -> ...)。
如果后续要保留全链路历史,建议改为 merge 策略,避免只保留单步视图。
8. 小结
到这里,我们已经把“计划”变成了“可执行 SQL + 结构化结果”:
PlanExecuteNode负责挑步骤SqlGeneratorNode负责写 SQLSqlExecuteNode负责跑 SQL 并标准化输出- 前端 A2UI 负责流式可视化
下一篇我们进入 07 Python 沙箱执行分析,把 SQL 结果继续交给 Python 节点做高级计算和可视化分析,完成从“查询”到“洞察”的跃迁。
