springboot~SSE做消息推送
一、SSE是什么?
SSE技術是基于單工通信模式,只是單純的客戶端向服務端發送請求,服務端不會主動發送給客戶端。服務端采取的策略是抓住這個請求不放,等數據更新的時候才返回給客戶端,當客戶端接收到消息后,再向服務端發送請求,周而復始。
- 注意:因為EventSource對象是SSE的客戶端,可能會有瀏覽器對其不支持,但谷歌、火狐、360是可以的,IE不可以。
- 優點:SSE和WebSocket相比,最大的優勢是便利,服務端不需要其他的類庫,開發難度較低,SSE和輪詢相比它不用處理很多請求,不用每次建立新連接,延遲較低。
- 缺點:如果客戶端有很多,那就要保持很多長連接,這會占用服務器大量內存和連接數
- sse 規范:在 html5 的定義中,服務端 sse,一般需要遵循以下要求:
Content-Type: text/event-stream;
charset=UTF-8Cache-Control: no-cache
Connection: keep-alive
實現一個例子
后端代碼
/**
* 用于創建連接
*/
@GetMapping("/connect/{userId}")
public SseEmitter connect(@PathVariable String userId) {
return SseEmitterUtil.connect(userId);
}
/**
* 推送給所有人
* @param message
* @return
*/
@GetMapping("/push/{message}")
public ResponseEntity<String> push(@PathVariable(name = "message") String message) {
// 獲取連接人數
int userCount = SseEmitterUtil.getUserCount();
// 如果無在線人數,返回
if (userCount < 1) {
return ResponseEntity.status(500).body("無人在線!");
}
SseEmitterUtil.batchSendMessage(message);
return ResponseEntity.ok("發送成功!");
}
SseEmitterUtil代碼
public class SseEmitterUtil {
/**
* 當前連接數
*/
private static AtomicInteger count = new AtomicInteger(0);
/**
* 使用map對象,便于根據userId來獲取對應的SseEmitter,或者放redis里面
*/
private static Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();
/**
* 創建用戶連接并返回 SseEmitter
* @param userId 用戶ID
* @return SseEmitter
*/
public static SseEmitter connect(String userId) {
// 設置超時時間,0表示不過期。默認30秒,超過時間未完成會拋出異常:AsyncRequestTimeoutException
SseEmitter sseEmitter = new SseEmitter(0L);
// 注冊回調
sseEmitter.onCompletion(completionCallBack(userId));
sseEmitter.onError(errorCallBack(userId));
sseEmitter.onTimeout(timeoutCallBack(userId));
sseEmitterMap.put(userId, sseEmitter);
// 數量+1
count.getAndIncrement();
log.info("創建新的sse連接,當前用戶:{}", userId);
return sseEmitter;
}
/**
* 給指定用戶發送信息
*/
public static void sendMessage(String userId, String message) {
if (sseEmitterMap.containsKey(userId)) {
try {
// sseEmitterMap.get(userId).send(message, MediaType.APPLICATION_JSON);
sseEmitterMap.get(userId).send(message);
}
catch (IOException e) {
log.error("用戶[{}]推送異常:{}", userId, e.getMessage());
removeUser(userId);
}
}
}
/**
* 群發消息
*/
public static void batchSendMessage(String wsInfo, List<String> ids) {
ids.forEach(userId -> sendMessage(wsInfo, userId));
}
/**
* 群發所有人
*/
public static void batchSendMessage(String wsInfo) {
sseEmitterMap.forEach((k, v) -> {
try {
v.send(wsInfo, MediaType.APPLICATION_JSON);
}
catch (IOException e) {
log.error("用戶[{}]推送異常:{}", k, e.getMessage());
removeUser(k);
}
});
}
/**
* 移除用戶連接
*/
public static void removeUser(String userId) {
sseEmitterMap.remove(userId);
// 數量-1
count.getAndDecrement();
log.info("移除用戶:{}", userId);
}
/**
* 獲取當前連接信息
*/
public static List<String> getIds() {
return new ArrayList<>(sseEmitterMap.keySet());
}
/**
* 獲取當前連接數量
*/
public static int getUserCount() {
return count.intValue();
}
private static Runnable completionCallBack(String userId) {
return () -> {
log.info("結束連接:{}", userId);
removeUser(userId);
};
}
private static Runnable timeoutCallBack(String userId) {
return () -> {
log.info("連接超時:{}", userId);
removeUser(userId);
};
}
private static Consumer<Throwable> errorCallBack(String userId) {
return throwable -> {
log.info("連接異常:{}", userId);
removeUser(userId);
};
}
}
前端代碼
<script>
let source = null;
// 用時間戳模擬登錄用戶
const userId = new Date().getTime();
if (window.EventSource) {
// 建立連接
source = new EventSource('/sse/connect/' + userId);
/**
* 連接一旦建立,就會觸發open事件
* 另一種寫法:source.onopen = function (event) {}
*/
source.addEventListener('open', function (e) {
setMessageInnerHTML("建立連接。。。");
}, false);
/**
* 客戶端收到服務器發來的數據
* 另一種寫法:source.onmessage = function (event) {}
*/
source.addEventListener('message', function (e) {
setMessageInnerHTML(e.data);
});
/**
* 如果發生通信錯誤(比如連接中斷),就會觸發error事件
* 或者:
* 另一種寫法:source.onerror = function (event) {}
*/
source.addEventListener('error', function (e) {
if (e.readyState === EventSource.CLOSED) {
setMessageInnerHTML("連接關閉");
} else {
console.log(e);
}
}, false);
} else {
setMessageInnerHTML("你的瀏覽器不支持SSE");
}
// 監聽窗口關閉事件,主動去關閉sse連接,如果服務端設置永不過期,瀏覽器關閉后手動清理服務端數據
window.onbeforeunload = function () {
closeSse();
};
// 關閉Sse連接
function closeSse() {
source.close();
const httpRequest = new XMLHttpRequest();
httpRequest.open('GET', '/sse/close/' + userId, true);
httpRequest.send();
console.log("close");
}
// 將消息顯示在網頁上
function setMessageInnerHTML(innerHTML) {
document.getElementById('message').innerHTML += innerHTML + '<br/>';
}
</script>
浙公網安備 33010602011771號