<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      基于DotNetty實(shí)現(xiàn)自動發(fā)布 - 通信實(shí)現(xiàn)

      基于 DotNetty 實(shí)現(xiàn)通信

      DotNetty : 是微軟的 Azure 團(tuán)隊,使用 C#實(shí)現(xiàn)的 Netty 的版本發(fā)布。是.NET 平臺的優(yōu)秀網(wǎng)絡(luò)庫。

      項目介紹

      OpenDeploy.Communication 類庫項目,是通信相關(guān)基礎(chǔ)設(shè)施層

      image

      • Codec 模塊實(shí)現(xiàn)編碼解碼
      • Convention 模塊定義約定,比如抽象的業(yè)務(wù) Handler, 消息載體 NettyMessage, 消息上下文 'NettyContext' 等

      自定義消息格式

      消息類為 NettyMessage ,封裝了消息頭 NettyHeader 和消息體 Body

      image

      NettyMessage

      封裝了消息頭 NettyHeader 和消息體 Body

      NettyMessage 點(diǎn)擊查看代碼
      /// <summary> Netty消息 </summary>
      public class NettyMessage
      {
          /// <summary> 消息頭 </summary>
          public NettyHeader Header { get; init; } = default!;
      
          /// <summary> 消息體(可空,可根據(jù)具體業(yè)務(wù)而定) </summary>
          public byte[]? Body { get; init; }
      
          /// <summary> 消息頭轉(zhuǎn)為字節(jié)數(shù)組 </summary>
          public byte[] GetHeaderBytes()
          {
              var headerString = Header.ToString();
              return Encoding.UTF8.GetBytes(headerString);
          }
      
          /// <summary> 是否同步消息 </summary>
          public bool IsSync() => Header.Sync;
      
          /// <summary> 創(chuàng)建Netty消息工廠方法 </summary>
          public static NettyMessage Create(string endpoint, bool sync = false, byte[]? body = null)
          {
              return new NettyMessage
              {
                  Header = new NettyHeader { EndPoint = endpoint, Sync = sync },
                  Body = body
              };
          }
      
          /// <summary> 序列化為JSON字符串 </summary>
          public override string ToString() => Header.ToString();
      }
      

      NettyHeader

      消息頭,包含請求唯一標(biāo)識,是否同步消息,終結(jié)點(diǎn)等, 在傳輸數(shù)據(jù)時會序列化為 JSON

      NettyHeader 點(diǎn)擊查看代碼
      /// <summary> Netty消息頭 </summary>
      public class NettyHeader
      {
          /// <summary> 請求消息唯一標(biāo)識 </summary>
          public Guid RequestId { get; init; } = Guid.NewGuid();
      
          /// <summary> 是否同步消息, 默認(rèn)false是異步消息 </summary>
          public bool Sync { get; init; }
      
          /// <summary> 終結(jié)點(diǎn) (借鑒MVC,約定為Control/Action模式) </summary>
          public string EndPoint { get; init; } = string.Empty;
      
          /// <summary> 序列化為JSON字符串 </summary>
          public override string ToString() => this.ToJsonString();
      }
      

      • 請求消息唯一標(biāo)識 RequestId , 用來唯一標(biāo)識消息, 主要用于 發(fā)送同步請求, 因為默認(rèn)的消息是異步的,只管發(fā)出去,不需要等待響應(yīng)
      • 是否同步消息 Sync , 可以不需要,主要為了可視化,便于調(diào)試
      • 終結(jié)點(diǎn) EndPoint , (借鑒 MVC,約定為 Control/Action 模式), 服務(wù)端直接解析出對應(yīng)的處理器

      編碼器

      DefaultEncoder 點(diǎn)擊查看代碼
      public class DefaultEncoder : MessageToByteEncoder<NettyMessage>
      {
          protected override void Encode(IChannelHandlerContext context, NettyMessage message, IByteBuffer output)
          {
              //消息頭轉(zhuǎn)為字節(jié)數(shù)組
              var headerBytes = message.GetHeaderBytes();
      
              //寫入消息頭長度
              output.WriteInt(headerBytes.Length);
      
              //寫入消息頭字節(jié)數(shù)組
              output.WriteBytes(headerBytes);
      
              //寫入消息體字節(jié)數(shù)組
              if (message.Body != null && message.Body.Length > 0)
              {
                  output.WriteBytes(message.Body);
              }
          }
      }
      

      解碼器

      DefaultDecoder 點(diǎn)擊查看代碼
      public class DefaultDecoder : MessageToMessageDecoder<IByteBuffer>
      {
          protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
          {
              //消息總長度
              var totalLength = input.ReadableBytes;
      
              //消息頭長度
              var headerLength = input.GetInt(input.ReaderIndex);
      
              //消息體長度
              var bodyLength = totalLength - 4 - headerLength;
      
              //讀取消息頭字節(jié)數(shù)組
              var headerBytes = new byte[headerLength];
              input.GetBytes(input.ReaderIndex + 4, headerBytes, 0, headerLength);
      
              byte[]? bodyBytes = null;
              string? rawHeaderString = null;
              NettyHeader? header;
      
              try
              {
                  //把消息頭字節(jié)數(shù)組,反序列化為JSON
                  rawHeaderString = Encoding.UTF8.GetString(headerBytes);
                  header = JsonSerializer.Deserialize<NettyHeader>(rawHeaderString);
              }
              catch (Exception ex)
              {
                  Logger.Error($"解碼失敗: {rawHeaderString}, {ex}");
                  return;
              }
      
              if (header is null)
              {
                  Logger.Error($"解碼失敗: {rawHeaderString}");
                  return;
              }
      
              //讀取消息體字節(jié)數(shù)組
              if (bodyLength > 0)
              {
                  bodyBytes = new byte[bodyLength];
                  input.GetBytes(input.ReaderIndex + 4 + headerLength, bodyBytes, 0, bodyLength);
              }
      
              //封裝為NettyMessage對象
              var message = new NettyMessage
              {
                  Header = header,
                  Body = bodyBytes,
              };
      
              output.Add(message);
          }
      }
      

      NettyServer 實(shí)現(xiàn)

      NettyServer 點(diǎn)擊查看代碼
      public static class NettyServer
      {
          /// <summary>
          /// 開啟Netty服務(wù)
          /// </summary>
          public static async Task RunAsync(int port = 20007)
          {
              var bossGroup = new MultithreadEventLoopGroup(1);
              var workerGroup = new MultithreadEventLoopGroup();
      
              try
              {
                  var bootstrap = new ServerBootstrap().Group(bossGroup, workerGroup);
      
                  bootstrap
                      .Channel<TcpServerSocketChannel>()
                      .Option(ChannelOption.SoBacklog, 100)
                      .Option(ChannelOption.SoReuseaddr, true)
                      .Option(ChannelOption.SoReuseport, true)
                      .ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
                      {
                          IChannelPipeline pipeline = channel.Pipeline;
                          pipeline.AddLast("framing-enc", new LengthFieldPrepender(4));
                          pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
                          pipeline.AddLast("decoder", new DefaultDecoder());
                          pipeline.AddLast("encoder", new DefaultEncoder());
                          pipeline.AddLast("handler", new ServerMessageEntry());
                      }));
      
                  var boundChannel = await bootstrap.BindAsync(port);
      
                  Logger.Info($"NettyServer啟動成功...{boundChannel}");
      
                  Console.ReadLine();
      
                  await boundChannel.CloseAsync();
      
                  Logger.Info($"NettyServer關(guān)閉監(jiān)聽了...{boundChannel}");
              }
              finally
              {
                  await Task.WhenAll(
                      bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
                      workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))
                  );
      
                  Logger.Info($"NettyServer退出了...");
              }
      
          }
      }
      
      • 服務(wù)端管道最后我們添加了 ServerMessageEntry ,作為消息處理的入口
      ServerMessageEntry 點(diǎn)擊查看代碼
      public class ServerMessageEntry : ChannelHandlerAdapter
      {
          /// <summary> Netty處理器選擇器 </summary>
          private readonly DefaultNettyHandlerSelector handlerSelector = new();
      
          public ServerMessageEntry()
          {
              //注冊Netty處理器
              handlerSelector.RegisterHandlerTypes(typeof(EchoHandler).Assembly.GetTypes());
          }
      
          /// <summary> 通道激活 </summary>
          public override void ChannelActive(IChannelHandlerContext context)
          {
              Logger.Warn($"ChannelActive: {context.Channel}");
          }
      
          /// <summary> 通道關(guān)閉 </summary>
          public override void ChannelInactive(IChannelHandlerContext context)
          {
              Logger.Warn($"ChannelInactive: {context.Channel}");
          }
      
          /// <summary> 收到客戶端的消息 </summary>
          public override async void ChannelRead(IChannelHandlerContext context, object message)
          {
              if (message is not NettyMessage nettyMessage)
              {
                  Logger.Error("從客戶端接收消息為空");
                  return;
              }
      
              try
              {
                  Logger.Info($"收到客戶端的消息: {nettyMessage}");
      
                  //封裝請求
                  var nettyContext = new NettyContext(context.Channel, nettyMessage);
      
                  //選擇處理器
                  AbstractNettyHandler handler = handlerSelector.SelectHandler(nettyContext);
      
                  //處理請求
                  await handler.ProcessAsync();
              }
              catch(Exception ex)
              {
                  Logger.Error($"ServerMessageEntry.ChannelRead: {ex}");
              }
          }
      }
      
      • 按照約定, 把繼承 AbstractNettyHandler 的類視為業(yè)務(wù)處理器

      • ServerMessageEntry 拿到消息后,首先把消息封裝為 NettyContext, 類似與 MVC 中的 HttpContext, 封裝了請求和響應(yīng)對象, 內(nèi)部解析請求的 EndPoint, 拆分為 HandlerName, ActionName

      • DefaultNettyHandlerSelector 提供注冊處理器的方法 RegisterHandlerTypes, 和選擇處理器的方法 SelectHandler

      • SelectHandler, 默認(rèn)規(guī)則是查找已注冊的處理器中以 HandlerName 開頭的類型

      • AbstractNettyHandlerProcessAsync 方法,通過 ActionName, 反射拿到 MethodInfo, 調(diào)用終結(jié)點(diǎn)

      NettyClient 實(shí)現(xiàn)

      NettyClient 點(diǎn)擊查看代碼
      public sealed class NettyClient(string serverHost, int serverPort) : IDisposable
      {
          public EndPoint ServerEndPoint { get; } = new IPEndPoint(IPAddress.Parse(serverHost), serverPort);
      
          private static readonly Bootstrap bootstrap = new();
          private static readonly IEventLoopGroup eventLoopGroup = new SingleThreadEventLoop();
      
          private bool _disposed;
          private IChannel? _channel;
          public bool IsConnected => _channel != null && _channel.Open;
          public bool IsWritable => _channel != null && _channel.IsWritable;
      
          static NettyClient()
          {
              bootstrap
                  .Group(eventLoopGroup)
                  .Channel<TcpSocketChannel>()
                  .Option(ChannelOption.SoReuseaddr, true)
                  .Option(ChannelOption.SoReuseport, true)
                  .Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
                  {
                      IChannelPipeline pipeline = channel.Pipeline;
                      //pipeline.AddLast("ping", new IdleStateHandler(0, 5, 0));
                      pipeline.AddLast("framing-enc", new LengthFieldPrepender(4));
                      pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
                      pipeline.AddLast("decoder", new DefaultDecoder());
                      pipeline.AddLast("encoder", new DefaultEncoder());
                      pipeline.AddLast("handler", new ClientMessageEntry());
                  }));
          }
      
          /// <summary> 連接服務(wù)器 </summary>
          private async Task TryConnectAsync()
          {
              try
              {
                  if (IsConnected) { return; }
                  _channel = await bootstrap.ConnectAsync(ServerEndPoint);
              }
              catch (Exception ex)
              {
                  throw new Exception($"連接服務(wù)器失敗 : {ServerEndPoint} {ex.Message}");
              }
          }
      
          /// <summary>
          /// 發(fā)送消息
          /// </summary>
          /// <param name="endpoint">終結(jié)點(diǎn)</param>
          /// <param name="sync">是否同步等待響應(yīng)</param>
          /// <param name="body">正文</param>
          public async Task SendAsync(string endpoint, bool sync = false, byte[]? body = null)
          {
              var message = NettyMessage.Create(endpoint, sync, body);
              if (sync)
              {
                  var task = ClientMessageSynchronizer.TryAdd(message);
                  try
                  {
                      await SendAsync(message);
                      await task;
                  }
                  catch
                  {
                      ClientMessageSynchronizer.TryRemove(message);
                      throw;
                  }
              }
              else
              {
                  await SendAsync(message);
              }
          }
      
          /// <summary>
          /// 發(fā)送消息
          /// </summary>
          private async Task SendAsync(NettyMessage message)
          {
              await TryConnectAsync();
              await _channel!.WriteAndFlushAsync(message);
          }
      
          /// <summary> 釋放連接(程序員手動釋放, 一般在代碼使用using語句,或在finally里面Dispose) </summary>
          public void Dispose()
          {
              Dispose(true);
              GC.SuppressFinalize(this);
          }
      
          /// <summary> 釋放連接 </summary>
          private void Dispose(bool disposing)
          {
              if (_disposed)
              {
                  return;
              }
      
              //釋放托管資源,比如嵌套的對象
              if (disposing)
              {
      
              }
      
              //釋放非托管資源
              if (_channel != null)
              {
                  _channel.CloseAsync();
                  _channel = null;
              }
      
              _disposed = true;
          }
      
          ~NettyClient()
          {
              Dispose(true);
          }
      }
      
      • NettyClient 封裝了 Netty 客戶端邏輯,提供發(fā)送異步請求(默認(rèn))和發(fā)布同步請求方法
      • DotNetty 默認(rèn)不提供同步請求,但是有些情況我們需要同步等待服務(wù)器的響應(yīng),所有需要自行實(shí)現(xiàn),實(shí)現(xiàn)也很簡單,把消息 ID 緩存起來,收到服務(wù)器響應(yīng)后激活就行了,具體實(shí)現(xiàn)在消息同步器 ClientMessageSynchronizer, 就不貼了

      總結(jié)

      至此,我們實(shí)現(xiàn)了基于 DotNetty 搭建通信模塊, 實(shí)現(xiàn)了客戶端和服務(wù)器的編解碼,處理器選擇,客戶端實(shí)現(xiàn)了同步消息等,大家可以在 ConsoleHost 結(jié)尾的控制臺項目中,測試下同步和異步的消息,實(shí)現(xiàn)的簡單的 Echo 模式

      代碼倉庫

      項目暫且就叫 OpenDeploy

      歡迎大家拍磚,Star

      下一步

      計劃下一步,基于WPF的客戶端, 實(shí)現(xiàn)接口項目的配置與發(fā)現(xiàn)

      posted @ 2023-12-04 17:51  Broadm  閱讀(1869)  評論(3)    收藏  舉報
      主站蜘蛛池模板: 国产亚洲精品久久久久久久软件| 日本边添边摸边做边爱| 无码国内精品久久人妻蜜桃| 无码熟妇人妻av在线电影| 国产亚洲一区二区三不卡| 欧美人与禽2o2o性论交| 国产精品自在自线视频| 18禁精品一区二区三区| 午夜国产小视频| 亚洲国产色婷婷久久99精品91| XXXXXHD亚洲日本HD| 太仆寺旗| 国产黄色三级三级看三级| 欧美黑吊大战白妞| 免费无码AV一区二区波多野结衣| 国产成人av免费观看| 日本一区二区国产在线| 亚洲欧美日韩国产精品专区| 无码专区视频精品老司机| 久久久国产精品樱花网站| 成年女人免费碰碰视频| 日本三线免费视频观看| 美乳丰满人妻无码视频| 色多多性虎精品无码av| 成人精品动漫一区二区| 国产二区三区不卡免费| 阳谷县| 日本一卡2卡3卡四卡精品网站| 成人自拍短视频午夜福利| 国产果冻豆传媒麻婆精东| 亚洲人成网站77777在线观看| 亚洲免费网站观看视频| 97久久超碰精品视觉盛宴| 欧美成人午夜在线观看视频| 隆昌县| 国产高清精品在线一区二区| 亚洲中文字幕无码爆乳APP| 日韩一区精品视频一区二区| 国产成人精品无缓存在线播放| 亚洲熟妇自偷自拍另类| 亚洲精品一区二区动漫|