目录
一、SseEmitter基本方法
1、构造
2、发送数据
3、生命周期回调(清理资源)
4、结束连接
5、其他
二、SseEmitter的应用场景(优点):
1、配合Consumer,在业务进行中返回消息/进度。
2、AI 大模型流式对话(打字机效果)
3、实时消息与系统通知
三、SseEmitter和HTTP的对比
通信模式:
数据流向:
响应格式
连接状态:
断线重连:
适用场景:
四、SseEmitter的缺点
2、仅支持文本传输,不支持二进制
3、完全不兼容 IE 浏览器
4、连接管理与内存泄漏风险:
五、注意事项
1、生产环境的“隐形杀手”:代理服务器(Nginx)缓存
2、. 线程池的“大坑”:千万不要用默认线程池
3、 分布式部署的连接管理
六、关于前端的技术选型
1. 浏览器原生 EventSource
2. fetch API + ReadableStream
3. 第三方库 @microsoft/fetch-event-source( 强烈推荐)
核心差异对比与最终选型建议
首先了解一下SseEmitter的基本方法
一、SseEmitter基本方法
1、构造
(1)new SseEmitter /new SseEmitter(Long timeout):单位为毫秒
创建一个 SSE 连接对象。默认超时时间是 30 秒。
例如传入60000L表示60秒,0L表示永不超时(适合长订阅场景)。
(2)完整三参数构造(Spring 5.3+)
new SseEmitter(Long timeout ,Long writeTimeout,TimeZone timeZone);
timeout:连接总超时时间(毫秒)
writeTimeout:单次写数据超时(毫秒) |||| 即调用send()发送一条消息时最长允许阻塞等待时间 |||| 默认5000毫秒
timeZone:时间时区(一般传TimeZone.getDefault());
2、发送数据
(1)emitter.send(Object data)
(2)emitter.send(SseEmitter.event()...)
发送带有丰富元数据的 SSE 事件。这是最常用的方式,可以链式调用以下属性:
.name("事件名"):指定事件类型,前端可以通过 addEventListener('事件名') 单独监听。
.data(数据对象):实际要传输的业务数据。
.id("消息ID"):消息的唯一标识,用于断线重连时的消息追踪。
.reconnectTime(毫秒数):建议客户端在连接断开后,隔多久尝试重连。
.comment("注释"):发送注释(通常用于心跳保活,防止代理服务器断开空闲连接)。
示例:
// 1、最简单的纯文本推送(只包含 data) emitter.send(SseEmitter.event().data("这是一条简单的系统通知")); // 2、带事件名的业务推送(包含 event 和 data) emitter.send(SseEmitter.event() .name("APPROVAL_SUCCESS") .data("你的请假申请已被老板批准")); // 3、完整的防丢包推送(包含 id, retry, event, data) emitter.send(SseEmitter.event() .id("event_20260601_001") // 消息唯一ID,用于断线续传 .reconnectTime(3000L) // 断线后3秒重连 .name("NEW_MESSAGE") // 事件名称 .data(new ChatMessage("你好,这是一条新消息"))); // 实际数据对象 //4、 发送一个空注释作为心跳保活 emitter.send(SseEmitter.event().comment("ping"));
(3)send(Object data, MediaType mediaType)
mediaType的选用:
MediaType.TEXT_PLAIN :对应 HTTP 的 text/plain。 MediaType.APPLICATION_JSON:对应 HTTP 的 application/json。 MediaType.APPLICATION_XML:对应 HTTP 的 application/xml。 MediaType.APPLICATION_OCTET_STREAM:对应 HTTP 的 application/octet-stream。3、生命周期回调(清理资源)
emitter.onCompletion(Runnable):连接正常关闭(调用了 complete())时触发。
emitter.onTimeout(Runnable):连接超过设定时间未发送数据时触发。
emitter.onError(Consumer<Throwable>):连接发生异常(如客户端强行关闭)时触发。
可以用lambda表达式进行构造参数
例如:
(1)emitter.onCompletion(() -> emitters.remove(emitterId));//在SSE连接关闭后清理连接。
(2)
emitter.onCompletion(() -> { // 记录业务日志 log.info("用户 {} 的实时任务推送已正常结束", userId); // 更新数据库状态 taskService.updateStatus(userId, "FINISHED"); });//SSE连接关闭后记录该次任务完成。4、结束连接
(1)emitter.complete():主动正常关闭 SSE 连接。
(2)emitter.completeWithError(Throwable):主动以异常状态关闭连接。
(3)emitter.isComplete() / emitter.isExpired():判断连接是否已结束或已过期,在群发消息遍历集合时,可以用来过滤掉失效的连接。
可以使用Atomic原子类保证结束连接的原子性,例如:
private final AtomicBoolean isClosed = new AtomicBoolean(false); public void safeComplete() { // compareAndSet(false, true) 只有在当前值为 false 时才会将其设为 true,并返回 true if (isClosed.compareAndSet(false, true)) { emitter.complete(); } }注意:SSE(SseEmitter)的断线重连是无限次的,浏览器会一直尝试重连,直到你手动关闭页面或前端代码显式调用 .close() 方法为止。
5、其他
setTimeout(Long timeout):单位毫秒,动态修改超时时间。
二、SseEmitter的应用场景(优点):
了解一下它的应用场景,第一个示例展示了怎么使用
1、配合Consumer,在业务进行中返回消息/进度。
流程:
后端在控制层创建SseEmitter和Consumer回调
将Consumer传递给服务层,在服务层需要的时候进行回调传递给前端。
示例:
SseEmitter emitter=new SseEmitter(); Consumer<Event> eventCallback = event -> { try { emitter.send(SseEmitter.event() .name(event.getEventType()) .data(event)); } catch (IOException e) { log.error("发送 SSE 事件失败", e); emitters.remove(emitterId); }服务层如何使用:
对应用层传参的时候将eventCallback一并传入,在应用层只需要调用accept(Event event)即可回调传递消息给前端。(这里的event自己封装一下内容)
优点:
(1)跨层传递回调函数,将推送逻辑从 Controller 传递到 Engine,再传递到 Executor。以此闭包捕获 SseEmitter - Consumer 在定义时捕获了所在方法的局部变量(emitter)。
(2)解耦业务逻辑与推送逻辑 - 执行引擎不需要知道 SSE,只需要调用回调。
2、AI 大模型流式对话(打字机效果)
场景描述:当用户与大模型对话时,AI 的回复不是一下子全部返回的,而是流式输出。
为什么用 SseEmitter:大模型生成文本需要时间,后端每生成一段文字,就立刻通过 emitter.send() 推给前端。这种极低的延迟感提升了用户体验,对比WebSocket减少了运维成本,降低了开发复杂度。
3、实时消息与系统通知
场景描述:
站内信/消息提醒:右上角的小红点,当有新订单、新评论或系统公告时,无需刷新页面立刻弹出提示。
审批流通知:你的请假申请被老板通过了,页面立刻弹出“审批通过”的提示框。
为什么用 SseEmitter:替代了传统的“前端每x秒轮询一次接口”的方式。SSE 只有在真正有消息时才推送,极大地节省了服务器的 CPU 和网络带宽资源。
轮询:
(1)高频的短连接会让服务器的 Web 容器频繁地创建和销毁线程来处理请求。在高并发下,这会导致严重的 CPU 上下文切换,甚至把服务器的线程池耗尽。
(2)每一次轮询,客户端和服务器都要重新经历一次完整的“握手”流程:建立 TCP 连接、TLS 加密协商、传输完整的 HTTP 请求头。这些头部信息往往比实际的响应内容还要大,造成了极大的带宽浪费。
SSE:
(1)虽然连接一直挂着,但 SSE 连接在挂起状态下几乎不消耗 CPU。现代服务器维护成千上万个空闲的长连接,占用的仅仅是极少的内存,而不是宝贵的 CPU 时间片。
(2)只需要最开始建立连接时握手一次。
三、SseEmitter和HTTP的对比
与同样常用的通信模式HTTP做对比,也可以和WebSocket对比一下
通信模式:
HTTP:短连接
客户端发一次请求,服务端回一次响应,连接立刻断开。
SseEmitter:长连接
建立一次 HTTP 连接后,服务端可以源源不断地向客户端推送数据。
数据流向:
HTTP:单向拉取
只能由客户端主动发起,服务端无法主动给客户端发消息。
SseEmitter:单向推送
服务端可以主动向客户端推送数据(客户端发数据需另开普通 HTTP 请求)。
响应格式
HTTP:完整的 HTML、JSON 或文件等,响应体一次性返回。
SseEmitter:固定为 text/event-stream 格式的纯文本流,数据以 data: ... 的形式持续流出。
连接状态:
HTTP:无状态、无连接
每次请求都是独立的事务,服务器不保留连接状态。
SseEmitter:持久连接
依赖 HTTP 的 Connection: keep-alive,连接会一直保持直到业务结束或超时。
断线重连:
HTTP:无
如果请求失败,需要前端手动重新发起请求。
SseEmitter:
浏览器原生支持
前端 EventSource 会自动在断线后尝试重连,无需写额外代码。
适用场景:
HTTP:网页浏览、表单提交、常规的增删改查接口。
SseEmitter:AI 打字机流式输出、实时任务进度条、股票行情刷新、系统消息通知。
双向实时交互选WebSocket。
四、SseEmitter的缺点
这之后讲一讲SSE的缺点和注意事项
1、单向通信:
SSE 只能由服务器向客户端单向推送数据。如果客户端需要频繁向服务器发送消息(比如实时聊天、在线协作编辑),SSE 就无法胜任,必须依赖 WebSocket 或额外的 HTTP 请求。
2、仅支持文本传输,不支持二进制
SSE 只能推送 UTF-8 文本。如果你需要推送图片流、文件流或视频帧,必须先将二进制数据做 Base64 编码,这会导致数据体积膨胀约 33%,严重消耗带宽并增加编解码的 CPU 开销。
3、完全不兼容 IE 浏览器
4、连接管理与内存泄漏风险:
SseEmitter 会在服务器内存中维持长连接。如果客户端异常断开(如直接关闭浏览器、断网),而服务器端没有通过 onCompletion、onTimeout 和 onError 等回调及时清理资源,这些“僵尸连接”会一直占用内存,最终可能导致服务器内存溢出。
五、注意事项
1、生产环境的“隐形杀手”:代理服务器(Nginx)缓存
这是新手使用 SSE 最容易遇到的坑。在本地开发时一切正常,但一部署到测试或生产环境(通常前面挂着 Nginx),流式输出就会失效,变成“卡住很久,然后一次性吐出所有数据”。
原因:Nginx 等反向代理默认会开启缓冲(Buffering),它会傻傻地等后端把数据攒够了一大块,才一次性发给前端。
解决方案:必须在 Nginx 的配置中,针对 SSE 的接口路径关闭缓冲。
location /api/sse { proxy_pass http://backend; proxy_http_version 1.1; proxy_set_header Connection ''; proxy_buffering off; # 核心配置:关闭代理缓冲,实现真正的实时流 proxy_cache off; # 关闭缓存 }2、. 线程池的“大坑”:千万不要用默认线程池
在代码中经常会用异步线程去执行耗时任务并推送 SSE。不要直接使用CompletableFuture.runAsync(...)或 Spring 默认的线程池。
原因:CompletableFuture 默认使用的是 ForkJoinPool.commonPool(),这个全局共享线程池的线程数非常少(通常等于 CPU 核心数 - 1)。如果高并发下有几个 SSE 长连接任务占用了线程,会导致整个应用的其他异步任务全部卡死。
解决方案:为 SSE 业务自定义一个有界线程池(明确指定线程数、队列大小和拒绝策略),做到资源隔离。
3、 分布式部署的连接管理
用 Map<String, SseEmitter> 来存储连接,这在单机部署时完全没问题。
多台服务器:一旦需要集群部署(多台服务器),用户的 SSE 连接可能建立在 A 机器上,但触发推送的业务消息可能由 B 机器处理。此时 B 机器拿着emitterId在自己的内存 Map 里是找不到对应连接的。
解决方案:在分布式场景下,需要引入 Redis 等中间件。当业务触发推送时,先将消息发布到 Redis 的特定频道(Pub/Sub)或消息队列(如 Kafka/RocketMQ),各个服务节点订阅消息后,判断目标用户是否连接在本机上,如果是,再进行本地的 emitter.send() 推送。
六、关于前端的技术选型
1. 浏览器原生EventSource
这是最传统、最轻量级的方案,浏览器内置了完整的 SSE 协议解析能力。
核心特点:零依赖、开箱即用、浏览器自动处理断线重连。
代码示例:
const eventSource = new EventSource('/api/sse-connection'); // 监听普通消息(后端未指定 event name 时) eventSource.onmessage = (event) => { console.log('收到数据:', event.data); }; // 监听指定事件名的消息(对应后端 .name("xxx")) eventSource.addEventListener('APPROVAL_SUCCESS', (event) => { console.log('审批通过:', event.data); }); // 监听连接错误 eventSource.onerror = (error) => { console.error('连接发生错误:', error); // 浏览器会自动尝试重连,这里通常只需要记录日志 };缺点: 仅支持 GET 请求:无法在请求体(Body)中传递复杂参数。 无法自定义请求头:除了 Cookie(需开启 withCredentials),无法在 Header 中携带 Token 等鉴权信息。
适用场景:简单的实时通知、股票行情、运维大屏等不需要携带复杂鉴权信息或请求参数的公开或 Cookie 鉴权接口。
2. fetch API + ReadableStream
这是现代浏览器提供的原生流式请求方案,完全抛弃了 SSE 的协议封装,由前端手动解析流数据。
特点:极高的自由度,支持 POST、自定义 Header、完美配合 AbortController 中断请求。
代码示例:
const controller = new AbortController(); fetch('/api/sse-connection', { method: 'POST', // 支持 POST headers: { 'Content-Type': 'application/json', 'Authorization': 'Bearer YOUR_TOKEN' // 支持自定义 Header }, body: JSON.stringify({ prompt: '你好' }), signal: controller.signal // 支持随时中断 }) .then(response => { const reader = response.body.getReader(); const decoder = new TextDecoder('utf-8'); let buffer = ''; // 循环读取流数据 function readStream() { reader.read().then(({ done, value }) => { if (done) { console.log('流传输结束'); return; } // 将二进制流解码为文本 buffer += decoder.decode(value, { stream: true }); // 手动按 \n\n 分割并解析 SSE 格式 (data: xxx) const lines = buffer.split('\n\n'); buffer = lines.pop(); // 保留最后一个可能不完整的片段 lines.forEach(line => { if (line.startsWith('data:')) { const data = line.replace('data:', '').trim(); console.log('解析出的数据:', data); } }); readStream(); // 继续读取下一块 }); } readStream(); }); // 随时可以调用 controller.abort() 来中断连接(例如用户点击“停止生成”)缺点: 实现极其繁琐:需要手动处理 data:、event:、id: 的解析,还要自己写断线重连和心跳保活逻辑。
适用场景:后端返回的不是标准 SSE 格式,或者你需要完全掌控请求的每一个细节(如自定义超时、极其复杂的重试策略)。
3. 第三方库@microsoft/fetch-event-source( 强烈推荐)
这是微软官方维护的一个轻量级库(仅 2KB 左右),它完美结合了前两者的优点:底层基于 fetch,上层封装了标准的 SSE 协议解析和自动重连
核心特点:
支持 POST 和自定义 Header、自动处理 SSE 协议解析、内置可控的自动重连机制。
代码示例:
import { fetchEventSource } from '@microsoft/fetch-event-source'; const ctrl = new AbortController(); fetchEventSource('/api/sse-connection', { method: 'POST', headers: { 'Content-Type': 'application/json', 'Authorization': 'Bearer YOUR_TOKEN' }, body: JSON.stringify({ prompt: '你好' }), signal: ctrl.signal, openWhenHidden: true, // 页面隐藏时保持连接(解决移动端切后台断开的问题) onmessage(event) { if (event.event === 'APPROVAL_SUCCESS') { console.log('审批通过:', event.data); } else { console.log('普通消息:', event.data); } }, onerror(err) { // 精细控制重连策略 if (err.status === 401 || err.status === 403) { throw err; // 鉴权失败,直接停止重连 } // 其他错误默认会自动重连 } });适用场景:AI 大模型流式对话、需要 Token 鉴权的复杂业务推送、任何需要 POST 请求的 SSE 场景。
核心差异对比与最终选型建议
表格
| 维度 | 原生 EventSource | fetch + ReadableStream | @microsoft/fetch-event-source |
|---|---|---|---|
| 支持 POST 请求 | 仅支持 GET | 支持 | 支持 |
| 自定义 Header | 不支持 | 支持 | 支持 |
| SSE 协议解析 | 浏览器原生自动解析 | 需手动解析 | 库自动解析 |
| 自动重连 | 浏览器原生(无脑重连) | 需手动实现 | 内置(可精细控制) |
| 主动中断请求 | ️ 仅支持 close() | AbortController | AbortController |
| 代码复杂度 | 极低(几行代码) | 极高(需处理流和协议) | 低(封装完善) |
| 外部依赖 | 零依赖 | 零依赖 | 极小依赖 (~2KB) |
选型总结:
如果是极其简单、无需鉴权(或仅 Cookie 鉴权)的 GET 请求,直接用原生EventSource。
如果是AI 对话、需要 POST 传参、需要 Header 带 Token 鉴权的生产级项目,选@microsoft/fetch-event-source。
除非你有极其特殊的流处理需求(比如后端返回的不是标准 SSE 格式),否则不建议直接使用裸 fetch + ReadableStream,。