ai大模型流式輸出------基于SSE協(xié)議的長(zhǎng)連接實(shí)現(xiàn)
傳統(tǒng)的http1.0請(qǐng)求開發(fā),已經(jīng)滿足了我們?nèi)粘5膚eb開發(fā)。
一般請(qǐng)求就像下圖這樣子,客服端發(fā)起一個(gè)請(qǐng)求(觸發(fā)),服務(wù)端做出一個(gè)響應(yīng)(動(dòng)作):
有時(shí)會(huì)有諸如實(shí)時(shí)刷新,實(shí)時(shí)顯示的場(chǎng)景,我們往往是客戶端定時(shí)發(fā)起請(qǐng)求,不斷的嘗試獲取最新的數(shù)據(jù)。
但是每次請(qǐng)求都會(huì)創(chuàng)建并釋放一個(gè)新的連接,這樣對(duì)于需要頻繁請(qǐng)求的場(chǎng)景,性能損耗太大,此外對(duì)于實(shí)時(shí)性響應(yīng)的場(chǎng)景也很難評(píng)估輪詢周期。輪詢的周期短,很多查詢結(jié)果其實(shí)并沒有變化,增加了成本開銷。輪詢周期長(zhǎng),又不能實(shí)時(shí)的展示數(shù)據(jù),周期值變成了一個(gè)經(jīng)驗(yàn)值,而且不同場(chǎng)景都需要不斷的調(diào)整。這屬實(shí)不夠友好。
于是http1.1協(xié)議對(duì)此進(jìn)行了擴(kuò)展,允許長(zhǎng)連接的存在。今天要介紹的SSE協(xié)議,就屬于http1.1下的新協(xié)議。
SSE全稱為 Sever-Sent Event
指服務(wù)器端事件發(fā)送。當(dāng)客戶端請(qǐng)求成功后,服務(wù)端會(huì)依次將事件(其實(shí)就是響應(yīng)信息),分多次發(fā)送到客戶端??蛻舳酥灰邮帐录憫?yīng)信息),做出相應(yīng)的處理即可。
就像下圖的樣子:

比如K線增長(zhǎng)圖,實(shí)時(shí)熱力圖,各種增長(zhǎng)曲線等等,都可以實(shí)時(shí)的,由后端主動(dòng)將事件推送到前端,不再需要前端每次建立一個(gè)新的連接來請(qǐng)求。這種方式也稱之為長(zhǎng)連接。
除了SSE,像websocket 、TCP等都屬于長(zhǎng)連接的類型。依次連接可以多次交互。
SSE其實(shí)最初并不受重視,甚至很多人都不知道這個(gè)協(xié)議。如果是簡(jiǎn)單一點(diǎn)的話,通常直接多輪詢幾遍就解決問題了,如果是復(fù)雜一點(diǎn)的話,直接就使用websocket這樣的重協(xié)議來處理了,功能也相對(duì)來說比較強(qiáng)大。但是自從交互大模型問世以后,大模型的流式對(duì)話往往能更高效的輸出,這種流式輸出的用戶體驗(yàn)也更好。這種主要是側(cè)重大模型響應(yīng)的交互模式,(防盜連接:本文首發(fā)自http://www.rzrgm.cn/jilodream/ )反而使得SSE的優(yōu)勢(shì)又體現(xiàn)出來了。
下面我們看下如何在springboot中使用sse來開發(fā):
由于springboot的封裝,我們使用SSE開發(fā)變得異常簡(jiǎn)單,
核心思路是:
創(chuàng)建一個(gè) SseEmitter 對(duì)象,返回給前端
這個(gè)SseEmitter類似于一個(gè)socket,我們只管向里邊塞數(shù)據(jù)即可,
而前端在收到SseEmitter對(duì)象后,則只管從sseEmitter中取數(shù)據(jù)即可。(注意此處一般采用注冊(cè)響應(yīng)方式)
后端代碼如下:
pom文件新增依賴:
1 <dependency> 2 <groupId>org.springframework.boot</groupId> 3 <artifactId>spring-boot-starter-web</artifactId> 4 </dependency>
controller類:
1 package com.example.demo.learnsse; 2 3 import lombok.extern.slf4j.Slf4j; 4 import org.springframework.http.MediaType; 5 import org.springframework.web.bind.annotation.CrossOrigin; 6 import org.springframework.web.bind.annotation.GetMapping; 7 import org.springframework.web.bind.annotation.RequestParam; 8 import org.springframework.web.bind.annotation.RestController; 9 import org.springframework.web.servlet.mvc.method.annotation.SseEmitter; 10 11 import java.io.IOException; 12 import java.util.concurrent.TimeUnit; 13 14 /** 15 * @discription 16 */ 17 @Slf4j 18 @RestController 19 @CrossOrigin(origins = "*") 20 public class SseController { 21 22 23 @GetMapping(value = "/learn/sseChat" , produces = {MediaType.TEXT_EVENT_STREAM_VALUE}) 24 public SseEmitter chat(@RequestParam String name) throws IOException { 25 SseEmitter sseEmitter = new SseEmitter(360000L); 26 sseEmitter.onCompletion(() -> log.warn("sse complete!!!" + Thread.currentThread().getName())); 27 sseEmitter.onError(throwable -> { 28 log.warn("sse error " + Thread.currentThread().getName(), throwable); 29 }); 30 sseEmitter.send("start"); 31 Runnable r = () -> { 32 int i = 1; 33 try { 34 while (i <= 10) { 35 sseEmitter.send(Thread.currentThread().getName()+": the next index:" + i); 36 log.warn(Thread.currentThread().getName() + ":" + i); 37 i++; 38 TimeUnit.SECONDS.sleep(3); 39 } 40 sseEmitter.complete(); 41 } catch (Exception e) { 42 log.warn("catch a ex", e); 43 sseEmitter.completeWithError(e); 44 } 45 }; 46 Thread t = new Thread(r); 47 t.start(); 48 log.warn("start return sse"); 49 return sseEmitter; 50 } 51 }
我們可以不寫前端,直接用瀏覽器或者命令行訪問,
瀏覽器效果如下:
真實(shí)效果是一行行輸出的
data:start data:Thread-2: the next index:1 data:Thread-2: the next index:2 data:Thread-2: the next index:3 data:Thread-2: the next index:4 data:Thread-2: the next index:5 data:Thread-2: the next index:6 data:Thread-2: the next index:7 data:Thread-2: the next index:8 data:Thread-2: the next index:9 data:Thread-2: the next index:10
日志輸出如下:
2024-12-02 11:06:36.267 WARN 2032 --- [nio-8081-exec-4] com.example.demo.learnsse.SseController : sse complete!!!http-nio-8081-exec-4 2024-12-02 11:06:38.440 WARN 2032 --- [ Thread-2] com.example.demo.learnsse.SseController : Thread-2:2 2024-12-02 11:06:41.442 WARN 2032 --- [ Thread-2] com.example.demo.learnsse.SseController : Thread-2:3 2024-12-02 11:06:44.450 WARN 2032 --- [ Thread-2] com.example.demo.learnsse.SseController : Thread-2:4 2024-12-02 11:06:47.458 WARN 2032 --- [ Thread-2] com.example.demo.learnsse.SseController : Thread-2:5 2024-12-02 11:06:50.468 WARN 2032 --- [ Thread-2] com.example.demo.learnsse.SseController : Thread-2:6 2024-12-02 11:06:53.471 WARN 2032 --- [ Thread-2] com.example.demo.learnsse.SseController : Thread-2:7 2024-12-02 11:06:56.475 WARN 2032 --- [ Thread-2] com.example.demo.learnsse.SseController : Thread-2:8 2024-12-02 11:06:59.483 WARN 2032 --- [ Thread-2] com.example.demo.learnsse.SseController : Thread-2:9 2024-12-02 11:07:02.495 WARN 2032 --- [ Thread-2] com.example.demo.learnsse.SseController : Thread-2:10 2024-12-02 11:07:05.508 WARN 2032 --- [nio-8081-exec-5] com.example.demo.learnsse.SseController : sse complete!!!http-nio-8081-exec-5
這樣一個(gè)簡(jiǎn)單的單次連接,服務(wù)器多次推送的示例就寫完了。
當(dāng)然你也可以寫一個(gè)簡(jiǎn)短的前端代碼,查看效果,注意此時(shí)涉及到跨域了,因此我們的java代碼要使用注解@CrossOrigin(origins = "*") 來解決跨域,請(qǐng)看controller代碼中紅色字體
1 <!DOCTYPE html> 2 <html> 3 <head> 4 <title>SSE Example</title> 5 </head> 6 <body> 7 <div id="events"></div> 8 <script> 9 const eventSource = new EventSource('http://127.0.0.1:8081/learn/sseChat?name=xx'); 10 11 eventSource.onmessage = function(event) { 12 const newElement = document.createElement("div"); 13 newElement.textContent = "New message: " + event.data; 14 document.getElementById("events").appendChild(newElement); 15 }; 16 17 eventSource.onerror = function(error) { 18 console.error("Error:", error); 19 const newElement = document.createElement("div"); 20 newElement.textContent = "error message: " + error; 21 document.getElementById("events").appendChild(newElement); 22 eventSource.close(); 23 }; 24 25 eventSource.onclose = function(event) { 26 const newElement = document.createElement("div"); 27 newElement.textContent = "close message: " + event.data; 28 document.getElementById("events").appendChild(newElement); 29 eventSource.close(); 30 }; 31 </script> 32 </body> 33 </html>
我們?cè)趧?chuàng)建好SSE示例時(shí),一般會(huì)設(shè)置以下幾個(gè)回調(diào)方法:
onCompletion(Runnable callback):當(dāng)異步請(qǐng)求完成時(shí),我們會(huì)調(diào)用此方法注冊(cè)的回調(diào)函數(shù)。
onError(Consumer<Throwable> callback) 當(dāng)異步處理期間發(fā)生錯(cuò)誤時(shí),會(huì)調(diào)用該方法設(shè)置的回調(diào)函數(shù)
服務(wù)端發(fā)現(xiàn)任務(wù)結(jié)束時(shí),主動(dòng)知會(huì)客戶端關(guān)閉連接:
complete():表示已經(jīng)完成推送,通知客戶端不再有新的事件發(fā)送。
completeWithError(Throwable ex) 表示由于發(fā)生了某個(gè)異常而結(jié)束推送。springmvc將通過異常處理機(jī)制傳遞該異常。
一般在對(duì)接大模型時(shí),(防盜連接:本文首發(fā)自http://www.rzrgm.cn/jilodream/ )我們除了完成SSE相關(guān)的注冊(cè),還會(huì)設(shè)置與大模型的連接,
一般的思路是這樣的:
1、當(dāng)前端發(fā)送請(qǐng)求提問來后端時(shí),
2、我們首先創(chuàng)建一個(gè)SseEmitter,作為未來發(fā)送的套接字,
3、接著啟動(dòng)一個(gè)http連接,來請(qǐng)求大模型,
4、此時(shí)我們會(huì)使用Reactor-Mono之類的響應(yīng)式編程框架,來回調(diào)處理大模型推送回來的數(shù)據(jù)。(其中Reactor部分的代碼實(shí)現(xiàn),由于篇幅有限,我會(huì)在后邊的文章中講解)
5、在Mono的每次回調(diào)到大模型推送回來的數(shù)據(jù)時(shí),我們通過SseEmitter發(fā)送給前端
6、將第二步創(chuàng)建好的SseEmitter,返回給前端。
注意3/4/5步都是作為異步回調(diào)注冊(cè)到mono中的。整體的結(jié)構(gòu)圖如下:

如果你覺得寫的不錯(cuò),歡迎轉(zhuǎn)載和點(diǎn)贊。 轉(zhuǎn)載時(shí)請(qǐng)保留作者署名jilodream/王若伊_恩賜解脫(博客鏈接:http://www.rzrgm.cn/jilodream/

浙公網(wǎng)安備 33010602011771號(hào)