基于DotNetty實(shí)現(xiàn)自動發(fā)布 - 通信實(shí)現(xiàn)
基于 DotNetty 實(shí)現(xiàn)通信
DotNetty : 是微軟的 Azure 團(tuán)隊,使用 C#實(shí)現(xiàn)的 Netty 的版本發(fā)布。是.NET 平臺的優(yōu)秀網(wǎng)絡(luò)庫。
項目介紹
OpenDeploy.Communication 類庫項目,是通信相關(guān)基礎(chǔ)設(shè)施層

Codec模塊實(shí)現(xiàn)編碼解碼Convention模塊定義約定,比如抽象的業(yè)務(wù) Handler, 消息載體NettyMessage, 消息上下文 'NettyContext' 等
自定義消息格式
消息類為 NettyMessage ,封裝了消息頭 NettyHeader 和消息體 Body

NettyMessage
封裝了消息頭
NettyHeader和消息體Body
NettyMessage 點(diǎn)擊查看代碼
/// <summary> Netty消息 </summary>
public class NettyMessage
{
/// <summary> 消息頭 </summary>
public NettyHeader Header { get; init; } = default!;
/// <summary> 消息體(可空,可根據(jù)具體業(yè)務(wù)而定) </summary>
public byte[]? Body { get; init; }
/// <summary> 消息頭轉(zhuǎn)為字節(jié)數(shù)組 </summary>
public byte[] GetHeaderBytes()
{
var headerString = Header.ToString();
return Encoding.UTF8.GetBytes(headerString);
}
/// <summary> 是否同步消息 </summary>
public bool IsSync() => Header.Sync;
/// <summary> 創(chuàng)建Netty消息工廠方法 </summary>
public static NettyMessage Create(string endpoint, bool sync = false, byte[]? body = null)
{
return new NettyMessage
{
Header = new NettyHeader { EndPoint = endpoint, Sync = sync },
Body = body
};
}
/// <summary> 序列化為JSON字符串 </summary>
public override string ToString() => Header.ToString();
}
NettyHeader
消息頭,包含請求唯一標(biāo)識,是否同步消息,終結(jié)點(diǎn)等, 在傳輸數(shù)據(jù)時會序列化為 JSON
NettyHeader 點(diǎn)擊查看代碼
/// <summary> Netty消息頭 </summary>
public class NettyHeader
{
/// <summary> 請求消息唯一標(biāo)識 </summary>
public Guid RequestId { get; init; } = Guid.NewGuid();
/// <summary> 是否同步消息, 默認(rèn)false是異步消息 </summary>
public bool Sync { get; init; }
/// <summary> 終結(jié)點(diǎn) (借鑒MVC,約定為Control/Action模式) </summary>
public string EndPoint { get; init; } = string.Empty;
/// <summary> 序列化為JSON字符串 </summary>
public override string ToString() => this.ToJsonString();
}
- 請求消息唯一標(biāo)識
RequestId, 用來唯一標(biāo)識消息, 主要用于 發(fā)送同步請求, 因為默認(rèn)的消息是異步的,只管發(fā)出去,不需要等待響應(yīng) - 是否同步消息
Sync, 可以不需要,主要為了可視化,便于調(diào)試 - 終結(jié)點(diǎn)
EndPoint, (借鑒 MVC,約定為 Control/Action 模式), 服務(wù)端直接解析出對應(yīng)的處理器
編碼器
DefaultEncoder 點(diǎn)擊查看代碼
public class DefaultEncoder : MessageToByteEncoder<NettyMessage>
{
protected override void Encode(IChannelHandlerContext context, NettyMessage message, IByteBuffer output)
{
//消息頭轉(zhuǎn)為字節(jié)數(shù)組
var headerBytes = message.GetHeaderBytes();
//寫入消息頭長度
output.WriteInt(headerBytes.Length);
//寫入消息頭字節(jié)數(shù)組
output.WriteBytes(headerBytes);
//寫入消息體字節(jié)數(shù)組
if (message.Body != null && message.Body.Length > 0)
{
output.WriteBytes(message.Body);
}
}
}
解碼器
DefaultDecoder 點(diǎn)擊查看代碼
public class DefaultDecoder : MessageToMessageDecoder<IByteBuffer>
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
//消息總長度
var totalLength = input.ReadableBytes;
//消息頭長度
var headerLength = input.GetInt(input.ReaderIndex);
//消息體長度
var bodyLength = totalLength - 4 - headerLength;
//讀取消息頭字節(jié)數(shù)組
var headerBytes = new byte[headerLength];
input.GetBytes(input.ReaderIndex + 4, headerBytes, 0, headerLength);
byte[]? bodyBytes = null;
string? rawHeaderString = null;
NettyHeader? header;
try
{
//把消息頭字節(jié)數(shù)組,反序列化為JSON
rawHeaderString = Encoding.UTF8.GetString(headerBytes);
header = JsonSerializer.Deserialize<NettyHeader>(rawHeaderString);
}
catch (Exception ex)
{
Logger.Error($"解碼失敗: {rawHeaderString}, {ex}");
return;
}
if (header is null)
{
Logger.Error($"解碼失敗: {rawHeaderString}");
return;
}
//讀取消息體字節(jié)數(shù)組
if (bodyLength > 0)
{
bodyBytes = new byte[bodyLength];
input.GetBytes(input.ReaderIndex + 4 + headerLength, bodyBytes, 0, bodyLength);
}
//封裝為NettyMessage對象
var message = new NettyMessage
{
Header = header,
Body = bodyBytes,
};
output.Add(message);
}
}
NettyServer 實(shí)現(xiàn)
NettyServer 點(diǎn)擊查看代碼
public static class NettyServer
{
/// <summary>
/// 開啟Netty服務(wù)
/// </summary>
public static async Task RunAsync(int port = 20007)
{
var bossGroup = new MultithreadEventLoopGroup(1);
var workerGroup = new MultithreadEventLoopGroup();
try
{
var bootstrap = new ServerBootstrap().Group(bossGroup, workerGroup);
bootstrap
.Channel<TcpServerSocketChannel>()
.Option(ChannelOption.SoBacklog, 100)
.Option(ChannelOption.SoReuseaddr, true)
.Option(ChannelOption.SoReuseport, true)
.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast("framing-enc", new LengthFieldPrepender(4));
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
pipeline.AddLast("decoder", new DefaultDecoder());
pipeline.AddLast("encoder", new DefaultEncoder());
pipeline.AddLast("handler", new ServerMessageEntry());
}));
var boundChannel = await bootstrap.BindAsync(port);
Logger.Info($"NettyServer啟動成功...{boundChannel}");
Console.ReadLine();
await boundChannel.CloseAsync();
Logger.Info($"NettyServer關(guān)閉監(jiān)聽了...{boundChannel}");
}
finally
{
await Task.WhenAll(
bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))
);
Logger.Info($"NettyServer退出了...");
}
}
}
- 服務(wù)端管道最后我們添加了
ServerMessageEntry,作為消息處理的入口
ServerMessageEntry 點(diǎn)擊查看代碼
public class ServerMessageEntry : ChannelHandlerAdapter
{
/// <summary> Netty處理器選擇器 </summary>
private readonly DefaultNettyHandlerSelector handlerSelector = new();
public ServerMessageEntry()
{
//注冊Netty處理器
handlerSelector.RegisterHandlerTypes(typeof(EchoHandler).Assembly.GetTypes());
}
/// <summary> 通道激活 </summary>
public override void ChannelActive(IChannelHandlerContext context)
{
Logger.Warn($"ChannelActive: {context.Channel}");
}
/// <summary> 通道關(guān)閉 </summary>
public override void ChannelInactive(IChannelHandlerContext context)
{
Logger.Warn($"ChannelInactive: {context.Channel}");
}
/// <summary> 收到客戶端的消息 </summary>
public override async void ChannelRead(IChannelHandlerContext context, object message)
{
if (message is not NettyMessage nettyMessage)
{
Logger.Error("從客戶端接收消息為空");
return;
}
try
{
Logger.Info($"收到客戶端的消息: {nettyMessage}");
//封裝請求
var nettyContext = new NettyContext(context.Channel, nettyMessage);
//選擇處理器
AbstractNettyHandler handler = handlerSelector.SelectHandler(nettyContext);
//處理請求
await handler.ProcessAsync();
}
catch(Exception ex)
{
Logger.Error($"ServerMessageEntry.ChannelRead: {ex}");
}
}
}
-
按照約定, 把繼承
AbstractNettyHandler的類視為業(yè)務(wù)處理器 -
ServerMessageEntry拿到消息后,首先把消息封裝為NettyContext, 類似與 MVC 中的 HttpContext, 封裝了請求和響應(yīng)對象, 內(nèi)部解析請求的EndPoint, 拆分為HandlerName,ActionName -
DefaultNettyHandlerSelector提供注冊處理器的方法RegisterHandlerTypes, 和選擇處理器的方法SelectHandler -
SelectHandler, 默認(rèn)規(guī)則是查找已注冊的處理器中以HandlerName開頭的類型 -
AbstractNettyHandler的ProcessAsync方法,通過ActionName, 反射拿到MethodInfo, 調(diào)用終結(jié)點(diǎn)
NettyClient 實(shí)現(xiàn)
NettyClient 點(diǎn)擊查看代碼
public sealed class NettyClient(string serverHost, int serverPort) : IDisposable
{
public EndPoint ServerEndPoint { get; } = new IPEndPoint(IPAddress.Parse(serverHost), serverPort);
private static readonly Bootstrap bootstrap = new();
private static readonly IEventLoopGroup eventLoopGroup = new SingleThreadEventLoop();
private bool _disposed;
private IChannel? _channel;
public bool IsConnected => _channel != null && _channel.Open;
public bool IsWritable => _channel != null && _channel.IsWritable;
static NettyClient()
{
bootstrap
.Group(eventLoopGroup)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.SoReuseaddr, true)
.Option(ChannelOption.SoReuseport, true)
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
//pipeline.AddLast("ping", new IdleStateHandler(0, 5, 0));
pipeline.AddLast("framing-enc", new LengthFieldPrepender(4));
pipeline.AddLast("framing-dec", new LengthFieldBasedFrameDecoder(int.MaxValue, 0, 4, 0, 4));
pipeline.AddLast("decoder", new DefaultDecoder());
pipeline.AddLast("encoder", new DefaultEncoder());
pipeline.AddLast("handler", new ClientMessageEntry());
}));
}
/// <summary> 連接服務(wù)器 </summary>
private async Task TryConnectAsync()
{
try
{
if (IsConnected) { return; }
_channel = await bootstrap.ConnectAsync(ServerEndPoint);
}
catch (Exception ex)
{
throw new Exception($"連接服務(wù)器失敗 : {ServerEndPoint} {ex.Message}");
}
}
/// <summary>
/// 發(fā)送消息
/// </summary>
/// <param name="endpoint">終結(jié)點(diǎn)</param>
/// <param name="sync">是否同步等待響應(yīng)</param>
/// <param name="body">正文</param>
public async Task SendAsync(string endpoint, bool sync = false, byte[]? body = null)
{
var message = NettyMessage.Create(endpoint, sync, body);
if (sync)
{
var task = ClientMessageSynchronizer.TryAdd(message);
try
{
await SendAsync(message);
await task;
}
catch
{
ClientMessageSynchronizer.TryRemove(message);
throw;
}
}
else
{
await SendAsync(message);
}
}
/// <summary>
/// 發(fā)送消息
/// </summary>
private async Task SendAsync(NettyMessage message)
{
await TryConnectAsync();
await _channel!.WriteAndFlushAsync(message);
}
/// <summary> 釋放連接(程序員手動釋放, 一般在代碼使用using語句,或在finally里面Dispose) </summary>
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
/// <summary> 釋放連接 </summary>
private void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
//釋放托管資源,比如嵌套的對象
if (disposing)
{
}
//釋放非托管資源
if (_channel != null)
{
_channel.CloseAsync();
_channel = null;
}
_disposed = true;
}
~NettyClient()
{
Dispose(true);
}
}
NettyClient封裝了 Netty 客戶端邏輯,提供發(fā)送異步請求(默認(rèn))和發(fā)布同步請求方法DotNetty默認(rèn)不提供同步請求,但是有些情況我們需要同步等待服務(wù)器的響應(yīng),所有需要自行實(shí)現(xiàn),實(shí)現(xiàn)也很簡單,把消息 ID 緩存起來,收到服務(wù)器響應(yīng)后激活就行了,具體實(shí)現(xiàn)在消息同步器ClientMessageSynchronizer, 就不貼了
總結(jié)
至此,我們實(shí)現(xiàn)了基于 DotNetty 搭建通信模塊, 實(shí)現(xiàn)了客戶端和服務(wù)器的編解碼,處理器選擇,客戶端實(shí)現(xiàn)了同步消息等,大家可以在 ConsoleHost 結(jié)尾的控制臺項目中,測試下同步和異步的消息,實(shí)現(xiàn)的簡單的 Echo 模式
代碼倉庫
項目暫且就叫
OpenDeploy吧
歡迎大家拍磚,Star
下一步
計劃下一步,基于WPF的客戶端, 實(shí)現(xiàn)接口項目的配置與發(fā)現(xiàn)

浙公網(wǎng)安備 33010602011771號