<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      springboot的netty代碼實操

      參考:http://www.rzrgm.cn/mc-74120/p/13622008.html

      pom文件

      <dependency>
          <groupId>io.netty</groupId>
          <artifactId>netty-all</artifactId>
      </dependency>

      啟動類

      @EnableFeignClients
      @EnableDiscoveryClient
      @EnableScheduling
      @SpringBootApplication
      @EnableAsync
      public class ChimetaCoreApplication  implements CommandLineRunner{
          
          @Autowired
          private NettyServerListener nettyServerListener;
          
          public static void main(String[] args) {
              SpringApplication.run(ChimetaCoreApplication.class, args);
          }
          
          @Override
          public void run(String... args) throws Exception {
             
             nettyServerListener.start();
          }
      }

      服務端代碼的listener

      package com.chimeta.netty;
      
      import javax.annotation.PreDestroy;
      import javax.annotation.Resource;
      
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.stereotype.Component;
      
      import com.chimeta.netty.protobuf.ImProto;
      
      import io.netty.bootstrap.ServerBootstrap;
      import io.netty.channel.ChannelFuture;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.ChannelOption;
      import io.netty.channel.EventLoopGroup;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.SocketChannel;
      import io.netty.channel.socket.nio.NioServerSocketChannel;
      import io.netty.handler.codec.protobuf.ProtobufDecoder;
      import io.netty.handler.codec.protobuf.ProtobufEncoder;
      import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
      import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
      import io.netty.handler.logging.LogLevel;
      import io.netty.handler.logging.LoggingHandler;
      import io.netty.handler.timeout.IdleStateHandler;
      import lombok.extern.slf4j.Slf4j;
      
      /**
       * 服務啟動監聽器
       *
       * @author mwan
       */
      @Component
      @Slf4j
      public class NettyServerListener {
          /**
           * 創建bootstrap
           */
          ServerBootstrap serverBootstrap = new ServerBootstrap();
          /**
           * BOSS
           */
          EventLoopGroup boss = new NioEventLoopGroup();
          /**
           * Worker
           */
          EventLoopGroup work = new NioEventLoopGroup();
          /**
           * 通道適配器
           */
          @Resource
          private ServerChannelHandlerAdapter channelHandlerAdapter;
          /**
           * 從配置中心獲取NETTY服務器配置
           */
          @Value("${server.netty.port:10001}")
          private int NETTY_PORT;
          
          @Value("${server.netty.maxthreads:5000}")
          private int MAX_THREADS;
      
          /**
           * 關閉服務器方法
           */
          @PreDestroy
          public void close() {
              log.info("關閉服務器....");
              //優雅退出
              boss.shutdownGracefully();
              work.shutdownGracefully();
          }
      
          /**
           * 開啟及服務線程
           */
          public void start() {
              serverBootstrap.group(boss, work)
                      .channel(NioServerSocketChannel.class)
                      .option(ChannelOption.SO_BACKLOG, MAX_THREADS) //最大客戶端連接數為1024  
                      .handler(new LoggingHandler(LogLevel.INFO)).childOption(ChannelOption.SO_KEEPALIVE, true); ;
              try {
                  //設置事件處理
                  serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
                      @Override
                      protected void initChannel(SocketChannel ch) throws Exception {
                          // 下面的每一個addLast都有自己的含義,需要每個都過一下
                          ch.pipeline().addLast(new IdleStateHandler(18,0,0));
                          ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
                          //ch.pipeline().addLast(new CustomProtobufInt32FrameDecoder());
                          ch.pipeline().addLast(new ProtobufDecoder(ImProto.ImMsg.getDefaultInstance()));
                          ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
                          //ch.pipeline().addLast(new CustomProtobufInt32LengthFieldPrepender());
                          ch.pipeline().addLast(new ProtobufEncoder());
                          // 業務處理
                          ch.pipeline().addLast(channelHandlerAdapter);
                      }
                  });
                  log.info("netty服務器在[{}]端口啟動監聽", NETTY_PORT);
                  ChannelFuture f = serverBootstrap.bind(NETTY_PORT).sync();
                  f.channel().closeFuture().sync();
              } catch (InterruptedException e) {
                  log.error("[出現異常] 釋放資源", e);
                  boss.shutdownGracefully();
                  work.shutdownGracefully();
                  log.info("服務已關閉!");
              }
          }
      }

       ServerChannelHandlerAdapter處理類

      package com.chimeta.netty;
      
      import org.apache.commons.lang3.StringUtils;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.stereotype.Component;
      
      import com.chimeta.netty.model.SessionCloseReason;
      import com.chimeta.netty.protobuf.ImProto.ImMsg;
      import com.chimeta.netty.util.ChannelUtils;
      import com.google.protobuf.InvalidProtocolBufferException;
      import com.google.protobuf.util.JsonFormat;
      import io.netty.channel.ChannelHandler.Sharable;
      import io.netty.handler.timeout.IdleState;
      import io.netty.handler.timeout.IdleStateEvent;
      import io.netty.channel.Channel;
      import io.netty.channel.ChannelHandlerContext;
      import io.netty.channel.ChannelInboundHandlerAdapter;
      import lombok.extern.slf4j.Slf4j;
      
      /**
       * 通信服務處理器
       */
      @Component
      @Sharable
      @Slf4j
      public class ServerChannelHandlerAdapter extends ChannelInboundHandlerAdapter {
          /**
           * 注入請求分排器
           */
          @Autowired
          private MessageDispatcher messageDispatcher;
          
          @Autowired
          private DeviceSessionManager sessionManager;
      
          /** 用來記錄當前在線連接數。應該把它設計成線程安全的。  */
          //private AtomicInteger sessionCount = new AtomicInteger(0);
          
          @Override  
          public void handlerAdded(ChannelHandlerContext ctx) throws Exception {  
              super.handlerAdded(ctx); 
              
              if (!ChannelUtils.addChannelSession(ctx.channel(), new IoSession(ctx.channel()))) {
                ctx.channel().close();
                log.error("Duplicate session,IP=[{}]",ChannelUtils.getRemoteIp(ctx.channel()));
             }     
      
              //String server_ip = NetworkUtils.getRealIp();//獲得本機IP
              // 緩存計數器加1
          }  
            
          @Override  
          public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {  
              super.handlerRemoved(ctx); 
      
              // 緩存計數器減1
              //String server_ip = NetworkUtils.getRealIp();//獲得本機IP
              log.info(ctx.channel().id()+"離開了");  
          }
          
          @Override
          public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
              
              ImMsg gameMessage = (ImMsg)msg;
              final Channel channel = ctx.channel();
             IoSession session = ChannelUtils.getSessionBy(channel);
             if(session.isHeartbeated()) {
                session.setHeartbeated(false);
             }
             
             String deviceCode="";
             if(session.getDevice() != null && StringUtils.isNotBlank(session.getDevice().getDeviceCode())) {
                deviceCode = session.getDevice().getDeviceCode();
             }
      //     if(!MessagingConst.TYPE_UPOS_REQUEST.equals(gameMessage.getMsg().getTypeUrl())) {
                try {
                   log.info("Inbound message is :" + JsonFormat.printer().usingTypeRegistry(DeviceSessionManager.typeRegistry).print(gameMessage.toBuilder())
                         + ", from device " + deviceCode);
                } catch (InvalidProtocolBufferException e) {
                   log.info("Inbound message is :" + gameMessage.toString());
                }
      //     }
             
             messageDispatcher.dispatch(gameMessage, session);
          }
           
          @Override
          public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {  
              ctx.flush();  
          } 
          
          @Override  
          public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)  
                  throws Exception {  
              
              log.error("通信發生異常:", cause);
              ctx.close();   
          } 
          
          /**
           * 一段時間未進行讀寫操作 回調
           */
          @Override
          public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
              /*心跳處理*/
              if (evt instanceof IdleStateEvent) {
                  IdleStateEvent event = (IdleStateEvent) evt;
                  if (event.state() == IdleState.READER_IDLE) {
                      /*讀超時*/
                      log.info("READER_IDLE read overtime,close session");
                      final Channel channel = ctx.channel();
                     IoSession session = ChannelUtils.getSessionBy(channel);
                      
                   /*
                    * if(messageDispatcher.sendHeartbeat(session) == false) { //如果心跳檢測失敗,則連接異常,主動斷開
                    * session.setSessionCloseReason(SessionCloseReason.OVER_TIME); ctx.close(); };
                    */
                     
                     session.setSessionCloseReason(SessionCloseReason.OVER_TIME);
                    ctx.close();
                      
                  } else if (event.state() == IdleState.WRITER_IDLE) {
                      /*寫超時*/   
                      log.info("WRITER_IDLE 寫超時");
                  } else if (event.state() == IdleState.ALL_IDLE) {
                      /*總超時*/
                      log.info("ALL_IDLE 總超時");
                  }
              }
          }
      
          @Override
          public void channelInactive(ChannelHandlerContext ctx) throws Exception {
      
              sessionManager.unregisterUserContext(ctx.channel());
              log.info(ctx.channel().id() + "已掉線!");
              // 這里加入玩家的掉線處理
              ctx.close();
      
          }
      
      }

      MessageDispatcher分派各個處理器

      package com.chimeta.netty;
      
      import com.chimeta.netty.service.TerminalService;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.scheduling.annotation.Async;
      import org.springframework.stereotype.Component;
      
      import com.chimeta.netty.constant.MessagingConst;
      import com.chimeta.netty.model.SessionCloseReason;
      import com.chimeta.netty.protobuf.ImProto.ImMsg;
      import com.chimeta.netty.service.LoginService;
      import com.chimeta.netty.util.MessageBuilder;
      import com.google.protobuf.InvalidProtocolBufferException;
      import lombok.extern.slf4j.Slf4j;
      
      import javax.annotation.Resource;
      
      /**
       * 請求分排器
       */
      @Component
      @Slf4j
      public class MessageDispatcher{
          
          @Autowired
          private LoginService loginService;
      
          @Resource
          private TerminalService terminalService;
          
          /**
           * 消息分發處理
           *
           * @param gameMsg
           * @throws InvalidProtocolBufferException 
           */
          @Async
          public void dispatch(ImMsg imMsg, IoSession currSession) throws InvalidProtocolBufferException {
      
              if(imMsg.getId() < 0) {
                 currSession.sendMessage(MessageBuilder.buildErrorResponse(imMsg, MessagingConst.RESPONSE_ERR_CODE_400, "Invalid message!"));
                 return;
              }
              //log.info("接收到的消息TypeUrl是: "+imMsg.getMsg().getTypeUrl());
              switch(imMsg.getMsg().getTypeUrl()) {
              
                  case MessagingConst.TYPE_ONLINE_REQUEST:
                     // 處理設備上線請求
                     loginService.doLogin(imMsg, currSession);
                     break;
                  case MessagingConst.TYPE_USER_LOGON_REQUEST:
                     // 處理請求
                     loginService.doUserLogon(imMsg, currSession);
                     break;
                  case MessagingConst.TYPE_USER_LOGOFF_REQUEST:
                     // 處理請求
                     loginService.doUserLogoff(imMsg, currSession);
                     break;
                case MessagingConst.TYPE_TERMINAL_STATE_REQUEST:
                      // 我寫的
                   terminalService.multiInsert(imMsg, currSession);
                   break;
                  default:
                     if(currSession != null) {
                        // 返回客戶端發來的心跳消息
                        responseHeartbeat(imMsg, currSession);
                     }
                     break;
              }
          }
          
          /**
           * 發送心跳包消息
           * @param gameMsg
           * @param currSession
           * @return
           */
          public boolean sendHeartbeat(IoSession currSession) {
             
             try {
                if(currSession.isHeartbeated()) {
                   return false;
                }
                ImMsg.Builder imMsgBuilder = ImMsg.newBuilder();
                
                currSession.sendMessage(imMsgBuilder.build());
                
                currSession.setHeartbeated(true);
                
                return true;
             }catch(Exception e) {
                log.error("主動發送心跳包時發生異常:", e);
                currSession.close(SessionCloseReason.EXCEPTION);
                return false;
             }
             
          }
          /**
           * 返回客戶端發來的心跳包消息
           * @param imMsg
           * @param currSession
           */
          private void responseHeartbeat(ImMsg imMsg,IoSession currSession) {
             ImMsg.Builder imMsgBuilder = ImMsg.newBuilder();
             
             currSession.sendMessage(imMsgBuilder.build());
          }
          
      }

      最后到service業務處理TerminalService

      package com.chimeta.netty.service;
      
      import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
      import com.chimeta.common.entity.terminal.TerminalStateMonitorDO;
      import com.chimeta.netty.IoSession;
      import com.chimeta.netty.constant.MessagingConst;
      import com.chimeta.netty.model.DeviceInfo;
      import com.chimeta.netty.protobuf.ImProto;
      import com.chimeta.netty.util.MessageBuilder;
      import com.chimeta.terminal.mapper.TerminalStateMonitorMapper;
      import lombok.extern.slf4j.Slf4j;
      import org.springframework.stereotype.Service;
      import org.springframework.transaction.annotation.Transactional;
      import org.springframework.util.CollectionUtils;
      
      import java.math.BigDecimal;
      import java.util.ArrayList;
      import java.util.List;
      
      /**
       * 盒子設備相關的實現類
       */
      @Service
      @Slf4j
      public class TerminalService extends ServiceImpl<TerminalStateMonitorMapper, TerminalStateMonitorDO> {
      
          @Transactional(rollbackFor = Exception.class)
          public void multiInsert(ImProto.ImMsg imMsg, IoSession currSession){
              DeviceInfo deviceInfo = currSession.getDevice();
              if(deviceInfo == null) {
                  currSession.sendMessage(MessageBuilder.buildErrorResponse(imMsg, MessagingConst.RESPONSE_ERR_CODE_400, "device not online!"));
                  return;
              }
              try {
                  ImProto.TerminalStateList terminalStateList = imMsg.getMsg().unpack(ImProto.TerminalStateList.class);
                  log.info("TerminalService multiInsert TerminalStateList:{}", terminalStateList);
                  List<ImProto.TerminalState> requestTerminalStateList = terminalStateList.getTerminalStateList();
      
                  if (!CollectionUtils.isEmpty(requestTerminalStateList)){
                      List<TerminalStateMonitorDO> tmplist = new ArrayList<>();
                      for (ImProto.TerminalState requestTerminalState : requestTerminalStateList){
                          TerminalStateMonitorDO terminalStateMonitorDO = new TerminalStateMonitorDO();
                          terminalStateMonitorDO.setBatteryLevel(requestTerminalState.getBatteryLevel());
                          terminalStateMonitorDO.setChargingState(requestTerminalState.getChargingState());
                          terminalStateMonitorDO.setTemperature(BigDecimal.valueOf(requestTerminalState.getTemperature()));
                          terminalStateMonitorDO.setUniqueCode(deviceInfo.getDeviceCode());
                          terminalStateMonitorDO.setStateTime(requestTerminalState.getStateTime());
                          tmplist.add(terminalStateMonitorDO);
                      }
                      this.saveBatch(tmplist);
                  }
              } catch (Exception e) {
                  log.error("TerminalService multiInsert error:{}", e);
              }
      
          }
      
      }

      至此,服務端的處理邏輯寫完,然后比較費時間的是自己寫client的請求,終于經過兩三天時間總結好了,寫了個test類,如下

      package com.chimeta.core;
      
      import com.chimeta.netty.protobuf.ImProto;
      import com.google.protobuf.Any;
      import io.netty.bootstrap.Bootstrap;
      import io.netty.buffer.Unpooled;
      import io.netty.channel.Channel;
      import io.netty.channel.ChannelInitializer;
      import io.netty.channel.nio.NioEventLoopGroup;
      import io.netty.channel.socket.nio.NioSocketChannel;
      import io.netty.handler.codec.protobuf.ProtobufEncoder;
      import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
      import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
      import lombok.extern.slf4j.Slf4j;
      import org.junit.jupiter.api.Test;
      import org.junit.runner.RunWith;
      import org.mockito.junit.MockitoJUnitRunner;
      
      
      @Slf4j
      @RunWith(MockitoJUnitRunner.class)
      class NettyTerminalTest {
      
      
          @Test
          public void tryTest()  throws InterruptedException {
      
              ImProto.TerminalStateList terminalstateList = ImProto.TerminalStateList.newBuilder().build();
              for (int i = 0; i < 3; i++) {
                  ImProto.TerminalState build = ImProto.TerminalState.newBuilder()
                          .setBatteryLevel(i)
                          .setChargingState(i * 11)
                          .setTemperature(i * 11.1)
                          .setStateTime(i * 111)
                          .build();
                  terminalstateList = terminalstateList.toBuilder().addTerminalState(build).build();
              }
      
              ImProto.ImMsg imMsg = ImProto.ImMsg.newBuilder().setId(66).setMsg(Any.pack(terminalstateList)).build();
      
              Channel channel = new Bootstrap()
                      .group(new NioEventLoopGroup(1))
                      .handler(new ChannelInitializer<NioSocketChannel>() {
                          @Override
                          protected void initChannel(NioSocketChannel ch) throws Exception {
                              System.out.println("初始化連接...");
                              ch.pipeline().addLast("encode", new ProtobufEncoder())
                                      .addLast(new ProtobufVarint32FrameDecoder()).addLast(new ProtobufVarint32LengthFieldPrepender());
                          }
                      })
                      .channel(NioSocketChannel.class).connect("192.168.123.123", 10001)
                      .sync()
                      .channel();
      
      //        channel.pipeline().addLast(new StringEncoder()).writeAndFlush(ByteBufAllocator.DEFAULT.buffer().writeBytes(imMsg.toByteArray()));
              channel.pipeline().writeAndFlush(Unpooled.copiedBuffer(imMsg.toByteArray()));
              System.out.println("over!");
          }
      
      }

       好了,記錄下,以后就不會忘記了

       

      posted @ 2024-04-24 15:44  蝸牛使勁沖  閱讀(112)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲午夜福利精品无码不卡| 亚洲国产一区二区在线| 久久蜜臀av一区三区| 少妇高潮喷水惨叫久久久久电影| 欧美国产精品啪啪| 亚洲午夜无码av毛片久久| 久热这里只有精品蜜臀av| 国产日韩精品中文字幕| 国产中文字幕在线精品| 国产最新AV在线播放不卡| 欧美日韩国产一区二区三区欧| 国产一区一一区高清不卡| 日本va欧美va精品发布| 国产精品国产三级国快看| 亚洲av产在线精品亚洲第一站| 亚洲成av人片不卡无码手机版| 国产精品天干天干综合网| 国产成人精品av| 丰满人妻熟妇乱又伦精品劲| 亚洲小说乱欧美另类| 婷婷丁香五月激情综合| 激情综合五月网| av深夜免费在线观看| 日韩在线视频一区二区三区| 中文无码av一区二区三区 | 九九热视频免费在线播放| 亚洲国产中文字幕在线视频综合| 国产片一区二区三区视频| 亚洲av男人电影天堂热app| 99精品热在线在线观看视| 亚洲人成网网址在线看| 久久亚洲精品成人综合网| 国产高清自产拍av在线| 天天做日日做天天添天天欢公交车| 玖玖在线精品免费视频| 国产无人区码一区二区| 动漫av网站免费观看| 黑人巨大亚洲一区二区久| 亚洲鸥美日韩精品久久| 中文字幕日韩一区二区不卡| 精品少妇无码一区二区三批|