<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Java 實現 WebSocket 集群轉發:使用 Redis 發布訂閱

      視頻說明:https://www.bilibili.com/video/BV1Yh4y1F7SV?p=3

      場景

      后端服務被部署到多個節點上,通過彈性負載均衡對外提供服務。

      客戶端(瀏覽器) 客戶端1 連接到了服務端 A 的 WebSocket 節點。
      客戶端通過彈性負載均衡,把請求分配到了服務端 B,比如計算服務會輸出一些過程信息,服務端 B 上沒有 客戶端1 的 WS 連接。

      需求

      服務端 B 把消息轉發到服務端 A 上,找到 客戶端1 的連接,發送出去。

      畫示意圖

      代碼

      代碼:https://github.com/ioufev/websocket-cluster-forward

      備份:藍奏云

      Redis 發布類

      import org.springframework.data.redis.core.RedisTemplate;
      import org.springframework.stereotype.Component;
      
      import javax.annotation.Resource;
      
      @Component
      public class RedisPublisher {
      
          @Resource
          private RedisTemplate<String, byte[]> redisTemplate;
      
          public void publishMessage(String channel, byte[] message) {
              redisTemplate.convertAndSend(channel, message);
          }
      
      }
      

      Redis 訂閱類

      import com.ioufev.wsforward.consts.RedisConst;
      import com.ioufev.wsforward.ws.WebSocketServer;
      import org.springframework.context.annotation.Bean;
      import org.springframework.data.redis.connection.Message;
      import org.springframework.data.redis.connection.MessageListener;
      import org.springframework.data.redis.connection.RedisConnectionFactory;
      import org.springframework.data.redis.listener.ChannelTopic;
      import org.springframework.data.redis.listener.RedisMessageListenerContainer;
      import org.springframework.stereotype.Component;
      
      import javax.annotation.Resource;
      import java.nio.charset.StandardCharsets;
      import java.util.Base64;
      
      @Component
      public class RedisMessageListener implements MessageListener {
      
          @Resource
          private WebSocketServer webSocket;
      
          public RedisMessageListener(WebSocketServer webSocket) {
              this.webSocket = webSocket;
          }
      
          @Override
          public void onMessage(Message message, byte[] pattern) {
      
              // 獲取頻道名稱
              String channel = new String(message.getChannel());
      
              // 判斷是否為需要轉發的頻道
              if(channel.equals(RedisConst.PUB_SUB_TOPIC)){
      
                  // 獲取頻道內容
                  byte[] body = message.getBody();
                  String contentBase64WithQuotes = new String(body, StandardCharsets.UTF_8); // 帶引號的Base64
                  String contentBase64 = removeQuotes(contentBase64WithQuotes); // base64
                  String content = new String(Base64.getDecoder().decode(contentBase64), StandardCharsets.UTF_8); // 原來的字符串
      
                  String key = content.split("::")[0];
                  String wsContent  = content.substring((key + "::").length());
                  webSocket.sendOneMessageForRedisMessage(key, wsContent);
              }
      
          }
      
          @Bean
          public RedisMessageListenerContainer container(RedisConnectionFactory factory,
                                                         RedisMessageListener listener) {
              RedisMessageListenerContainer container = new RedisMessageListenerContainer();
              container.setConnectionFactory(factory);
              container.addMessageListener(listener, new ChannelTopic(RedisConst.PUB_SUB_TOPIC));
              return container;
          }
      
          /**
           * 移除存在Redis中的值開頭和結尾的引號
           * @param input 輸入
           * @return 輸出
           */
          private String removeQuotes(String input) {
              if (input != null && input.length() >= 2 && input.startsWith("\"") && input.endsWith("\"")) {
                  return input.substring(1, input.length() - 1);
              }
              return input;
          }
      
      }
      

      WebSocket 服務端控制類

      import com.ioufev.wsforward.consts.RedisConst;
      import com.ioufev.wsforward.redis.RedisPublisher;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      import org.springframework.beans.factory.annotation.Autowired;
      
      import javax.websocket.OnClose;
      import javax.websocket.OnMessage;
      import javax.websocket.OnOpen;
      import javax.websocket.Session;
      import javax.websocket.server.PathParam;
      import java.nio.charset.StandardCharsets;
      import java.util.Map;
      import java.util.concurrent.ConcurrentHashMap;
      import java.util.concurrent.CopyOnWriteArraySet;
      
      
      import org.springframework.stereotype.Component;
      
      import javax.websocket.server.ServerEndpoint;
      
      @Component
      @ServerEndpoint("/websocket/{key}")
      public class WebSocketServer {
      
          private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
      
          private String sessionId;
          private Session session;
      
          private static RedisPublisher redisPublisher;
      
          @Autowired
          public void setApplicationContext(RedisPublisher redisPublisher) {
              WebSocketServer.redisPublisher= redisPublisher;
          }
      
          private static CopyOnWriteArraySet<WebSocketServer> webSockets = new CopyOnWriteArraySet<>();
      
          private static Map<String, Session> sessionPool = new ConcurrentHashMap<>();
      
      
          @OnOpen
          public void onOpen(Session session, @PathParam(value = "key") String key) {
      
              this.sessionId = key;
              this.session = session;
              webSockets.add(this);
              sessionPool.put(key, session);
              log.info(key + "【websocket消息】有新的連接,總數為:" + webSockets.size() + ", session count is :" + sessionPool.size());
              for(WebSocketServer webSocket : webSockets) {
                  log.info("【webSocket】key is :" + webSocket.sessionId);
              }
      
          }
      
          @OnClose
          public void onClose() {
              sessionPool.remove(this.sessionId);
              webSockets.remove(this);
              log.info("【websocket消息】連接斷開,總數為:" + webSockets.size());
          }
      
          @OnMessage
          public void onMessage(@PathParam(value = "key") String key, String message) {
              log.info("【websocket消息】收到消息message:" + message);
              sendOneMessage(key, message);
          }
      
          /**
           * 廣播消息
           */
          public void sendAllMessage(String message) {
              for (WebSocketServer webSocket : webSockets) {
                  log.info("【websocket消息】廣播消息:" + message);
                  try {
                      webSocket.session.getAsyncRemote().sendText(message);
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
      
          /**
           * 單點消息
           */
          public void sendOneMessage(String key, String message) {
      
      //		Session session = sessionPool.get(key);
              Session session = getSession(key);
              if (session != null) {
                  try {
                      session.getBasicRemote().sendText(message);
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              } else {
                  redisPublisher.publishMessage(RedisConst.PUB_SUB_TOPIC, (key + "::" + message).getBytes(StandardCharsets.UTF_8));
              }
          }
      
          /**
           * 用來Redis訂閱后使用
           */
          public void sendOneMessageForRedisMessage(String key, String message) {
              Session session = getSession(key);
              if (session != null) {
                  try {
                      session.getBasicRemote().sendText(message);
                  } catch (Exception e) {
                      e.printStackTrace();
                  }
              }
          }
      
          private static Session getSession(String key){
              for (WebSocketServer webSocket : webSockets) {
                  if(webSocket.sessionId.equals(key)){
                      return webSocket.session;
                  }
              }
              return null;
          }
      
      }
      

      參考文章

      ?? WebSocket 集群解決方案
      ?? 圖畫的好,理解起來很清楚。

      ?? WebSocket 集群解決方案,不用 MQ
      ?? 在上面的思路基礎上,想給服務端添加一個標識,用來記錄用戶連接和服務端的關聯關系,我也有類似的想法,不過關于用戶ID和服務端ID關聯關系的存儲問題,還沒處理好。

      ?? Spring Cloud 一個配置注解實現 WebSocket 集群方案
      ?? 這個思路更大膽,既然是集群轉發,沒什么不能直接使用 WebSocket 本身

      ?? 分布式 WebSocket 集群解決方案
      ?? 用戶連接和服務端的關聯關系,用一致性哈希存儲

      ?? Spring Boot WebSocket 的 6 種集成方式
      ?? 喜歡文章的標題,內容看看目錄就行了。

      ?? 構建通用 WebSocket 推送網關的設計與實踐
      ?? 生產環境值得參考,但是用來入門參考顯然沒說清楚重點和難點

      ?? 石墨文檔是如何通過 WebSocket 實現百萬長連接的?
      ?? 生產環境值得參考,但是用來入門參考顯然沒說清楚重點和難點,這個比上面文章說更詳細,顯然具有可操作性。

      總結

      1、需要有一個統一的地方來保存用戶連接和服務端的關聯關系,可以是: Redis、MQ、Zookeeper、微服務的服務發現。

      2、Redis 發布訂閱用來集群轉發非常簡單,適用于實時發布消息那種,比如一個計算過程的實時步驟輸出。

      3、如果要確保消息不丟失,盡量送達之類的,那就用 MQ。

      4、最佳方式:每個服務端有一個ID,每個用戶連接也有一個ID,然后服務端轉發的時候,找到需要的服務端,只轉發一次就好了。

      posted @ 2023-07-17 11:05  ioufev  閱讀(2602)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲综合一区二区精品导航| 被黑人巨大一区二区三区| 久热这里只有精品12| 老女老肥熟国产在线视频| 午夜性色一区二区三区不卡视频| 久久久久无码国产精品一区| 欧美成人精品| 国产精品自在线拍国产| 精品偷拍被偷拍在线观看| 亚洲精品美女一区二区| 云阳县| 国产精品线在线精品| 99国产精品永久免费视频| 国产精品午夜福利视频| 国产a在亚洲线播放| 亚洲中文无码永久免费| 奇米四色7777中文字幕| 亚洲欧美人成电影在线观看| 欧美日韩中文字幕久久伊人 | 日本少妇被黑人xxxxx| 国产成人亚洲日韩欧美| 92国产福利午夜757小视频| 久热视频这里只有精品6| 日韩免费视频一一二区| 天天看片视频免费观看| 国产成人一区二区三区免费| 日日躁夜夜躁狠狠躁超碰97| 美女胸18下看禁止免费视频| 疯狂做受xxxx高潮视频免费| 亚洲第三十四九中文字幕| 精品久久久中文字幕人妻| 国产精品免费AⅤ片在线观看| 国产一区二区在线有码| 日本久久久久亚洲中字幕| 色婷婷日日躁夜夜躁| 久久国产精品波多野结衣| 亚洲人成人伊人成综合网无码| XXXXXHD亚洲日本HD| 运城市| 国产精品黄色大片在线看| 国产日韩AV免费无码一区二区三区|