01 跑通 Toy Graph 最小链路
01 跑通 Toy Graph 最小链路
这一篇只做一个结果:
发送一句话,后端跑一个最小 Graph,A2A 把 Graph 节点输出转成流式 artifact,前端实时渲染出来。
本文对应分支:chapter-02-01
核心代码位置:
data-agent-backend/src/main/kotlin/io/github/qifan777/server/graph/GraphConfiguration.ktdata-agent-backend/src/main/kotlin/io/github/qifan777/server/graph/nodes/ToyHelloNode.ktdata-agent-backend/src/main/kotlin/io/github/qifan777/server/a2a/ToyAgentExecutor.ktdata-agent-frontend/src/App.vue
本文目标
- 创建
START -> TOY_HELLO_NODE -> END最小图。 - 让 Graph 节点直接产出 Spring AI 的流式结果。
- 让 A2A 执行器把
NodeOutput翻译成前端可消费的artifact-update。 - 在前端看到真正的流式文本增长,而不是一次性整块返回。
1. 这次提交到底做了什么
如果上一章的 toy A2A 是:
收到文本 -> 直接 addArtifact("hello from a2a")
那这次提交变成了:
收到文本 -> 跑 Graph -> Graph 节点里调用 Spring AI -> Graph 产生 StreamingOutput -> A2A 转成 artifact-update -> 前端逐段渲染
也就是说,这次真正打通了 3 层:
- Spring AI Alibaba Graph 编排
- A2A 协议流式输出
- Vue 前端流式消费
2. 依赖层先补齐
这次后端关键依赖是下面几组:
implementation(platform("org.springframework.ai:spring-ai-bom:1.1.2"))
implementation("org.springframework.ai:spring-ai-starter-model-openai")
implementation("com.alibaba.cloud.ai:spring-ai-alibaba-graph-core:1.1.2.2")
implementation("io.github.a2asdk:a2a-java-sdk-transport-jsonrpc:0.3.2.Final")它们分别负责:
- Spring AI 模型调用能力
- OpenAI 兼容协议模型接入
- Graph 编排与流式节点输出
- A2A JSON-RPC 传输
同时,application.yml 里还补了模型配置:
spring:
ai:
openai:
base-url: openai协议baseurl(不带v1)
api-key:
chat:
options:
model:这意味着 ToyHelloNode 里的 ChatModel 已经不再是伪对象,而是真正会去调模型。
3. 第一步:先把 Graph 搭成最小固定链路
GraphConfiguration 很短,但它定义了整个执行骨架:
@Configuration
open class GraphConfiguration {
@Bean
open fun toyHelloGraph(toyHelloNode: ToyHelloNode): StateGraph {
return StateGraph()
.addNode("TOY_HELLO_NODE", AsyncNodeAction.node_async(toyHelloNode))
.addEdge(StateGraph.START, "TOY_HELLO_NODE")
.addEdge("TOY_HELLO_NODE", StateGraph.END)
}
}这里有 3 个关键点:
- 图里只有一个业务节点:
TOY_HELLO_NODE - 节点通过
AsyncNodeAction.node_async(...)挂进去 - 整个图是固定单链路:
START -> TOY_HELLO_NODE -> END
第一版不要急着做分支、条件跳转、循环。
先保证最简单的一条线能稳定跑通,后面再逐渐增加节点。
4. 第二步:让 Graph 节点直接返回流
这次提交最值得注意的地方在 ToyHelloNode:
@Component
class ToyHelloNode(private val chatModel: ChatModel) : NodeAction {
override fun apply(state: OverAllState): Map<String, Any> {
val input = state.value("input", "")
val flux = ChatClient.create(chatModel)
.prompt()
.user(input)
.options(OpenAiChatOptions.builder().extraBody(mapOf("enable_thinking" to false)).build())
.stream()
.chatResponse()
return mapOf("output" to flux)
}
}这里和“普通 Node 返回字符串”最大的不同是:
- 节点从
state里取输入 - 直接调用
ChatClient - 用
.stream().chatResponse()拿到一个Flux - 把这个
Flux放回 Graph 状态里
这一步非常重要,因为它决定了后面的 Graph 不只是“图结构”,而是“图里每个节点都可以天然产出流”。
你可以先把它理解成一句话:
ToyHelloNode 不是在节点里等模型完整返回后再交给 Graph,而是把模型的流式响应源头直接交给 Graph。
5. 第三步:A2A 执行器接管 Graph 输出
真正把 Graph 和 A2A 接起来的是 ToyAgentExecutor:
@Component
class ToyAgentExecutor(private val stateGraph: StateGraph) : AgentExecutor {
override fun execute(
context: RequestContext?,
eventQueue: EventQueue?
) {
val taskUpdater = TaskUpdater(context, eventQueue)
val artifactNum = AtomicInteger()
fun handleNodeOutput(nodeOutput: NodeOutput) {
if (nodeOutput is StreamingOutput<*>) {
when (nodeOutput.outputType) {
OutputType.GRAPH_NODE_STREAMING -> taskUpdater.addArtifact(
listOf(TextPart(nodeOutput.message().text)),
artifactNum.incrementAndGet().toString(),
nodeOutput.node(),
mapOf("outputType" to nodeOutput.outputType)
)
OutputType.GRAPH_NODE_FINISHED -> taskUpdater.addArtifact(
listOf(DataPart(nodeOutput.state().data())),
artifactNum.incrementAndGet().toString(),
nodeOutput.node(),
mapOf("outputType" to nodeOutput.outputType)
)
else -> {}
}
} else {
taskUpdater.addArtifact(
listOf(DataPart(nodeOutput.state().data())),
artifactNum.incrementAndGet().toString(),
nodeOutput.node(),
mapOf()
)
}
}
val text = ((context?.message?.parts?.first { it is TextPart }) as TextPart).text
stateGraph.compile().stream(mapOf("input" to text))
.doOnNext(::handleNodeOutput)
.doOnComplete(taskUpdater::complete)
.blockLast()
}
}这段代码建议分 4 步理解。
5.1 从 A2A 消息里拿输入
val text = ((context?.message?.parts?.first { it is TextPart }) as TextPart).text也就是说,A2A 用户消息里的第一段文本,会被作为 Graph 的 input 初始状态传进去。
5.2 让 Graph 开始流式执行
stateGraph.compile().stream(mapOf("input" to text))这行代码的含义是:
- 编译 StateGraph
- 用
input作为初始状态 - 不是
invoke()一次性执行,而是stream()连续拿输出
这就是后面能在前端看到一段段文本增长的根源。
5.3 把 Graph 的 NodeOutput 翻译成 A2A artifact
这里是整篇文章最核心的映射关系:
nodeOutput.node()->artifact.namenodeOutput.message().text->TextPartnodeOutput.state().data()->DataPartnodeOutput.outputType->artifact.metadata.outputType
尤其是 GRAPH_NODE_STREAMING 和 GRAPH_NODE_FINISHED 这两个分支:
OutputType.GRAPH_NODE_STREAMING -> TextPart(...)
OutputType.GRAPH_NODE_FINISHED -> DataPart(...)这相当于告诉前端:
- 流式阶段:你拿到的是持续追加的文本片段
- 节点结束阶段:你拿到的是该节点最终状态快照
5.4 任务结束时显式完成
.doOnComplete(taskUpdater::complete)没有这句,前端就可能一直处于 pending。
所以 Graph 跑完不等于 A2A 任务结束,A2A 任务完成态需要你显式推送出去。
6. 前端为什么能看到“流式增长”
前端代码这次其实没怎么变,但它刚好能承接这次 Graph+A2A 的升级。
App.vue 的核心逻辑是:
const stream = client.value.sendMessageStream({
message: {
messageId: crypto.randomUUID(),
role: 'user',
kind: 'message',
parts: [{ kind: 'text', text: userInput.value }],
},
})
for await (const event of stream) {
if (event.kind === 'artifact-update') {
const artifact = event.artifact
if (artifact.name !== 'TOY_HELLO_NODE') continue
const textPart = artifact.parts.find((p) => p.kind === 'text')
if (textPart) {
currentStep.name = artifact.name
currentStep.content += textPart.text
currentStep.status = 'pending'
}
}
if (event.kind === 'status-update' && event.status.state === 'completed') {
currentStep.status = 'success'
}
}关键点只有两个:
- 只消费
TOY_HELLO_NODE的 artifact - 用
currentStep.content += textPart.text做增量拼接
所以只要后端持续发 GRAPH_NODE_STREAMING 对应的 TextPart,页面文本就会一点点增长。
换句话说,这次流式体验不是前端“主动轮询”出来的,而是:
LLM Flux -> Graph StreamingOutput -> A2A artifact-update -> Vue 累加渲染
7. 现在这套最小链路可以怎么记
你可以把整条链路记成下面这张脑图:
- 前端发送 A2A 消息
ToyAgentExecutor取出用户文本stateGraph.compile().stream(...)启动图ToyHelloNode调用 Spring AI,返回模型流- Graph 产出
StreamingOutput ToyAgentExecutor把它转成artifact-update- 前端命中
TOY_HELLO_NODE,把文本追加到页面 - Graph 执行完成后,A2A 发出
status-update(completed)
8. 启动与验证
启动后端:
rtk ./gradlew bootRun启动前端:
rtk pnpm dev先验一下 Agent Card:
rtk curl http://localhost:3500/api/.well-known/agent-card.json然后在页面输入一句话,比如:
介绍一下图编排
预期现象:
- 浏览器 Network 出现
/api/a2a/jsonrpc - 前端开始收到多次
artifact-update Toy Hello Node文本逐段增长- 最终收到
status-update(completed),状态变成success
9. 常见问题
1) Graph 跑了,但前端没有文本流
ToyHelloNode返回的不是流,而是一次性字符串- 执行器没有处理
OutputType.GRAPH_NODE_STREAMING - 前端没有对
TextPart做累加
2) 只有最终结果,没有中间流
- 节点里没有用
.stream().chatResponse() - 只处理了
GRAPH_NODE_FINISHED,没处理GRAPH_NODE_STREAMING
3) 页面一直 pending
taskUpdater.complete()没有被调用- Graph 执行异常后没有正常结束
本章完成标准
满足以下 3 条,就说明这一章真正跑通了:
- 后端图结构确实执行到了
TOY_HELLO_NODE - A2A 流里能看到来自 Graph 的多次
artifact-update - 前端页面能看到文本逐段增长,并在结束后变成
success
如果要把这章压缩成一句话,可以记成:
Spring AI 负责产出模型流,Graph 负责组织节点,A2A 负责把节点流送到前端。
