专栏:《Java后端工程师进阶之路》(Day 14 / 90) 主题:从零搭建流式AI聊天后端:SSE协议原理 + WebFlux响应式流 + 前端EventSource接收
一、当用户盯着“转圈圈”时,我们在浪费什么?
流式输出的本质不是炫技,而是对齐用户心理预期。人眼对“有反馈的等待”容忍度极高,对“死寂的等待”零容忍。今天这篇,我们就把 WebFlux + SSE + Spring AI 流式对话接口的完整实现拆清楚。
二、先搞懂 SSE:比 WebSocket 更省事的“服务器单向推送”
SSE(Server-Sent Events)是一种让服务器向浏览器单向推送文本流的标准协议。它和 WebSocket 最大的区别:
| 特性 | SSE | WebSocket |
|---|---|---|
| 通信方向 | 服务器→客户端单向 | 全双工 |
| 协议层 | 基于 HTTP,天然支持心跳/重连 | 需要单独握手 |
| 数据格式 | 纯文本,默认 UTF-8 | 二进制/文本皆可 |
| 适用场景 | 流式输出、股票行情、日志推送 | 即时聊天、游戏、协同编辑 |
AI 流式对话只需要服务器推给客户端,SSE 完全够用,而且心智负担小得多。Spring WebFlux 对 SSE 的支持非常自然,返回Flux<ServerSentEvent<String>>即可。
三、代码实战一:最简 SSE 接口,先把“流”跑起来
依赖(Spring Boot 3.2.x):
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>3.2.5</version> </parent> <dependencies> <!-- WebFlux --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-webflux</artifactId> </dependency> </dependencies>最简 SSE 控制器:
import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; import java.time.Duration; @RestController public class SseDemoController { /** * 每 200ms 推送一个数字,模拟后端持续产生数据 */ @GetMapping(value = "/sse/numbers", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<String>> numberStream() { return Flux.interval(Duration.ofMillis(200)) .take(20) // 只推送 20 条,避免演示时停不下来 .map(seq -> ServerSentEvent.<String>builder() .id(String.valueOf(seq)) .event("number") .data("当前序号:" + seq) .comment("心跳保持") // 浏览器端不会触发 onmessage,但能保持连接活性 .build()); } }注意几个细节:
produces = MediaType.TEXT_EVENT_STREAM_VALUE把响应头设为Content-Type: text/event-stream。ServerSentEvent.builder()可以显式设置id、event、data、comment,这是 SSE 协议的标准字段。Flux.interval是冷的还是热的?它是热的,订阅后按时间轴发数据,适合模拟真实推送。
启动后,浏览器访问 http://localhost:8080/sse/numbers,如果看到一段段data:开头的文本,说明 SSE 通路已通。
四、代码实战二:接入 Spring AI,实现真正的流式对话
Spring AI 从 0.8 开始就支持stream()方法,返回Flux<String>或Flux<ChatResponse>。我们把它桥接到 SSE。
<dependency> <groupId>org.springframework.ai</groupId> <artifactId>spring-ai-openai-spring-boot-starter</artifactId> <version>1.0.0-M1</version> </dependency> <!-- 如果你用通义千问 / DeepSeek,换成对应的 starter 即可 -->配置文件application.yml:
spring: ai: openai: api-key: ${OPENAI_API_KEY} base-url: https://api.openai.com/v1 # 国内环境通常需要代理或中转 chat: options: model: gpt-4o-mini temperature: 0.7流式对话控制器:
import org.springframework.ai.chat.client.ChatClient; import org.springframework.ai.chat.messages.UserMessage; import org.springframework.ai.chat.model.ChatResponse; import org.springframework.ai.chat.prompt.Prompt; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Flux; @RestController public class ChatStreamController { private final ChatClient chatClient; public ChatStreamController(ChatClient.Builder chatClientBuilder) { this.chatClient = chatClientBuilder.build(); } @GetMapping(value = "/ai/chat/stream", produces = MediaType.TEXT_EVENT_STREAM_VALUE) public Flux<ServerSentEvent<String>> chatStream(@RequestParam String question) { // 1. 构造 Prompt Prompt prompt = new Prompt(new UserMessage(question)); // 2. 调用大模型流式输出 Flux<ChatResponse> responseFlux = chatClient.prompt(prompt) .stream() .chatResponse(); // 3. 把 ChatResponse 转成 SSE 事件 return responseFlux .map(resp -> { String content = resp.getResult() != null && resp.getResult().getOutput() != null ? resp.getResult().getOutput().getContent() : ""; return ServerSentEvent.<String>builder() .event("message") .data(content) .build(); }) // 4. 流结束时推送一个 [DONE] 标记,前端好做收尾 .concatWith(Flux.just( ServerSentEvent.<String>builder() .event("done") .data("[DONE]") .build() )) // 5. 异常处理:不要让整个流直接断掉 .onErrorResume(e -> Flux.just( ServerSentEvent.<String>builder() .event("error") .data("调用模型失败:" + e.getMessage()) .build() )); } }这里有几个老兵经验:
- 不要直接返回
Flux<String>给前端。用ServerSentEvent包装后,前端可以通过event字段区分“正常内容”“结束标记”“错误消息”。 - 一定要处理空 token。大模型流式接口有时会发空包,直接
getContent()可能 NPE。 - 错误处理用
onErrorResume。如果模型 API 超时或限流,整个Flux会终止,必须给用户一个错误事件,而不是浏览器半天没反应。
五、代码实战三:前端 EventSource 接收与渲染
SSE 在浏览器端用原生EventSource即可,无需额外库:
<!DOCTYPE html> <html> <head> <meta charset="UTF-8"> <title>AI 流式对话</title> </head> <body> <input id="question" type="text" value="用 Java 写一个线程安全的单例模式" style="width: 400px;"> <button onclick="startChat()">发送</button> <pre id="answer" style="background:#f5f5f5;padding:16px;border-radius:6px;min-height:60px;"></pre> <script> function startChat() { const question = document.getElementById('question').value; const answerEl = document.getElementById('answer'); answerEl.textContent = ''; // 注意:EventSource 只支持 GET,且不能自定义请求头 const evtSource = new EventSource(`/ai/chat/stream?question=${encodeURIComponent(question)}`); evtSource.addEventListener('message', (event) => { answerEl.textContent += event.data; }); evtSource.addEventListener('done', () => { console.log('流式输出完成'); evtSource.close(); }); evtSource.addEventListener('error', (event) => { console.error('发生错误', event.data); evtSource.close(); }); // 浏览器自动重连:如果连接断开,EventSource 会按指数退避重试 // 生产环境建议配合 last-event-id 做断点续传 } </script> </body> </html>前端有三个注意点:
EventSource只支持 GET,参数放在 URL 里。如果需要 POST 或复杂请求头,得用fetch + ReadableStream自己解析 SSE。- 收到
done事件后一定要close(),否则浏览器会按 SSE 规范自动重连。 - 生产环境可以利用
Last-Event-ID做断线续传,但这需要后端在ServerSentEvent中设置id()。
六、原理图解:一条 AI 回复是怎么“流”到前端的
┌─────────────┐ HTTP GET/SSE ┌─────────────────────┐ │ 浏览器 │ ◄───────────────────── │ Spring WebFlux │ │ EventSource │ text/event-stream │ ChatStreamController│ └─────────────┘ └──────────┬──────────┘ │ ▼ ┌──────────────────┐ │ Spring AI │ │ ChatClient │ └──────────┬───────┘ │ ▼ ┌───────────────┐ │ 大模型 API │ │ stream() │ └───────────────┘整个链路是异步非阻塞的:
- WebFlux 用 Reactor Netty 接收请求,不会为每个连接占一个线程。
- Spring AI 调用大模型流式接口,拿到
Flux<ChatResponse>。 - 每个 token 到达后,立即通过 SSE 推送给浏览器。
- 浏览器逐字渲染,用户感知不到后端等待。
在高并发场景下,虚拟线程 + WebFlux + SSE 的组合,能让一台 4C8G 的机器轻松支撑几千个并发长连接。换成传统 Servlet + 每个连接一个线程的方案,线程数早就被打爆了。
七、建议
超时与限流必须做。大模型 API 不是你家 MySQL,延迟波动极大。给
Flux加上.timeout(Duration.ofSeconds(30)),网关层配 Token 桶限流,避免一个慢请求拖垮连接池。日志不要打印完整流。我见过有人在
doOnNext里把每个 token 都打印出来,结果 3000 字的回复打了 3000 行日志,ELK 直接爆表。只记录首包时间和总 token 数即可。简单问题别用流式。如果业务场景是“生成一句话摘要”,直接同步返回更省资源。流式更适合长文本、低延迟感知、强交互感的场景。
八、结尾
AI 时代,接口设计正在从“请求-响应”进化成“请求-流”。用户体验的差距,往往不在模型本身,而在你能不能让用户“看见”答案正在生成。
下一篇预告:Day 15《Spring IOC 容器启动全流程:从 ApplicationContext 到 Bean 实例化》,我们正式进入 Spring 源码深水区。