NetCore+Web客戶(hù)端實(shí)現(xiàn)gRPC實(shí)時(shí)推送
之前出過(guò)websocket推送,sse推送,grpc的推送應(yīng)該更具性?xún)r(jià)比,雖然前端要求復(fù)雜了一點(diǎn)點(diǎn)。下面快速的一步一步完成一個(gè)netcore服務(wù)端+web客戶(hù)端的推送。
后端項(xiàng)目結(jié)構(gòu)
GrpcRealtimePush/
├── Services/
│ └── ChatService.cs # gRPC服務(wù)實(shí)現(xiàn)
├── Protos/
│ └── chat.proto # Protocol Buffers定義
├── Program.cs # 服務(wù)啟動(dòng)配置
├── GrpcRealtimePush.csproj # 項(xiàng)目文件
└── appsettings.json # 配置文件
1.安裝必要的grpc包
<Project Sdk="Microsoft.NET.Sdk.Web"> <PropertyGroup> <TargetFramework>net9.0</TargetFramework> <Nullable>enable</Nullable> <ImplicitUsings>enable</ImplicitUsings> </PropertyGroup> <ItemGroup> <Protobuf Include="Protos\chat.proto" GrpcServices="Server" /> </ItemGroup> <ItemGroup> <PackageReference Include="Grpc.AspNetCore" Version="2.64.0" /> <PackageReference Include="Grpc.AspNetCore.Web" Version="2.64.0" /> </ItemGroup> </Project>
2.創(chuàng)建好proto文件
syntax = "proto3"; package chat; option csharp_namespace = "GrpcRealtimePush"; // 服務(wù)定義 service ChatService { // 服務(wù)端流式推送方法 rpc StartRealtimePush(RealtimePushRequest) returns (stream RealtimePushResponse); } // 請(qǐng)求消息 message RealtimePushRequest { string client_id = 1; // 客戶(hù)端ID int64 timestamp = 2; // 時(shí)間戳 } // 響應(yīng)消息 message RealtimePushResponse { string data = 1; // 推送數(shù)據(jù) int64 timestamp = 2; // 時(shí)間戳 string data_type = 3; // 數(shù)據(jù)類(lèi)型 }
proto文件定義就這樣:
- **`service ChatService`**: 定義gRPC服務(wù)
- **`rpc StartRealtimePush`**: 服務(wù)端流式方法,返回 `stream`表示持續(xù)推送
- **`message`**: 定義請(qǐng)求和響應(yīng)的數(shù)據(jù)結(jié)構(gòu)
- **字段編號(hào)**: 1, 2, 3等是字段的唯一標(biāo)識(shí),用于序列化
3.實(shí)現(xiàn)上面的方法
using Grpc.Core; namespace GrpcRealtimePush.Services; public class ChatService : GrpcRealtimePush.ChatService.ChatServiceBase { private readonly ILogger<ChatService> _logger; public ChatService(ILogger<ChatService> logger) { _logger = logger; } public override async Task StartRealtimePush(RealtimePushRequest request, IServerStreamWriter<RealtimePushResponse> responseStream, ServerCallContext context) { _logger.LogInformation("?? 實(shí)時(shí)推送已啟動(dòng)! 客戶(hù)端: {ClientId}", request.ClientId); try { // 開(kāi)始連續(xù)數(shù)據(jù)推送 var counter = 1; var random = new Random(); var dataTypes = new[] { "系統(tǒng)狀態(tài)", "用戶(hù)活動(dòng)", "數(shù)據(jù)更新", "通知消息", "性能指標(biāo)" }; _logger.LogInformation("?? 開(kāi)始連續(xù)數(shù)據(jù)推送循環(huán)..."); while (!context.CancellationToken.IsCancellationRequested && counter <= 100) { // 模擬不同類(lèi)型的實(shí)時(shí)數(shù)據(jù) var dataType = dataTypes[random.Next(dataTypes.Length)]; var value = random.Next(1, 1000); var timestamp = DateTime.UtcNow; var response = new RealtimePushResponse { Data = $"#{counter:D4} - 數(shù)值: {value} | 時(shí)間: {timestamp:HH:mm:ss.fff}", Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(), DataType = dataType }; await responseStream.WriteAsync(response); _logger.LogInformation("?? 推送數(shù)據(jù) #{Counter}: [{DataType}] = {Value} at {Time}", counter, dataType, value, timestamp.ToString("HH:mm:ss.fff")); counter++; // 等待2秒后發(fā)送下一條數(shù)據(jù) await Task.Delay(2000, context.CancellationToken); } // 發(fā)送完成消息 await responseStream.WriteAsync(new RealtimePushResponse { Data = "實(shí)時(shí)推送測(cè)試完成!", Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(), DataType = "系統(tǒng)消息" }); } catch (OperationCanceledException) { _logger.LogInformation("實(shí)時(shí)推送會(huì)話已取消,客戶(hù)端: {ClientId}", request.ClientId); } catch (Exception ex) { _logger.LogError(ex, "實(shí)時(shí)推送會(huì)話出錯(cuò): {Error}", ex.Message); // 嘗試向客戶(hù)端發(fā)送錯(cuò)誤消息 try { await responseStream.WriteAsync(new RealtimePushResponse { Data = $"服務(wù)器錯(cuò)誤: {ex.Message}", Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(), DataType = "錯(cuò)誤消息" }); } catch (Exception sendError) { _logger.LogError(sendError, "發(fā)送錯(cuò)誤消息失敗"); } } _logger.LogInformation("實(shí)時(shí)推送會(huì)話結(jié)束,客戶(hù)端: {ClientId}", request.ClientId); } }
4.Program文件
using GrpcRealtimePush.Services; var builder = WebApplication.CreateBuilder(args); // 添加gRPC服務(wù) builder.Services.AddGrpc(); // 配置CORS策略,支持gRPC-Web builder.Services.AddCors(options => { options.AddPolicy("AllowAll", policy => { policy.AllowAnyOrigin() .AllowAnyMethod() .AllowAnyHeader() .WithExposedHeaders("Grpc-Status", "Grpc-Message", "Grpc-Encoding", "Grpc-Accept-Encoding", "Content-Type"); }); }); var app = builder.Build(); // 配置HTTP請(qǐng)求管道 // 啟用CORS app.UseCors("AllowAll"); // 啟用gRPC-Web中間件 app.UseGrpcWeb(); // 配置HTTPS重定向(gRPC-Web需要) app.UseHttpsRedirection(); // 映射gRPC服務(wù)并啟用gRPC-Web支持 app.MapGrpcService<ChatService>().EnableGrpcWeb(); app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909"); app.Run();
以上代碼對(duì)于后端來(lái)說(shuō)應(yīng)該輕車(chē)熟路,后端服務(wù)就這樣起來(lái)了。
先測(cè)試一下后端服務(wù)是否正常,我這里有g(shù)o環(huán)境,直接安裝grpcurl工具。
# 安裝grpcurl工具 go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest # 測(cè)試服務(wù) grpcurl -insecure localhost:5201 list
grpcurl -insecure -d "{\"client_id\":\"test-client\",\"timestamp\":1234567890}" localhost:5201 chat.ChatService/StartRealtimePush

下面就是完成前端代碼了,這里使用js+html。
前端的結(jié)構(gòu)如下:
client/
├── generated/ # 生成的代碼
│ ├── chat_pb_browser.js # Protocol Buffers消息類(lèi)
│ └── chat_grpc_web_pb_browser.js # gRPC服務(wù)客戶(hù)端
├── grpc-web-shim.js # gRPC-Web兼容層
├── client.js # 主要業(yè)務(wù)邏輯
├── index.html # 用戶(hù)界面
前端準(zhǔn)備工作安裝protoc和插件。protoc把后端的proto文件轉(zhuǎn)成兩個(gè)js文件,插件就是grpc鏈接需要的。
# 安裝Protocol Buffers編譯器 # Windows: 下載 https://github.com/protocolbuffers/protobuf/releases # macOS: brew install protobuf # Linux: apt-get install protobuf-compiler # 驗(yàn)證安裝 protoc --version # 安裝gRPC-Web插件 npm install -g grpc-web
核心轉(zhuǎn)換代碼腳本如下:
protoc -I=GrpcRealtimePush\Protos ` --js_out=import_style=commonjs:client\generated ` --grpc-web_out=import_style=commonjs,mode=grpcwebtext:client\generated ` GrpcRealtimePush\Protos\chat.proto
執(zhí)行了protoc后會(huì)生成下面2個(gè)js文件
1. `chat_pb_browser.js`
// Browser-compatible version of chat_pb.js (function () { 'use strict'; // 確保命名空間存在 if (!window.proto) window.proto = {}; if (!window.proto.chat) window.proto.chat = {}; // RealtimePushRequest類(lèi) window.proto.chat.RealtimePushRequest = function (opt_data) { jspb.Message.initialize(this, opt_data, 0, -1, null, null); }; // 繼承jspb.Message if (jspb.Message) { window.proto.chat.RealtimePushRequest.prototype = Object.create(jspb.Message.prototype); window.proto.chat.RealtimePushRequest.prototype.constructor = window.proto.chat.RealtimePushRequest; } // RealtimePushRequest方法 window.proto.chat.RealtimePushRequest.prototype.getClientId = function () { return jspb.Message.getFieldWithDefault(this, 1, ""); }; window.proto.chat.RealtimePushRequest.prototype.setClientId = function (value) { return jspb.Message.setProto3StringField(this, 1, value); }; window.proto.chat.RealtimePushRequest.prototype.getTimestamp = function () { return jspb.Message.getFieldWithDefault(this, 2, 0); }; window.proto.chat.RealtimePushRequest.prototype.setTimestamp = function (value) { return jspb.Message.setProto3IntField(this, 2, value); }; // 序列化方法 window.proto.chat.RealtimePushRequest.prototype.serializeBinary = function () { const writer = new jspb.BinaryWriter(); window.proto.chat.RealtimePushRequest.serializeBinaryToWriter(this, writer); return writer.getResultBuffer(); }; window.proto.chat.RealtimePushRequest.serializeBinaryToWriter = function (message, writer) { const f = message.getClientId(); if (f.length > 0) { writer.writeString(1, f); } const f2 = message.getTimestamp(); if (f2 !== 0) { writer.writeInt64(2, f2); } }; window.proto.chat.RealtimePushRequest.deserializeBinary = function (bytes) { const reader = new jspb.BinaryReader(bytes); const msg = new window.proto.chat.RealtimePushRequest(); return window.proto.chat.RealtimePushRequest.deserializeBinaryFromReader(msg, reader); }; window.proto.chat.RealtimePushRequest.deserializeBinaryFromReader = function (msg, reader) { while (reader.nextField()) { if (reader.isEndGroup()) { break; } const field = reader.getFieldNumber(); switch (field) { case 1: const value = reader.readString(); msg.setClientId(value); break; case 2: const value2 = reader.readInt64(); msg.setTimestamp(value2); break; default: reader.skipField(); break; } } return msg; }; // RealtimePushResponse類(lèi) window.proto.chat.RealtimePushResponse = function (opt_data) { jspb.Message.initialize(this, opt_data, 0, -1, null, null); }; // 繼承jspb.Message if (jspb.Message) { window.proto.chat.RealtimePushResponse.prototype = Object.create(jspb.Message.prototype); window.proto.chat.RealtimePushResponse.prototype.constructor = window.proto.chat.RealtimePushResponse; } // RealtimePushResponse方法 window.proto.chat.RealtimePushResponse.prototype.getData = function () { return jspb.Message.getFieldWithDefault(this, 1, ""); }; window.proto.chat.RealtimePushResponse.prototype.setData = function (value) { return jspb.Message.setProto3StringField(this, 1, value); }; window.proto.chat.RealtimePushResponse.prototype.getTimestamp = function () { return jspb.Message.getFieldWithDefault(this, 2, 0); }; window.proto.chat.RealtimePushResponse.prototype.setTimestamp = function (value) { return jspb.Message.setProto3IntField(this, 2, value); }; window.proto.chat.RealtimePushResponse.prototype.getDataType = function () { return jspb.Message.getFieldWithDefault(this, 3, ""); }; window.proto.chat.RealtimePushResponse.prototype.setDataType = function (value) { return jspb.Message.setProto3StringField(this, 3, value); }; // 序列化方法 window.proto.chat.RealtimePushResponse.prototype.serializeBinary = function () { const writer = new jspb.BinaryWriter(); window.proto.chat.RealtimePushResponse.serializeBinaryToWriter(this, writer); return writer.getResultBuffer(); }; window.proto.chat.RealtimePushResponse.serializeBinaryToWriter = function (message, writer) { const f = message.getData(); if (f.length > 0) { writer.writeString(1, f); } const f2 = message.getTimestamp(); if (f2 !== 0) { writer.writeInt64(2, f2); } const f3 = message.getDataType(); if (f3.length > 0) { writer.writeString(3, f3); } }; window.proto.chat.RealtimePushResponse.deserializeBinary = function (bytes) { const reader = new jspb.BinaryReader(bytes); const msg = new window.proto.chat.RealtimePushResponse(); return window.proto.chat.RealtimePushResponse.deserializeBinaryFromReader(msg, reader); }; window.proto.chat.RealtimePushResponse.deserializeBinaryFromReader = function (msg, reader) { while (reader.nextField()) { if (reader.isEndGroup()) { break; } const field = reader.getFieldNumber(); switch (field) { case 1: const value = reader.readString(); msg.setData(value); break; case 2: const value2 = reader.readInt64(); msg.setTimestamp(value2); break; case 3: const value3 = reader.readString(); msg.setDataType(value3); break; default: reader.skipField(); break; } } return msg; }; console.log('chat_pb_browser.js loaded successfully'); })();
2. `chat_grpc_web_pb_browser.js`
// Browser-compatible version of chat_grpc_web_pb.js (function () { 'use strict'; // 確保命名空間存在 if (!window.proto) window.proto = {}; if (!window.proto.chat) window.proto.chat = {}; // ChatServiceClient類(lèi) window.proto.chat.ChatServiceClient = function (hostname, credentials, options) { if (!options) options = {}; options['format'] = options['format'] || 'text'; // 使用gRPC-Web基類(lèi) window.grpc.web.GrpcWebClientBase.call(this, options); this.hostname_ = hostname; this.credentials_ = credentials; this.options_ = options; }; // 繼承基類(lèi) if (window.grpc && window.grpc.web && window.grpc.web.GrpcWebClientBase) { window.proto.chat.ChatServiceClient.prototype = Object.create(window.grpc.web.GrpcWebClientBase.prototype); window.proto.chat.ChatServiceClient.prototype.constructor = window.proto.chat.ChatServiceClient; } // 方法描述符 const methodDescriptor_StartRealtimePush = new window.grpc.web.MethodDescriptor( '/chat.ChatService/StartRealtimePush', window.grpc.web.MethodType.SERVER_STREAMING, window.proto.chat.RealtimePushRequest, window.proto.chat.RealtimePushResponse, function (request) { return request.serializeBinary(); }, function (bytes) { return window.proto.chat.RealtimePushResponse.deserializeBinary(bytes); } ); // StartRealtimePush方法 window.proto.chat.ChatServiceClient.prototype.startRealtimePush = function (request, metadata) { const url = this.hostname_ + '/chat.ChatService/StartRealtimePush'; return this.serverStreaming(url, request, metadata || {}, methodDescriptor_StartRealtimePush); }; console.log('chat_grpc_web_pb_browser.js loaded successfully'); })();
下面就需要?jiǎng)?chuàng)建連接層代碼,該代碼手動(dòng)創(chuàng)建,有需要可以拷貝更改復(fù)用。
`grpc-web-shim.js`
// gRPC-Web compatibility shim (function() { 'use strict'; // 創(chuàng)建grpc命名空間 if (typeof window.grpc === 'undefined') { window.grpc = {}; } if (typeof window.grpc.web === 'undefined') { window.grpc.web = {}; } // 方法類(lèi)型枚舉 window.grpc.web.MethodType = { UNARY: 'unary', SERVER_STREAMING: 'server_streaming', CLIENT_STREAMING: 'client_streaming', BIDIRECTIONAL_STREAMING: 'bidirectional_streaming' }; // 方法描述符 window.grpc.web.MethodDescriptor = function(path, methodType, requestType, responseType, requestSerializeFn, responseDeserializeFn) { this.path = path; this.methodType = methodType; this.requestType = requestType; this.responseType = responseType; this.requestSerializeFn = requestSerializeFn; this.responseDeserializeFn = responseDeserializeFn; }; // 基礎(chǔ)客戶(hù)端類(lèi) window.grpc.web.GrpcWebClientBase = function(options) { this.options = options || {}; this.format = this.options.format || 'text'; }; // 服務(wù)端流式方法 window.grpc.web.GrpcWebClientBase.prototype.serverStreaming = function(url, request, metadata, methodDescriptor) { const self = this; // 創(chuàng)建簡(jiǎn)單的事件發(fā)射器 const stream = { listeners: {}, on: function(event, callback) { if (!this.listeners[event]) { this.listeners[event] = []; } this.listeners[event].push(callback); }, emit: function(event, data) { if (this.listeners[event]) { this.listeners[event].forEach(callback => callback(data)); } } }; try { // 序列化請(qǐng)求 const serializedRequest = methodDescriptor.requestSerializeFn(request); // 創(chuàng)建gRPC-Web幀 const frameHeader = new Uint8Array(5); frameHeader[0] = 0; // 壓縮標(biāo)志 const messageLength = serializedRequest.length; frameHeader[1] = (messageLength >>> 24) & 0xFF; frameHeader[2] = (messageLength >>> 16) & 0xFF; frameHeader[3] = (messageLength >>> 8) & 0xFF; frameHeader[4] = messageLength & 0xFF; const framedMessage = new Uint8Array(5 + messageLength); framedMessage.set(frameHeader, 0); framedMessage.set(serializedRequest, 5); const base64Request = btoa(String.fromCharCode.apply(null, framedMessage)); const headers = { 'Content-Type': 'application/grpc-web-text', 'X-Grpc-Web': '1', 'Accept': 'application/grpc-web-text' }; // 添加元數(shù)據(jù) if (metadata) { Object.keys(metadata).forEach(key => { if (key.toLowerCase() !== 'content-type') { headers[key] = metadata[key]; } }); } const fetchOptions = { method: 'POST', headers: headers, body: base64Request }; fetch(url, fetchOptions) .then(response => { if (!response.ok) { throw new Error(`HTTP ${response.status}: ${response.statusText}`); } console.log('開(kāi)始讀取流式響應(yīng)...'); // 使用ReadableStream讀取gRPC-Web流式響應(yīng) const reader = response.body.getReader(); const decoder = new TextDecoder(); let buffer = ''; let messageCount = 0; function readStreamChunk() { return reader.read().then(({ done, value }) => { if (done) { console.log('?? 流讀取完成,總共處理消息:', messageCount); if (buffer.length > 0) { console.log('?? 處理流結(jié)束時(shí)的剩余緩沖區(qū)'); processStreamBuffer(); } stream.emit('end'); return; } // 將新數(shù)據(jù)添加到緩沖區(qū) const chunk = decoder.decode(value, { stream: true }); buffer += chunk; console.log('?? 收到流數(shù)據(jù)塊:', chunk.length, '字符,緩沖區(qū)總計(jì):', buffer.length); // 處理緩沖區(qū)中的完整消息 processStreamBuffer(); // 繼續(xù)讀取 return readStreamChunk(); }).catch(error => { console.error('? 流讀取錯(cuò)誤:', error); stream.emit('error', error); }); } function processStreamBuffer() { console.log('?? 處理緩沖區(qū),長(zhǎng)度:', buffer.length); while (buffer.length > 0) { try { // 查找完整的base64塊 let messageBase64 = buffer; // 檢查是否包含trailer標(biāo)記 const trailerMarkers = ['gAAAA', 'gAAA', 'gAA', 'gA']; let trailerIndex = -1; for (const marker of trailerMarkers) { const index = messageBase64.indexOf(marker); if (index > 0) { trailerIndex = index; break; } } if (trailerIndex > 0) { messageBase64 = messageBase64.substring(0, trailerIndex); console.log('?? 在索引處找到trailer:', trailerIndex); } // 清理base64字符串 const cleanBase64 = messageBase64.replace(/[^A-Za-z0-9+/=]/g, ''); // 確保base64字符串長(zhǎng)度是4的倍數(shù) let paddedBase64 = cleanBase64; const padding = paddedBase64.length % 4; if (padding > 0) { paddedBase64 += '='.repeat(4 - padding); } if (paddedBase64.length === 0) { console.log('? 清理后base64為空'); buffer = ''; break; } // 解碼base64 const binaryString = atob(paddedBase64); const responseBytes = new Uint8Array(binaryString.length); for (let i = 0; i < binaryString.length; i++) { responseBytes[i] = binaryString.charCodeAt(i); } console.log('?? 解碼字節(jié)長(zhǎng)度:', responseBytes.length); // 檢查是否有足夠的數(shù)據(jù)來(lái)讀取gRPC幀頭 if (responseBytes.length >= 5) { const compressionFlag = responseBytes[0]; const frameMsgLength = (responseBytes[1] << 24) | (responseBytes[2] << 16) | (responseBytes[3] << 8) | responseBytes[4]; console.log(`?? 流幀: 壓縮=${compressionFlag}, 長(zhǎng)度=${frameMsgLength}, 總計(jì)=${responseBytes.length}`); // 檢查是否有完整的消息數(shù)據(jù) if (responseBytes.length >= 5 + frameMsgLength && frameMsgLength > 0) { const messageBytes = responseBytes.slice(5, 5 + frameMsgLength); try { const response = methodDescriptor.responseDeserializeFn(messageBytes); messageCount++; console.log(`? 成功解析消息 #${messageCount},發(fā)射數(shù)據(jù)`); stream.emit('data', response); // 處理完成后,移除已處理的數(shù)據(jù) if (trailerIndex > 0) { buffer = buffer.substring(trailerIndex); console.log('?? 移動(dòng)緩沖區(qū)越過(guò)trailer,剩余長(zhǎng)度:', buffer.length); } else { buffer = ''; console.log('?? 完全清空緩沖區(qū)'); } } catch (deserializeError) { console.error('? 反序列化錯(cuò)誤:', deserializeError); buffer = ''; break; } } else { console.log('? 幀數(shù)據(jù)不完整或長(zhǎng)度無(wú)效'); if (buffer.length < 200) { break; } else { buffer = ''; break; } } } else { console.log('? 幀太短,等待更多數(shù)據(jù)'); break; } } catch (parseError) { console.error('? 處理流消息錯(cuò)誤:', parseError); buffer = ''; break; } } console.log('?? 剩余緩沖區(qū)長(zhǎng)度:', buffer.length); } // 開(kāi)始讀取流 return readStreamChunk(); }) .catch(error => { console.error('流獲取錯(cuò)誤:', error); stream.emit('error', error); }); } catch (error) { setTimeout(() => stream.emit('error', error), 0); } return stream; }; console.log('gRPC-Web shim loaded successfully'); })();
下面就是簡(jiǎn)單的獲取實(shí)時(shí)數(shù)據(jù)的業(yè)務(wù)邏輯了
`client.js`
// gRPC-Web Chat Client Implementation class RealtimePushClient { constructor() { this.client = null; this.isConnected = false; this.serverUrl = 'https://localhost:5201'; // 流式傳輸相關(guān)屬性 this.currentStream = null; this.streamMessageCount = 0; this.streamStartTime = null; this.initializeUI(); } initializeUI() { const streamButton = document.getElementById('streamButton'); const stopStreamButton = document.getElementById('stopStreamButton'); const clearButton = document.getElementById('clearButton'); streamButton.addEventListener('click', () => this.startStreamingChat()); stopStreamButton.addEventListener('click', () => this.stopStreaming()); clearButton.addEventListener('click', () => this.clearMessages()); // 初始化連接狀態(tài) this.updateConnectionStatus(false, '正在初始化...'); // 頁(yè)面加載時(shí)嘗試連接 this.connect(); } connect() { try { // 初始化gRPC-Web客戶(hù)端 console.log('正在初始化實(shí)時(shí)推送客戶(hù)端...'); // 檢查必要的依賴(lài)是否可用 if (typeof jspb === 'undefined') { throw new Error('google-protobuf 庫(kù)未加載'); } if (typeof grpc === 'undefined' || !grpc.web) { console.warn('grpc-web 庫(kù)未完全加載,等待重試...'); setTimeout(() => this.connect(), 1000); return; } if (typeof proto === 'undefined' || !proto.chat || !proto.chat.ChatServiceClient) { throw new Error('gRPC 生成的客戶(hù)端代碼未加載'); } // 創(chuàng)建gRPC-Web客戶(hù)端 this.client = new proto.chat.ChatServiceClient(this.serverUrl, null, { format: 'text', withCredentials: false }); console.log('實(shí)時(shí)推送客戶(hù)端創(chuàng)建成功'); this.updateConnectionStatus(true, '已連接'); this.addMessage('系統(tǒng)', '?? 實(shí)時(shí)推送客戶(hù)端已就緒', 'system'); } catch (error) { console.error('連接初始化失敗:', error); this.updateConnectionStatus(false, '初始化失敗'); this.addMessage('系統(tǒng)', '初始化失敗: ' + this.getErrorMessage(error), 'error'); } } startStreamingChat() { if (!this.isConnected) { this.addMessage('系統(tǒng)', '未連接到服務(wù)器,無(wú)法啟動(dòng)實(shí)時(shí)推送', 'error'); return; } if (!this.client) { this.addMessage('系統(tǒng)', 'gRPC客戶(hù)端未初始化', 'error'); return; } // 檢查是否已在流式傳輸 if (this.currentStream) { this.addMessage('系統(tǒng)', '實(shí)時(shí)推送已在運(yùn)行中', 'system'); return; } try { // 創(chuàng)建實(shí)時(shí)推送請(qǐng)求 const pushRequest = new proto.chat.RealtimePushRequest(); pushRequest.setClientId('web-client-' + Date.now()); pushRequest.setTimestamp(Math.floor(Date.now() / 1000)); console.log('啟動(dòng)實(shí)時(shí)推送:', { clientId: pushRequest.getClientId(), timestamp: pushRequest.getTimestamp() }); // 添加流式傳輸?shù)脑獢?shù)據(jù) const metadata = { 'x-user-agent': 'grpc-web-realtime-client' }; // 開(kāi)始流式傳輸 const stream = this.client.startRealtimePush(pushRequest, metadata); if (!stream) { throw new Error('無(wú)法創(chuàng)建實(shí)時(shí)推送連接'); } // 存儲(chǔ)流引用 this.currentStream = stream; this.streamMessageCount = 0; this.streamStartTime = Date.now(); // 更新UI顯示流式傳輸已激活 this.updateStreamingUI(true); stream.on('data', (response) => { if (response && typeof response.getData === 'function') { this.streamMessageCount++; // 添加帶有實(shí)時(shí)數(shù)據(jù)特殊樣式的消息 this.addRealtimeMessage( `[${response.getDataType()}] ${response.getData()}`, this.streamMessageCount ); // 更新統(tǒng)計(jì)信息 this.updateStreamStats(); } }); stream.on('error', (error) => { console.error('實(shí)時(shí)推送錯(cuò)誤:', error); this.addMessage('系統(tǒng)', '實(shí)時(shí)推送錯(cuò)誤: ' + this.getErrorMessage(error), 'error'); this.stopStreaming(); }); stream.on('end', () => { console.log('實(shí)時(shí)推送結(jié)束'); this.addMessage('系統(tǒng)', '實(shí)時(shí)推送已結(jié)束', 'system'); this.stopStreaming(); }); this.addMessage('系統(tǒng)', '?? 實(shí)時(shí)數(shù)據(jù)推送已啟動(dòng)', 'system'); } catch (error) { console.error('啟動(dòng)實(shí)時(shí)推送失敗:', error); this.addMessage('系統(tǒng)', '啟動(dòng)實(shí)時(shí)推送失敗: ' + this.getErrorMessage(error), 'error'); } } // 其他方法實(shí)現(xiàn)... updateConnectionStatus(connected, message = '') { const statusDiv = document.getElementById('status'); const streamButton = document.getElementById('streamButton'); this.isConnected = connected; if (connected) { statusDiv.textContent = '狀態(tài): 已連接' + (message ? ' - ' + message : ''); statusDiv.className = 'status connected'; streamButton.disabled = false; } else { statusDiv.textContent = '狀態(tài): 未連接' + (message ? ' - ' + message : ''); statusDiv.className = 'status disconnected'; streamButton.disabled = true; } } addMessage(sender, content, type) { const chatContainer = document.getElementById('chatContainer'); const messageDiv = document.createElement('div'); messageDiv.className = `message ${type}`; const timestamp = new Date().toLocaleTimeString(); messageDiv.innerHTML = ` <div><strong>${sender}</strong> <small>${timestamp}</small></div> <div>${content}</div> `; chatContainer.appendChild(messageDiv); chatContainer.scrollTop = chatContainer.scrollHeight; } addRealtimeMessage(content, count) { const chatContainer = document.getElementById('chatContainer'); const messageDiv = document.createElement('div'); messageDiv.className = 'message realtime'; const timestamp = new Date().toLocaleTimeString(); messageDiv.innerHTML = ` <div class="realtime-header"> <strong>?? 實(shí)時(shí)數(shù)據(jù) #${count}</strong> <small>${timestamp}</small> </div> <div class="realtime-content">${content}</div> `; chatContainer.appendChild(messageDiv); chatContainer.scrollTop = chatContainer.scrollHeight; // 保持最后100條消息以防止內(nèi)存問(wèn)題 const messages = chatContainer.querySelectorAll('.message'); if (messages.length > 100) { for (let i = 0; i < messages.length - 100; i++) { messages[i].remove(); } } } getErrorMessage(error) { if (!error) return '未知錯(cuò)誤'; // 處理gRPC-Web特定錯(cuò)誤 if (error.code !== undefined) { const grpcErrorCodes = { 0: 'OK', 1: 'CANCELLED - 操作被取消', 2: 'UNKNOWN - 未知錯(cuò)誤', 3: 'INVALID_ARGUMENT - 無(wú)效參數(shù)', 4: 'DEADLINE_EXCEEDED - 請(qǐng)求超時(shí)', 5: 'NOT_FOUND - 未找到', 6: 'ALREADY_EXISTS - 已存在', 7: 'PERMISSION_DENIED - 權(quán)限被拒絕', 8: 'RESOURCE_EXHAUSTED - 資源耗盡', 9: 'FAILED_PRECONDITION - 前置條件失敗', 10: 'ABORTED - 操作被中止', 11: 'OUT_OF_RANGE - 超出范圍', 12: 'UNIMPLEMENTED - 未實(shí)現(xiàn)', 13: 'INTERNAL - 內(nèi)部錯(cuò)誤', 14: 'UNAVAILABLE - 服務(wù)不可用', 15: 'DATA_LOSS - 數(shù)據(jù)丟失', 16: 'UNAUTHENTICATED - 未認(rèn)證' }; const codeDescription = grpcErrorCodes[error.code] || `未知錯(cuò)誤代碼: ${error.code}`; return `gRPC錯(cuò)誤: ${codeDescription}`; } return error.message || error.toString(); } } // 頁(yè)面加載時(shí)初始化實(shí)時(shí)推送客戶(hù)端 document.addEventListener('DOMContentLoaded', () => { window.realtimePushClient = new RealtimePushClient(); });
最后創(chuàng)建一個(gè)html界面
`?index.html`
<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>gRPC-Web 實(shí)時(shí)數(shù)據(jù)推送</title> <style> body { font-family: Arial, sans-serif; max-width: 800px; margin: 0 auto; padding: 20px; background-color: #f5f5f5; } h1 { color: #333; text-align: center; margin-bottom: 30px; } .chat-container { border: 1px solid #ccc; height: 400px; overflow-y: auto; padding: 10px; margin-bottom: 20px; background-color: #fff; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); } .message { margin-bottom: 10px; padding: 8px; border-radius: 5px; border-left: 4px solid #ddd; } .system { background-color: #fff3e0; border-left-color: #ff9800; text-align: center; font-style: italic; } .error { background-color: #ffebee; border-left-color: #f44336; color: #c62828; text-align: center; } .realtime { background-color: #e8f5e8; border-left-color: #4caf50; animation: fadeIn 0.3s ease-in; } .realtime-header { font-weight: bold; color: #2e7d32; margin-bottom: 5px; } .realtime-content { font-family: 'Courier New', monospace; font-size: 0.9em; color: #1b5e20; } .input-container { display: flex; gap: 10px; margin-top: 20px; } button { padding: 12px 24px; border: none; border-radius: 6px; cursor: pointer; font-size: 14px; font-weight: bold; transition: background-color 0.3s; } #streamButton { background-color: #4caf50; color: white; } #streamButton:hover:not(:disabled) { background-color: #388e3c; } #streamButton:disabled { background-color: #cccccc; cursor: not-allowed; opacity: 0.6; } #stopStreamButton { background-color: #f44336; color: white; } #stopStreamButton:hover { background-color: #d32f2f; } #clearButton { background-color: #757575; color: white; } #clearButton:hover { background-color: #616161; } .status { margin-bottom: 15px; padding: 10px; border-radius: 6px; font-weight: bold; text-align: center; } .connected { background-color: #c8e6c9; color: #2e7d32; border: 1px solid #4caf50; } .disconnected { background-color: #ffcdd2; color: #c62828; border: 1px solid #f44336; } .stream-stats { background-color: #f3e5f5; padding: 10px; margin: 10px 0; border-radius: 6px; font-size: 0.9em; color: #4a148c; border: 1px solid #9c27b0; } @keyframes fadeIn { from { opacity: 0; transform: translateY(-10px); } to { opacity: 1; transform: translateY(0); } } </style> </head> <body> <h1>?? gRPC-Web 實(shí)時(shí)數(shù)據(jù)推送系統(tǒng)</h1> <div id="status" class="status disconnected"> 狀態(tài): 未連接 </div> <div id="chatContainer" class="chat-container"> <div class="loading">正在初始化客戶(hù)端...</div> </div> <div class="input-container"> <button id="streamButton">?? 啟動(dòng)實(shí)時(shí)推送</button> <button id="stopStreamButton" style="display: none;">?? 停止推送</button> <button id="clearButton">??? 清空消息</button> </div> <!-- 引入依賴(lài)庫(kù) --> <script src="https://unpkg.com/google-protobuf@3.21.2/google-protobuf.js"></script> <!-- 本地gRPC-Web兼容層 --> <script src="./grpc-web-shim.js"></script> <!-- 瀏覽器兼容的gRPC-Web文件 --> <script src="./generated/chat_pb_browser.js"></script> <script src="./generated/chat_grpc_web_pb_browser.js"></script> <!-- 主要客戶(hù)端腳本 --> <script src="./client.js"></script> </body> </html>
直接雙擊index.html,或者通過(guò)http.server啟動(dòng)服務(wù)就能愉快的接收推送的實(shí)時(shí)數(shù)據(jù)了

跟其他推送送相比,類(lèi)型安全,性能高,壓縮傳輸?shù)鹊龋乔岸酥С窒鄬?duì)沒(méi)那么友好。


浙公網(wǎng)安備 33010602011771號(hào)