基于surging的木舟IOT平臺如何添加網絡組件
一 、 概述
為了彌補代碼的遺失,木舟IOT平臺正在加班加點進行研發,后面不只是針對于IOT設備接入上報,告警,視頻管理,組態數據可視化大屏,后面還會有快速搭建微服務平臺,利用surging.cli工具根據數據庫表生成微服務,中間服務,能讓程序員快速完成BOSS交給的任務,從而在這個內卷的社會能占有一席之地。這些都是沒有完成任務的空話,現在發此篇的目的是作者有能力開發出優秀的IOT平臺,先介紹一個比較突出的功能,就是可以基于共享或者獨立配置添加網絡組件, 下面來介紹一下如何添加網絡組件。
一鍵運行打包成品下載:https://pan.baidu.com/s/1MVsjKtVYpUonauAz9ZXtPg?pwd=1q2g
測試用戶:fanly
測試密碼:123456
為了讓大家節約時間,能盡快運行產品看到效果,上面有 一鍵運行打包成品可以進行下載測試運行。
二、如何測試運行
以下是目錄結構,
IDE:consul 注冊中心
kayak.client: 網關
kayak.server:微服務
apache-skywalking-apm:skywalking鏈路跟蹤

以上是目錄結構,大家不需要一個個運行,只需要打開運行startup.bat,如果需要測試skywalking ,只需要apache-skywalking-apm\bin\startup.bat 文件就可以了,以下是運行的界面
三、如何添加組件
1.添加http服務組件,
打開平臺界面,然后點擊設備接入->網絡組件,然后可以看到如下界面

再點擊新增組件或者編輯組件,完成后注意啟動狀態是關閉狀態,此時并不能對于該組件功能進行訪問調用,只有把啟動狀態打開,才能訪問調用

以上是http服務組件,啟動完成后,如果設置了webservice和swagger,你可以訪問webservice和swagger,看是否可以訪問


2.添加/編輯Tcp服務組件
當添加/編輯Tcp組件時,設置Host:127.0.0.1 ,port:248并且還有解析方式選項,選項里面有不處理,固定長度,分隔符,自定義腳本,下面我們就來看自定義腳本

添加腳本如下:
parser.Fixed(4).Handler( function(buffer){ var buf = BytesUtils.Slice(buffer,1,4); parser.Fixed(buffer.ReadableBytes).Result(buf); }).Handler( function(buffer){parser.Fixed(8).Result(buffer);} ).Handler(function(buffer){ parser.Result('處理完成','gb2312').Complete(); } )
而基于TCP服務代碼如下,需要繼承于TcpBehavior
internal class TcpDeviceDataService : TcpBehavior, ITcpDeviceDataService { private readonly IDeviceProvider _deviceProvider; public TcpDeviceDataService(IDeviceProvider deviceProvider) { _deviceProvider = deviceProvider; } public override void Load(string clientId, NetworkProperties tcpServerProperties) { var deviceStatus = _deviceProvider.IsConnected(clientId); this.Parser.HandlePayload().Subscribe(async buffer => await ParserBuffer(buffer)); } public override void DeviceStatusProcess(DeviceStatus status, string clientId, NetworkProperties tcpServerProperties) { //throw new NotImplementedException(); } public async Task ParserBuffer(IByteBuffer buffer) { List<string> result = new List<string>(); while (buffer.ReadableBytes > 0) { result.Add(buffer.ReadString(this.Parser.GetNextFixedRecordLength(), Encoding.GetEncoding("gb2312"))); } // var str= buffer.ReadString(buffer.ReadableBytes, Encoding.UTF8); var byteBuffer = Unpooled.Buffer(); byteBuffer.WriteString("\r\n", Encoding.UTF8); byteBuffer.WriteString("處理完成", Encoding.GetEncoding("gb2312")); await Sender.SendAndFlushAsync(byteBuffer); // await Sender.SendAndFlushAsync("消息已接收",Encoding.GetEncoding("gb2312")); this.Parser.Close(); } public Task<bool> ChangeDeviceStage(string deviceId) { throw new NotImplementedException(); } }
用測試Tcp調試工具結果如下

3.添加/編輯UDP服務組件
當添加/編輯UDP組件時, 設置Host:127.0.0.1 ,port:267 并且可以是否開啟組播

而基于udp服務代碼如下,需要繼承于UdpBehavior
internal class UdpDeviceDataService : UdpBehavior, IUdpDeviceDataService { public Task<bool> ChangeDeviceStage(string deviceId) { throw new NotImplementedException(); } public override async Task Dispatch(IEnumerable<byte> bytes) { await Sender.SendAndFlushAsync("\r\n", Encoding.UTF8); await Sender.SendAndFlushAsync("處理完成", Encoding.GetEncoding("gb2312")); } }
測試結果如下:

4.添加/編輯WebSocket服務組件
當添加/編輯WebSocket組件時, 設置Host:127.0.0.1 ,port:55

而基于websocket服務代碼如下,需要繼承于WSBehavior
internal class WSDeviceDataService : WSBehavior, IWSDeviceDataService { protected override void OnMessage(MessageEventArgs e) { this.Client.Value.SendTo($"send:{e.Data},\r\n reply:hello,welcome to you!",ID); } protected override void OnOpen() { } }
測試結果如下:

5.添加/編輯UDP服務組件
當添加/編輯WebSocket組件時, 設置Host:127.0.0.1 ,port:345

添加greet.proto文件,腳本如下:
syntax = "proto3"; package Greet; service Greeter { // Sends a greeting rpc ChangeDeviceStage (DeviceRequest) returns (DeviceReply) {} } message DeviceRequest { string deviceId = 1; } message DeviceReply { bool message = 1; }
然后再創建GreeterBehavior,繼承Greeter.GreeterBase, IServiceBehavior,代碼如下
public partial class GreeterBehavior : Greeter.GreeterBase, IServiceBehavior { private ServerReceivedDelegate received; public event ServerReceivedDelegate Received { add { if (value == null) { received += value; } } remove { received -= value; } } public string MessageId { get; } = Guid.NewGuid().ToString("N"); public async Task Write(object result, int statusCode = 200, string exceptionMessage = "") { if (received == null) return; var message = new TransportMessage(MessageId, new ReactiveResultMessage { ExceptionMessage = exceptionMessage, StatusCode = statusCode, Result = result }); await received(message); } public T CreateProxy<T>(string key) where T : class { return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(key); } public object CreateProxy(Type type) { return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(type); } public object CreateProxy(string key, Type type) { return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(key, type); } public T CreateProxy<T>() where T : class { return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(); } public T GetService<T>(string key) where T : class { if (ServiceLocator.Current.IsRegisteredWithKey<T>(key)) return ServiceLocator.GetService<T>(key); else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(key); } public T GetService<T>() where T : class { if (ServiceLocator.Current.IsRegistered<T>()) return ServiceLocator.GetService<T>(); else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy<T>(); } public object GetService(Type type) { if (ServiceLocator.Current.IsRegistered(type)) return ServiceLocator.GetService(type); else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(type); } public object GetService(string key, Type type) { if (ServiceLocator.Current.IsRegisteredWithKey(key, type)) return ServiceLocator.GetService(key, type); else return ServiceLocator.GetService<IServiceProxyFactory>().CreateProxy(key, type); } public void Publish(IntegrationEvent @event) { GetService<IEventBus>().Publish(@event); } }
而基于grpc服務代碼如下,需要繼承于剛剛創建的GreeterBehavior
public class GrpcDeviceDataService : GreeterBehavior, IGrpcDeviceDataService { public override Task<DeviceReply> ChangeDeviceStage(DeviceRequest request, ServerCallContext context) { return Task.FromResult(new DeviceReply { Message = true }) ; } }
以下是測試結果:
6.添加/編輯MQTT服務組件
當添加/編輯MQTT組件時, 設置Host:127.0.0.1 ,port:425

而基于mqtt服務代碼如下,需要繼承于MqttBehavior
public class MQTTDeviceDataService : MqttBehavior, IMQTTDeviceDataService { public override async Task<bool> Authorized(string username, string password) { bool result = false; if (username == "admin" && password == "123456") result = true; return await Task.FromResult(result); } public async Task<bool> IsOnline(string deviceId) { return await base.GetDeviceIsOnine(deviceId); } public async Task Publish(string deviceId, WillMessage message) { var willMessage = new MqttWillMessage { WillMessage = message.Message, Qos = message.Qos, Topic = message.Topic, WillRetain = message.WillRetain }; await Publish(deviceId, willMessage); await RemotePublish(deviceId, willMessage); } }
以下是測試結果:

三、總結
木舟IOT平臺會在github開源社區版本,可以自由更改代碼,用于商業項目,但不能自營平臺,如低代碼平臺,IOT平臺等,如有違反,后果自負,還有最好不要更改命名空間,然后跟公司說是自己研發的,如果知道后,我在博客全網通報此人,以前surging相關的事件就算了,就當沒發生過。,如果碰到困難,比較緊急的話,可以聯系作者,加群:744677125

浙公網安備 33010602011771號