目錄
方式一:使用 ResponseEntity + StreamingResponseBody
方式二:使用 SseEmitter(Server-Sent Events)
方式三:使用 Flux 實現(推薦)
流式輸出 指的是服務器將數據以“流”的方式逐步寫入響應體,而不是一次性生成整個結果後再返回。
比較典型的就是現在大語言模型的對話功能,幾乎都是以流式輸出的方式的。好處就是不用阻止用户等很長時間。想要實現流式輸出,有很多種方式。
方式一:使用 ResponseEntity + StreamingResponseBody
@GetMapping("/chat")
public ResponseEntity<StreamingResponseBody> chat() {
StreamingResponseBody body = outputStream -> {
for (int i = 0; i < 10; i++) {
String data = "data chunk " + i + "\n";
outputStream.write(data.getBytes(StandardCharsets.UTF_8));
outputStream.flush();
try {
Thread.sleep(500); // 模擬延遲
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
};
return ResponseEntity.ok()
.header(HttpHeaders.CONTENT_TYPE, MediaType.TEXT_EVENT_STREAM_VALUE)
.body(body);
}
StreamingResponseBody 是一個函數式接口,其內部通過 OutputStream 將數據逐步寫入響應流。
Spring 在處理該返回值時會延遲執行該函數,直到響應提交前才調用 writeTo(OutputStream) 方法。
每次寫入後調用 flush() 強制刷新緩衝區,使客户端能實時接收內容。
這個方式適用於傳統的 Spring MVC 應用,用於將數據分塊寫入 HTTP 響應體。但是需要注意的是,它是一種同步的方式,也就是説請求會一直佔用連接,等全部寫入結束後才會斷開。
方式二:使用 SseEmitter(Server-Sent Events)
SSE(Server-Sent Events,服務器推送事件)是一種基於 HTTP 協議的單向通信機制,允許服務器主動將實時數據推送給客户端(比如我們的瀏覽器),而不是客户端頻繁輪詢請求數據。
使用方式如下:
@GetMapping("/chat1")
public SseEmitter sse() {
SseEmitter emitter = new SseEmitter(60_000L); // 設置超時時間
Executors.newSingleThreadExecutor().submit(() -> {
try {
for (int i = 0; i < 10; i++) {
emitter.send("Message " + i);
Thread.sleep(1000);
}
emitter.complete(); // 正常結束連接
} catch (Exception ex) {
emitter.completeWithError(ex); // 錯誤終止連接
}
});
return emitter;
}
Spring 為 SSE 提供了專門的支持:SseEmitter,其中的 send() 方法用於發送數據到客户端,complete() 用於正常結束連接,而 completeWithError() 則是錯誤終止連接。
這個方案可以做成異步,即通過線程池不斷地向 SseEmitter 中 send 數據。
方式三:使用 Flux 實現(推薦)
還有一種更簡單的方式,那就是用 Flux 實現。
Flux 是響應式編程庫中的一個核心類型,它表示一個異步的、可變化數量(0 到 N 個)的數據流,支持流式處理、背壓(Backpressure)和非阻塞。
使用方式如下:
@GetMapping(value = "/chat2")
public Flux<String> fluxStream() {
return Flux.interval(Duration.ofSeconds(1))
.map(seq -> "stream element - " + seq);
}
只需要接口返回一個 Flux 類型即可實現,非常的簡單。這是一種異步非阻塞的實現方式。推薦用這種。