Day14-SpringWebFlux与SSE实现AI流式对话接口
2026/7/4 12:08:16 网站建设 项目流程

专栏:《Java后端工程师进阶之路》(Day 14 / 90) 主题:从零搭建流式AI聊天后端:SSE协议原理 + WebFlux响应式流 + 前端EventSource接收

一、当用户盯着“转圈圈”时,我们在浪费什么?

流式输出的本质不是炫技,而是对齐用户心理预期。人眼对“有反馈的等待”容忍度极高,对“死寂的等待”零容忍。今天这篇,我们就把 WebFlux + SSE + Spring AI 流式对话接口的完整实现拆清楚。


二、先搞懂 SSE:比 WebSocket 更省事的“服务器单向推送”

SSE(Server-Sent Events)是一种让服务器向浏览器单向推送文本流的标准协议。它和 WebSocket 最大的区别:

特性SSEWebSocket
通信方向服务器→客户端单向全双工
协议层基于 HTTP,天然支持心跳/重连需要单独握手
数据格式纯文本,默认 UTF-8二进制/文本皆可
适用场景流式输出、股票行情、日志推送即时聊天、游戏、协同编辑

AI 流式对话只需要服务器推给客户端,SSE 完全够用,而且心智负担小得多。Spring WebFlux 对 SSE 的支持非常自然,返回Flux<ServerSentEvent<String>>即可。


三、代码实战一:最简 SSE 接口,先把“流”跑起来

依赖(Spring Boot 3.2.x):

<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.5</version> </parent> <dependencies> <!-- WebFlux --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> </dependencies>

最简 SSE 控制器:

import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import java.time.Duration; @RestController public class SseDemoController { /** * 每 200ms 推送一个数字,模拟后端持续产生数据 */ @GetMapping(value = "/sse/numbers", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<String>> numberStream() { return Flux.interval(Duration.ofMillis(200)) .take(20) // 只推送 20 条,避免演示时停不下来 .map(seq -> ServerSentEvent.<String>builder() .id(String.valueOf(seq)) .event("number") .data("当前序号:" + seq) .comment("心跳保持") // 浏览器端不会触发 onmessage,但能保持连接活性 .build()); } }

注意几个细节:

  • produces = MediaType.TEXT_EVENT_STREAM_VALUE把响应头设为Content-Type: text/event-stream
  • ServerSentEvent.builder()可以显式设置ideventdatacomment,这是 SSE 协议的标准字段。
  • Flux.interval是冷的还是热的?它是热的,订阅后按时间轴发数据,适合模拟真实推送。

启动后,浏览器访问 http://localhost:8080/sse/numbers,如果看到一段段data:开头的文本,说明 SSE 通路已通。


四、代码实战二:接入 Spring AI,实现真正的流式对话

Spring AI 从 0.8 开始就支持stream()方法,返回Flux<String>Flux<ChatResponse>。我们把它桥接到 SSE。

<dependency> <groupId>org.springframework.ai</groupId> <artifactId>spring-ai-openai-spring-boot-starter</artifactId> <version>1.0.0-M1</version> </dependency> <!-- 如果你用通义千问 / DeepSeek,换成对应的 starter 即可 -->

配置文件application.yml

spring: ai: openai: api-key: ${OPENAI_API_KEY} base-url: https://api.openai.com/v1 # 国内环境通常需要代理或中转 chat: options: model: gpt-4o-mini temperature: 0.7

流式对话控制器:

import org.springframework.ai.chat.client.ChatClient; import org.springframework.ai.chat.messages.UserMessage; import org.springframework.ai.chat.model.ChatResponse; import org.springframework.ai.chat.prompt.Prompt; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; @RestController public class ChatStreamController { private final ChatClient chatClient; public ChatStreamController(ChatClient.Builder chatClientBuilder) { this.chatClient = chatClientBuilder.build(); } @GetMapping(value = "/ai/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<String>> chatStream(@RequestParam String question) { // 1. 构造 Prompt Prompt prompt = new Prompt(new UserMessage(question)); // 2. 调用大模型流式输出 Flux<ChatResponse> responseFlux = chatClient.prompt(prompt) .stream() .chatResponse(); // 3. 把 ChatResponse 转成 SSE 事件 return responseFlux .map(resp -> { String content = resp.getResult() != null && resp.getResult().getOutput() != null ? resp.getResult().getOutput().getContent() : ""; return ServerSentEvent.<String>builder() .event("message") .data(content) .build(); }) // 4. 流结束时推送一个 [DONE] 标记,前端好做收尾 .concatWith(Flux.just( ServerSentEvent.<String>builder() .event("done") .data("[DONE]") .build() )) // 5. 异常处理:不要让整个流直接断掉 .onErrorResume(e -> Flux.just( ServerSentEvent.<String>builder() .event("error") .data("调用模型失败:" + e.getMessage()) .build() )); } }

这里有几个老兵经验:

  1. 不要直接返回Flux<String>给前端。ServerSentEvent包装后,前端可以通过event字段区分“正常内容”“结束标记”“错误消息”。
  2. 一定要处理空 token。大模型流式接口有时会发空包,直接getContent()可能 NPE。
  3. 错误处理用onErrorResume如果模型 API 超时或限流,整个Flux会终止,必须给用户一个错误事件,而不是浏览器半天没反应。

五、代码实战三:前端 EventSource 接收与渲染

SSE 在浏览器端用原生EventSource即可,无需额外库:

<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>AI 流式对话</title> </head> <body> <input id="question" type="text" value="用 Java 写一个线程安全的单例模式" style="width: 400px;"> <button onclick="startChat()">发送</button> <pre id="answer" style="background:#f5f5f5;padding:16px;border-radius:6px;min-height:60px;"></pre> <script> function startChat() { const question = document.getElementById('question').value; const answerEl = document.getElementById('answer'); answerEl.textContent = ''; // 注意:EventSource 只支持 GET,且不能自定义请求头 const evtSource = new EventSource(`/ai/chat/stream?question=${encodeURIComponent(question)}`); evtSource.addEventListener('message', (event) => { answerEl.textContent += event.data; }); evtSource.addEventListener('done', () => { console.log('流式输出完成'); evtSource.close(); }); evtSource.addEventListener('error', (event) => { console.error('发生错误', event.data); evtSource.close(); }); // 浏览器自动重连:如果连接断开,EventSource 会按指数退避重试 // 生产环境建议配合 last-event-id 做断点续传 } </script> </body> </html>

前端有三个注意点:

  • EventSource只支持 GET,参数放在 URL 里。如果需要 POST 或复杂请求头,得用fetch + ReadableStream自己解析 SSE。
  • 收到done事件后一定要close(),否则浏览器会按 SSE 规范自动重连。
  • 生产环境可以利用Last-Event-ID做断线续传,但这需要后端在ServerSentEvent中设置id()

六、原理图解:一条 AI 回复是怎么“流”到前端的

┌─────────────┐ HTTP GET/SSE ┌─────────────────────┐ │ 浏览器 │ ◄───────────────────── │ Spring WebFlux │ │ EventSource │ text/event-stream │ ChatStreamController│ └─────────────┘ └──────────┬──────────┘ │ ▼ ┌──────────────────┐ │ Spring AI │ │ ChatClient │ └──────────┬───────┘ │ ▼ ┌───────────────┐ │ 大模型 API │ │ stream() │ └───────────────┘

整个链路是异步非阻塞的:

  1. WebFlux 用 Reactor Netty 接收请求,不会为每个连接占一个线程。
  2. Spring AI 调用大模型流式接口,拿到Flux<ChatResponse>
  3. 每个 token 到达后,立即通过 SSE 推送给浏览器。
  4. 浏览器逐字渲染,用户感知不到后端等待。

在高并发场景下,虚拟线程 + WebFlux + SSE 的组合,能让一台 4C8G 的机器轻松支撑几千个并发长连接。换成传统 Servlet + 每个连接一个线程的方案,线程数早就被打爆了。


七、建议

  1. 超时与限流必须做。大模型 API 不是你家 MySQL,延迟波动极大。给Flux加上.timeout(Duration.ofSeconds(30)),网关层配 Token 桶限流,避免一个慢请求拖垮连接池。

  2. 日志不要打印完整流。我见过有人在doOnNext里把每个 token 都打印出来,结果 3000 字的回复打了 3000 行日志,ELK 直接爆表。只记录首包时间和总 token 数即可。

  3. 简单问题别用流式。如果业务场景是“生成一句话摘要”,直接同步返回更省资源。流式更适合长文本、低延迟感知、强交互感的场景。


八、结尾

AI 时代,接口设计正在从“请求-响应”进化成“请求-流”。用户体验的差距,往往不在模型本身,而在你能不能让用户“看见”答案正在生成。

下一篇预告:Day 15《Spring IOC 容器启动全流程:从 ApplicationContext 到 Bean 实例化》,我们正式进入 Spring 源码深水区。

需要专业的网站建设服务?

联系我们获取免费的网站建设咨询和方案报价,让我们帮助您实现业务目标

立即咨询