Java 實現 WebSocket 集群轉發:使用 Redis 發布訂閱
場景
后端服務被部署到多個節點上,通過彈性負載均衡對外提供服務。
客戶端(瀏覽器) 客戶端1 連接到了服務端 A 的 WebSocket 節點。
客戶端通過彈性負載均衡,把請求分配到了服務端 B,比如計算服務會輸出一些過程信息,服務端 B 上沒有 客戶端1 的 WS 連接。
需求
服務端 B 把消息轉發到服務端 A 上,找到 客戶端1 的連接,發送出去。
畫示意圖
代碼
代碼:https://github.com/ioufev/websocket-cluster-forward
備份:藍奏云
Redis 發布類
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
@Component
public class RedisPublisher {
@Resource
private RedisTemplate<String, byte[]> redisTemplate;
public void publishMessage(String channel, byte[] message) {
redisTemplate.convertAndSend(channel, message);
}
}
Redis 訂閱類
import com.ioufev.wsforward.consts.RedisConst;
import com.ioufev.wsforward.ws.WebSocketServer;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
@Component
public class RedisMessageListener implements MessageListener {
@Resource
private WebSocketServer webSocket;
public RedisMessageListener(WebSocketServer webSocket) {
this.webSocket = webSocket;
}
@Override
public void onMessage(Message message, byte[] pattern) {
// 獲取頻道名稱
String channel = new String(message.getChannel());
// 判斷是否為需要轉發的頻道
if(channel.equals(RedisConst.PUB_SUB_TOPIC)){
// 獲取頻道內容
byte[] body = message.getBody();
String contentBase64WithQuotes = new String(body, StandardCharsets.UTF_8); // 帶引號的Base64
String contentBase64 = removeQuotes(contentBase64WithQuotes); // base64
String content = new String(Base64.getDecoder().decode(contentBase64), StandardCharsets.UTF_8); // 原來的字符串
String key = content.split("::")[0];
String wsContent = content.substring((key + "::").length());
webSocket.sendOneMessageForRedisMessage(key, wsContent);
}
}
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory factory,
RedisMessageListener listener) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(factory);
container.addMessageListener(listener, new ChannelTopic(RedisConst.PUB_SUB_TOPIC));
return container;
}
/**
* 移除存在Redis中的值開頭和結尾的引號
* @param input 輸入
* @return 輸出
*/
private String removeQuotes(String input) {
if (input != null && input.length() >= 2 && input.startsWith("\"") && input.endsWith("\"")) {
return input.substring(1, input.length() - 1);
}
return input;
}
}
WebSocket 服務端控制類
import com.ioufev.wsforward.consts.RedisConst;
import com.ioufev.wsforward.redis.RedisPublisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import org.springframework.stereotype.Component;
import javax.websocket.server.ServerEndpoint;
@Component
@ServerEndpoint("/websocket/{key}")
public class WebSocketServer {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
private String sessionId;
private Session session;
private static RedisPublisher redisPublisher;
@Autowired
public void setApplicationContext(RedisPublisher redisPublisher) {
WebSocketServer.redisPublisher= redisPublisher;
}
private static CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
@OnOpen
public void onOpen(Session session, @PathParam(value = "key") String key) {
this.sessionId = key;
this.session = session;
webSockets.add(this);
sessionPool.put(key, session);
log.info(key + "【websocket消息】有新的連接,總數為:" + webSockets.size() + ", session count is :" + sessionPool.size());
for(WebSocketServer webSocket : webSockets) {
log.info("【webSocket】key is :" + webSocket.sessionId);
}
}
@OnClose
public void onClose() {
sessionPool.remove(this.sessionId);
webSockets.remove(this);
log.info("【websocket消息】連接斷開,總數為:" + webSockets.size());
}
@OnMessage
public void onMessage(@PathParam(value = "key") String key, String message) {
log.info("【websocket消息】收到消息message:" + message);
sendOneMessage(key, message);
}
/**
* 廣播消息
*/
public void sendAllMessage(String message) {
for (WebSocketServer webSocket : webSockets) {
log.info("【websocket消息】廣播消息:" + message);
try {
webSocket.session.getAsyncRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 單點消息
*/
public void sendOneMessage(String key, String message) {
// Session session = sessionPool.get(key);
Session session = getSession(key);
if (session != null) {
try {
session.getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
} else {
redisPublisher.publishMessage(RedisConst.PUB_SUB_TOPIC, (key + "::" + message).getBytes(StandardCharsets.UTF_8));
}
}
/**
* 用來Redis訂閱后使用
*/
public void sendOneMessageForRedisMessage(String key, String message) {
Session session = getSession(key);
if (session != null) {
try {
session.getBasicRemote().sendText(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
private static Session getSession(String key){
for (WebSocketServer webSocket : webSockets) {
if(webSocket.sessionId.equals(key)){
return webSocket.session;
}
}
return null;
}
}
參考文章
?? WebSocket 集群解決方案
?? 圖畫的好,理解起來很清楚。
?? WebSocket 集群解決方案,不用 MQ
?? 在上面的思路基礎上,想給服務端添加一個標識,用來記錄用戶連接和服務端的關聯關系,我也有類似的想法,不過關于用戶ID和服務端ID關聯關系的存儲問題,還沒處理好。
?? Spring Cloud 一個配置注解實現 WebSocket 集群方案
?? 這個思路更大膽,既然是集群轉發,沒什么不能直接使用 WebSocket 本身
?? 分布式 WebSocket 集群解決方案
?? 用戶連接和服務端的關聯關系,用一致性哈希存儲
?? Spring Boot WebSocket 的 6 種集成方式
?? 喜歡文章的標題,內容看看目錄就行了。
?? 構建通用 WebSocket 推送網關的設計與實踐
?? 生產環境值得參考,但是用來入門參考顯然沒說清楚重點和難點
?? 石墨文檔是如何通過 WebSocket 實現百萬長連接的?
?? 生產環境值得參考,但是用來入門參考顯然沒說清楚重點和難點,這個比上面文章說更詳細,顯然具有可操作性。
總結
1、需要有一個統一的地方來保存用戶連接和服務端的關聯關系,可以是: Redis、MQ、Zookeeper、微服務的服務發現。
2、Redis 發布訂閱用來集群轉發非常簡單,適用于實時發布消息那種,比如一個計算過程的實時步驟輸出。
3、如果要確保消息不丟失,盡量送達之類的,那就用 MQ。
4、最佳方式:每個服務端有一個ID,每個用戶連接也有一個ID,然后服務端轉發的時候,找到需要的服務端,只轉發一次就好了。

浙公網安備 33010602011771號