
      Real-Time Push with gRPC: .NET Core Server + Web Client

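      Earlier posts covered WebSocket push and SSE push. gRPC push should offer better value for the effort, although it asks a little more of the front end. Below we quickly build, step by step, a push setup with a .NET Core server and a web client.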

      Back-end project structure

      GrpcRealtimePush/
      ├── Services/
      │ └── ChatService.cs # gRPC service implementation
      ├── Protos/
      │ └── chat.proto # Protocol Buffers definition
      ├── Program.cs # Service startup configuration
      ├── GrpcRealtimePush.csproj # Project file
      └── appsettings.json # Configuration file

      1. Install the required gRPC packages

      <Project Sdk="Microsoft.NET.Sdk.Web">
        <PropertyGroup>
          <TargetFramework>net9.0</TargetFramework>
          <Nullable>enable</Nullable>
          <ImplicitUsings>enable</ImplicitUsings>
        </PropertyGroup>
      
        <ItemGroup>
          <Protobuf Include="Protos\chat.proto" GrpcServices="Server" />
        </ItemGroup>
      
        <ItemGroup>
          <PackageReference Include="Grpc.AspNetCore" Version="2.64.0" />
          <PackageReference Include="Grpc.AspNetCore.Web" Version="2.64.0" />
        </ItemGroup>
      </Project>

       

      2. Create the proto file

      syntax = "proto3";
      
      package chat;
      
      option csharp_namespace = "GrpcRealtimePush";
      
      // Service definition
      service ChatService {
        // Server-streaming push method
        rpc StartRealtimePush(RealtimePushRequest) returns (stream RealtimePushResponse);
      }
      
      // Request message
      message RealtimePushRequest {
        string client_id = 1;    // Client ID
        int64 timestamp = 2;      // Timestamp
      }
      
      // Response message
      message RealtimePushResponse {
        string data = 1;          // Pushed data
        int64 timestamp = 2;      // Timestamp
        string data_type = 3;     // Data type
      }

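      That is the entire proto definition:

      - **`service ChatService`**: defines the gRPC service
      - **`rpc StartRealtimePush`**: a server-streaming method; the `stream` keyword on the return type means the server keeps pushing messages
      - **`message`**: defines the data structures of the request and the response
      - **Field numbers**: 1, 2, 3, etc. uniquely identify each field within its message and are used during serialization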
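To make the role of field numbers concrete, here is a minimal JavaScript sketch of how a proto3 string field could be put on the wire: the field number and the wire type are packed into a tag byte, followed by the length and the UTF-8 bytes. The helper names `fieldTag` and `encodeStringField` are made up for illustration and are not part of google-protobuf. The sketch also assumes field numbers below 16 and payloads shorter than 128 bytes, so that the tag and the length each fit in a single byte.

```javascript
// Illustrative only: hypothetical helpers, not the google-protobuf API.
const WIRE_VARINT = 0;            // int32, int64, bool, ...
const WIRE_LENGTH_DELIMITED = 2;  // string, bytes, nested messages

// Pack a field number and a wire type into one tag byte.
// Assumes fieldNumber < 16 so the tag fits in a single byte.
function fieldTag(fieldNumber, wireType) {
    return (fieldNumber << 3) | wireType;
}

// Encode a short string field as: tag byte, length byte, UTF-8 bytes.
// Assumes the UTF-8 payload is shorter than 128 bytes.
function encodeStringField(fieldNumber, text) {
    const payload = new TextEncoder().encode(text);
    if (fieldNumber >= 16 || payload.length >= 128) {
        throw new RangeError('sketch only handles one-byte tags and lengths');
    }
    return Uint8Array.from([
        fieldTag(fieldNumber, WIRE_LENGTH_DELIMITED),
        payload.length,
        ...payload
    ]);
}

// client_id is field 1 in RealtimePushRequest:
// tag 10, length 2, then the bytes 97 and 98 for "ab"
console.log(Array.from(encodeStringField(1, 'ab')));
```

Only the number travels on the wire, never the field name, which is why renumbering a field in chat.proto breaks clients generated from the older file.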

      3. Implement the method defined above

      using Grpc.Core;
      
      namespace GrpcRealtimePush.Services;
      
      public class ChatService : GrpcRealtimePush.ChatService.ChatServiceBase
      {
          private readonly ILogger<ChatService> _logger;
      
          public ChatService(ILogger<ChatService> logger)
          {
              _logger = logger;
          }
      
          public override async Task StartRealtimePush(RealtimePushRequest request, 
              IServerStreamWriter<RealtimePushResponse> responseStream, ServerCallContext context)
          {
              _logger.LogInformation("Real-time push started! Client: {ClientId}", request.ClientId);
              
              try
              {
                  // Start the continuous push loop
                  var counter = 1;
                  var random = new Random();
                  var dataTypes = new[] { "System status", "User activity", "Data update", "Notification", "Performance metric" };
                  
                  _logger.LogInformation("Starting the continuous data push loop...");
                  
                  while (!context.CancellationToken.IsCancellationRequested && counter <= 100)
                  {
                      // Simulate different kinds of real-time data
                      var dataType = dataTypes[random.Next(dataTypes.Length)];
                      var value = random.Next(1, 1000);
                      var timestamp = DateTime.UtcNow;
                      
                      var response = new RealtimePushResponse
                      {
                          Data = $"#{counter:D4} - Value: {value} | Time: {timestamp:HH:mm:ss.fff}",
                          Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
                          DataType = dataType
                      };
      
                      await responseStream.WriteAsync(response);
                      _logger.LogInformation("Pushed data #{Counter}: [{DataType}] = {Value} at {Time}", 
                          counter, dataType, value, timestamp.ToString("HH:mm:ss.fff"));
                      
                      counter++;
                      
                      // Wait 2 seconds before sending the next message
                      await Task.Delay(2000, context.CancellationToken);
                  }
                  
                  // Send the completion message
                  await responseStream.WriteAsync(new RealtimePushResponse
                  {
                      Data = "Real-time push test complete!",
                      Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
                      DataType = "System message"
                  });
                  
              }
              catch (OperationCanceledException)
              {
                  _logger.LogInformation("Real-time push session cancelled, client: {ClientId}", request.ClientId);
              }
              catch (Exception ex)
              {
                  _logger.LogError(ex, "Real-time push session failed: {Error}", ex.Message);
                  
                  // Try to send an error message to the client
                  try
                  {
                      await responseStream.WriteAsync(new RealtimePushResponse
                      {
                          Data = $"Server error: {ex.Message}",
                          Timestamp = DateTimeOffset.UtcNow.ToUnixTimeSeconds(),
                          DataType = "Error message"
                      });
                  }
                  catch (Exception sendError)
                  {
                      _logger.LogError(sendError, "Failed to send the error message");
                  }
              }
              
              _logger.LogInformation("Real-time push session ended, client: {ClientId}", request.ClientId);
          }
      }

      4. The Program file

      using GrpcRealtimePush.Services;
      
      var builder = WebApplication.CreateBuilder(args);
      
      // Add gRPC services
      builder.Services.AddGrpc();
      
      // Configure a CORS policy so that gRPC-Web calls from the browser are allowed
      builder.Services.AddCors(options =>
      {
          options.AddPolicy("AllowAll", policy =>
          {
              policy.AllowAnyOrigin()
                    .AllowAnyMethod()
                    .AllowAnyHeader()
                    .WithExposedHeaders("Grpc-Status", "Grpc-Message", "Grpc-Encoding", "Grpc-Accept-Encoding", "Content-Type");
          });
      });
      
      var app = builder.Build();
      
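      // Configure the HTTP request pipeline
      
      // Enable CORS
      app.UseCors("AllowAll");
      
      // Enable the gRPC-Web middleware
      app.UseGrpcWeb();
      
      // Configure HTTPS redirection (the browser client in this post connects over https://)
      app.UseHttpsRedirection();
      
      // Map the gRPC service and enable gRPC-Web support for it
      app.MapGrpcService<ChatService>().EnableGrpcWeb();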
      
      app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909");
      
      app.Run();

       

      The code above should be familiar territory for back-end developers, and with that the back-end service is up.

      First, check that the back-end service works. I have a Go environment here, so I install the grpcurl tool directly.

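      # Install the grpcurl tool
      go install github.com/fullstorydev/grpcurl/cmd/grpcurl@latest
      
      # Test the service
      # Note: without -proto, grpcurl relies on gRPC server reflection, which the Program.cs above does not register.
      # If reflection is unavailable, pass the schema explicitly with grpcurl's -import-path and -proto flags.
      grpcurl -insecure localhost:5201 list
      grpcurl -insecure -d "{\"client_id\":\"test-client\",\"timestamp\":1234567890}" localhost:5201 chat.ChatService/StartRealtimePush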

       


       

      Next comes the front-end code, written here in plain JS + HTML.


      The front-end structure is as follows:

      client/
      ├── generated/ # Generated code
      │ ├── chat_pb_browser.js # Protocol Buffers message classes
      │ └── chat_grpc_web_pb_browser.js # gRPC service client
      ├── grpc-web-shim.js # gRPC-Web compatibility layer
      ├── client.js # Main business logic
      └── index.html # User interface

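      Front-end preparation: install protoc and the plugin. protoc converts the back end's proto file into two JS files, and the plugin generates the client code needed for the gRPC connection.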

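      # Install the Protocol Buffers compiler
      # Windows: download from https://github.com/protocolbuffers/protobuf/releases
      # macOS: brew install protobuf
      # Linux: apt-get install protobuf-compiler
      
      # Verify the installation
      protoc --version
      
      # Install the gRPC-Web package
      # Note: the --grpc-web_out flag used below needs the protoc-gen-grpc-web plugin binary on PATH
      # (published on the grpc-web GitHub releases page); recent protoc releases also need protoc-gen-js for --js_out.
      npm install -g grpc-web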

      The core conversion script (PowerShell) is as follows:

      protoc -I=GrpcRealtimePush\Protos `
        --js_out=import_style=commonjs:client\generated `
        --grpc-web_out=import_style=commonjs,mode=grpcwebtext:client\generated `
        GrpcRealtimePush\Protos\chat.proto

       

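      Running protoc generates two JS files, chat_pb.js and chat_grpc_web_pb.js. The browser-compatible versions used in this post are shown below.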


      1. `chat_pb_browser.js`

      // Browser-compatible version of chat_pb.js
      (function () {
          'use strict';
      
          // Ensure the namespaces exist
          if (!window.proto) window.proto = {};
          if (!window.proto.chat) window.proto.chat = {};
      
          // RealtimePushRequest class
          window.proto.chat.RealtimePushRequest = function (opt_data) {
              jspb.Message.initialize(this, opt_data, 0, -1, null, null);
          };
      
          // Inherit from jspb.Message
          if (jspb.Message) {
              window.proto.chat.RealtimePushRequest.prototype = Object.create(jspb.Message.prototype);
              window.proto.chat.RealtimePushRequest.prototype.constructor = window.proto.chat.RealtimePushRequest;
          }
      
          // RealtimePushRequest accessors
          window.proto.chat.RealtimePushRequest.prototype.getClientId = function () {
              return jspb.Message.getFieldWithDefault(this, 1, "");
          };
      
          window.proto.chat.RealtimePushRequest.prototype.setClientId = function (value) {
              return jspb.Message.setProto3StringField(this, 1, value);
          };
      
          window.proto.chat.RealtimePushRequest.prototype.getTimestamp = function () {
              return jspb.Message.getFieldWithDefault(this, 2, 0);
          };
      
          window.proto.chat.RealtimePushRequest.prototype.setTimestamp = function (value) {
              return jspb.Message.setProto3IntField(this, 2, value);
          };
      
          // Serialization methods
          window.proto.chat.RealtimePushRequest.prototype.serializeBinary = function () {
              const writer = new jspb.BinaryWriter();
              window.proto.chat.RealtimePushRequest.serializeBinaryToWriter(this, writer);
              return writer.getResultBuffer();
          };
      
          window.proto.chat.RealtimePushRequest.serializeBinaryToWriter = function (message, writer) {
              const f = message.getClientId();
              if (f.length > 0) {
                  writer.writeString(1, f);
              }
              const f2 = message.getTimestamp();
              if (f2 !== 0) {
                  writer.writeInt64(2, f2);
              }
          };
      
          window.proto.chat.RealtimePushRequest.deserializeBinary = function (bytes) {
              const reader = new jspb.BinaryReader(bytes);
              const msg = new window.proto.chat.RealtimePushRequest();
              return window.proto.chat.RealtimePushRequest.deserializeBinaryFromReader(msg, reader);
          };
      
          window.proto.chat.RealtimePushRequest.deserializeBinaryFromReader = function (msg, reader) {
              while (reader.nextField()) {
                  if (reader.isEndGroup()) {
                      break;
                  }
                  const field = reader.getFieldNumber();
                  switch (field) {
                      case 1:
                          const value = reader.readString();
                          msg.setClientId(value);
                          break;
                      case 2:
                          const value2 = reader.readInt64();
                          msg.setTimestamp(value2);
                          break;
                      default:
                          reader.skipField();
                          break;
                  }
              }
              return msg;
          };
      
          // RealtimePushResponse class
          window.proto.chat.RealtimePushResponse = function (opt_data) {
              jspb.Message.initialize(this, opt_data, 0, -1, null, null);
          };
      
          // Inherit from jspb.Message
          if (jspb.Message) {
              window.proto.chat.RealtimePushResponse.prototype = Object.create(jspb.Message.prototype);
              window.proto.chat.RealtimePushResponse.prototype.constructor = window.proto.chat.RealtimePushResponse;
          }
      
          // RealtimePushResponse accessors
          window.proto.chat.RealtimePushResponse.prototype.getData = function () {
              return jspb.Message.getFieldWithDefault(this, 1, "");
          };
      
          window.proto.chat.RealtimePushResponse.prototype.setData = function (value) {
              return jspb.Message.setProto3StringField(this, 1, value);
          };
      
          window.proto.chat.RealtimePushResponse.prototype.getTimestamp = function () {
              return jspb.Message.getFieldWithDefault(this, 2, 0);
          };
      
          window.proto.chat.RealtimePushResponse.prototype.setTimestamp = function (value) {
              return jspb.Message.setProto3IntField(this, 2, value);
          };
      
          window.proto.chat.RealtimePushResponse.prototype.getDataType = function () {
              return jspb.Message.getFieldWithDefault(this, 3, "");
          };
      
          window.proto.chat.RealtimePushResponse.prototype.setDataType = function (value) {
              return jspb.Message.setProto3StringField(this, 3, value);
          };
      
          // Serialization methods
          window.proto.chat.RealtimePushResponse.prototype.serializeBinary = function () {
              const writer = new jspb.BinaryWriter();
              window.proto.chat.RealtimePushResponse.serializeBinaryToWriter(this, writer);
              return writer.getResultBuffer();
          };
      
          window.proto.chat.RealtimePushResponse.serializeBinaryToWriter = function (message, writer) {
              const f = message.getData();
              if (f.length > 0) {
                  writer.writeString(1, f);
              }
              const f2 = message.getTimestamp();
              if (f2 !== 0) {
                  writer.writeInt64(2, f2);
              }
              const f3 = message.getDataType();
              if (f3.length > 0) {
                  writer.writeString(3, f3);
              }
          };
      
          window.proto.chat.RealtimePushResponse.deserializeBinary = function (bytes) {
              const reader = new jspb.BinaryReader(bytes);
              const msg = new window.proto.chat.RealtimePushResponse();
              return window.proto.chat.RealtimePushResponse.deserializeBinaryFromReader(msg, reader);
          };
      
          window.proto.chat.RealtimePushResponse.deserializeBinaryFromReader = function (msg, reader) {
              while (reader.nextField()) {
                  if (reader.isEndGroup()) {
                      break;
                  }
                  const field = reader.getFieldNumber();
                  switch (field) {
                      case 1:
                          const value = reader.readString();
                          msg.setData(value);
                          break;
                      case 2:
                          const value2 = reader.readInt64();
                          msg.setTimestamp(value2);
                          break;
                      case 3:
                          const value3 = reader.readString();
                          msg.setDataType(value3);
                          break;
                      default:
                          reader.skipField();
                          break;
                  }
              }
              return msg;
          };
      
          console.log('chat_pb_browser.js loaded successfully');
      })();

       

      2. `chat_grpc_web_pb_browser.js`

      // Browser-compatible version of chat_grpc_web_pb.js
      (function () {
          'use strict';
      
          // Ensure the namespaces exist
          if (!window.proto) window.proto = {};
          if (!window.proto.chat) window.proto.chat = {};
      
          // ChatServiceClient class
          window.proto.chat.ChatServiceClient = function (hostname, credentials, options) {
              if (!options) options = {};
              options['format'] = options['format'] || 'text';
      
              // Call the gRPC-Web base-class constructor
              window.grpc.web.GrpcWebClientBase.call(this, options);
      
              this.hostname_ = hostname;
              this.credentials_ = credentials;
              this.options_ = options;
          };
      
          // Inherit from the base class
          if (window.grpc && window.grpc.web && window.grpc.web.GrpcWebClientBase) {
              window.proto.chat.ChatServiceClient.prototype = Object.create(window.grpc.web.GrpcWebClientBase.prototype);
              window.proto.chat.ChatServiceClient.prototype.constructor = window.proto.chat.ChatServiceClient;
          }
      
          // Method descriptor
          const methodDescriptor_StartRealtimePush = new window.grpc.web.MethodDescriptor(
              '/chat.ChatService/StartRealtimePush',
              window.grpc.web.MethodType.SERVER_STREAMING,
              window.proto.chat.RealtimePushRequest,
              window.proto.chat.RealtimePushResponse,
              function (request) { 
                  return request.serializeBinary(); 
              },
              function (bytes) { 
                  return window.proto.chat.RealtimePushResponse.deserializeBinary(bytes); 
              }
          );
      
          // StartRealtimePush method
          window.proto.chat.ChatServiceClient.prototype.startRealtimePush = function (request, metadata) {
              const url = this.hostname_ + '/chat.ChatService/StartRealtimePush';
              return this.serverStreaming(url, request, metadata || {}, methodDescriptor_StartRealtimePush);
          };
      
          console.log('chat_grpc_web_pb_browser.js loaded successfully');
      })();

       

      Next we need the connection-layer code. It is written by hand; copy and adapt it for reuse as needed.

      `grpc-web-shim.js`

      // gRPC-Web compatibility shim
      (function() {
          'use strict';
      
          // Create the grpc namespace
          if (typeof window.grpc === 'undefined') {
              window.grpc = {};
          }
      
          if (typeof window.grpc.web === 'undefined') {
              window.grpc.web = {};
          }
      
          // Method type enum
          window.grpc.web.MethodType = {
              UNARY: 'unary',
              SERVER_STREAMING: 'server_streaming',
              CLIENT_STREAMING: 'client_streaming',
              BIDIRECTIONAL_STREAMING: 'bidirectional_streaming'
          };
      
          // Method descriptor
          window.grpc.web.MethodDescriptor = function(path, methodType, requestType, responseType, requestSerializeFn, responseDeserializeFn) {
              this.path = path;
              this.methodType = methodType;
              this.requestType = requestType;
              this.responseType = responseType;
              this.requestSerializeFn = requestSerializeFn;
              this.responseDeserializeFn = responseDeserializeFn;
          };
      
          // Base client class
          window.grpc.web.GrpcWebClientBase = function(options) {
              this.options = options || {};
              this.format = this.options.format || 'text';
          };
      
          // Server-streaming method
          window.grpc.web.GrpcWebClientBase.prototype.serverStreaming = function(url, request, metadata, methodDescriptor) {
              const self = this;
              
              // Create a simple event emitter
              const stream = {
                  listeners: {},
                  
                  on: function(event, callback) {
                      if (!this.listeners[event]) {
                          this.listeners[event] = [];
                      }
                      this.listeners[event].push(callback);
                  },
                  
                  emit: function(event, data) {
                      if (this.listeners[event]) {
                          this.listeners[event].forEach(callback => callback(data));
                      }
                  }
              };
      
              try {
                  // Serialize the request
                  const serializedRequest = methodDescriptor.requestSerializeFn(request);
                  
                  // Build the gRPC-Web frame: 1 flag byte followed by a 4-byte big-endian length
                  const frameHeader = new Uint8Array(5);
                  frameHeader[0] = 0; // Compression flag (0 = uncompressed)
                  
                  const messageLength = serializedRequest.length;
                  frameHeader[1] = (messageLength >>> 24) & 0xFF;
                  frameHeader[2] = (messageLength >>> 16) & 0xFF;
                  frameHeader[3] = (messageLength >>> 8) & 0xFF;
                  frameHeader[4] = messageLength & 0xFF;
                  
                  const framedMessage = new Uint8Array(5 + messageLength);
                  framedMessage.set(frameHeader, 0);
                  framedMessage.set(serializedRequest, 5);
                  
                  const base64Request = btoa(String.fromCharCode.apply(null, framedMessage));
                  
                  const headers = {
                      'Content-Type': 'application/grpc-web-text',
                      'X-Grpc-Web': '1',
                      'Accept': 'application/grpc-web-text'
                  };
                  
                  // Add the metadata as request headers
                  if (metadata) {
                      Object.keys(metadata).forEach(key => {
                          if (key.toLowerCase() !== 'content-type') {
                              headers[key] = metadata[key];
                          }
                      });
                  }
                  
                  const fetchOptions = {
                      method: 'POST',
                      headers: headers,
                      body: base64Request
                  };
      
                  fetch(url, fetchOptions)
                      .then(response => {
                          if (!response.ok) {
                              throw new Error(`HTTP ${response.status}: ${response.statusText}`);
                          }
                          
                          console.log('Start reading the streaming response...');
                          
                          // Read the gRPC-Web streaming response through a ReadableStream
                          const reader = response.body.getReader();
                          const decoder = new TextDecoder();
                          let buffer = '';
                          let messageCount = 0;
                          
                          function readStreamChunk() {
                              return reader.read().then(({ done, value }) => {
                                  if (done) {
                                      console.log('Stream finished, total messages processed:', messageCount);
                                      if (buffer.length > 0) {
                                          console.log('Processing the buffer left over at end of stream');
                                          processStreamBuffer();
                                      }
                                      stream.emit('end');
                                      return;
                                  }
                                  
                                  // Append the new data to the buffer
                                  const chunk = decoder.decode(value, { stream: true });
                                  buffer += chunk;
                                  console.log('Received stream chunk:', chunk.length, 'chars, buffer total:', buffer.length);
                                  
                                  // Process any complete messages in the buffer
                                  processStreamBuffer();
                                  
                                  // Keep reading
                                  return readStreamChunk();
                              }).catch(error => {
                                  console.error('Stream read error:', error);
                                  stream.emit('error', error);
                              });
                          }
                          
                          function processStreamBuffer() {
                              console.log('Processing buffer, length:', buffer.length);
                              
                              while (buffer.length > 0) {
                                  try {
                                      // Look for a complete base64 block
                                      let messageBase64 = buffer;
                                      
                                      // Check for a trailer marker: a trailer frame starts with 0x80 followed by
                                      // zero length bytes, which base64-encodes to text beginning with "gAAAA"
                                      const trailerMarkers = ['gAAAA', 'gAAA', 'gAA', 'gA'];
                                      let trailerIndex = -1;
                                      
                                      for (const marker of trailerMarkers) {
                                          const index = messageBase64.indexOf(marker);
                                          if (index > 0) {
                                              trailerIndex = index;
                                              break;
                                          }
                                      }
                                      
                                      if (trailerIndex > 0) {
                                          messageBase64 = messageBase64.substring(0, trailerIndex);
                                          console.log('Found trailer at index:', trailerIndex);
                                      }
                                      
                                      // Strip any non-base64 characters
                                      const cleanBase64 = messageBase64.replace(/[^A-Za-z0-9+/=]/g, '');
                                      
                                      // Pad the base64 string to a multiple of 4 characters
                                      let paddedBase64 = cleanBase64;
                                      const padding = paddedBase64.length % 4;
                                      if (padding > 0) {
                                          paddedBase64 += '='.repeat(4 - padding);
                                      }
                                      
                                      if (paddedBase64.length === 0) {
                                          console.log('Base64 is empty after cleaning');
                                          buffer = '';
                                          break;
                                      }
                                      
                                      // Decode the base64 text into bytes
                                      const binaryString = atob(paddedBase64);
                                      const responseBytes = new Uint8Array(binaryString.length);
                                      for (let i = 0; i < binaryString.length; i++) {
                                          responseBytes[i] = binaryString.charCodeAt(i);
                                      }
                                      
                                      console.log('Decoded byte length:', responseBytes.length);
                                      
                                      // Check that there is enough data to read the gRPC frame header
                                      if (responseBytes.length >= 5) {
                                          const compressionFlag = responseBytes[0];
                                          const frameMsgLength = (responseBytes[1] << 24) | (responseBytes[2] << 16) | (responseBytes[3] << 8) | responseBytes[4];
                                          
                                          console.log(`Stream frame: flag=${compressionFlag}, length=${frameMsgLength}, total=${responseBytes.length}`);
                                          
                                          // A set high bit in the flag byte marks the trailer frame, which
                                          // carries status metadata rather than a protobuf message
                                          if ((compressionFlag & 0x80) !== 0) {
                                              console.log('Trailer frame received, skipping');
                                              buffer = '';
                                              break;
                                          }
                                          
                                          // Check that the complete message body has arrived
                                          if (responseBytes.length >= 5 + frameMsgLength && frameMsgLength > 0) {
                                              const messageBytes = responseBytes.slice(5, 5 + frameMsgLength);
                                              
                                              try {
                                                  const response = methodDescriptor.responseDeserializeFn(messageBytes);
                                                  messageCount++;
                                                  console.log(`Parsed message #${messageCount}, emitting data`);
                                                  stream.emit('data', response);
                                                  
                                                  // Drop the data that has been processed
                                                  if (trailerIndex > 0) {
                                                      buffer = buffer.substring(trailerIndex);
                                                      console.log('Advanced buffer to the trailer, remaining length:', buffer.length);
                                                  } else {
                                                      buffer = '';
                                                      console.log('Buffer fully cleared');
                                                  }
                                                  
                                              } catch (deserializeError) {
                                                  console.error('Deserialization error:', deserializeError);
                                                  buffer = '';
                                                  break;
                                              }
                                          } else {
                                              console.log('Frame data incomplete or length invalid');
                                              if (buffer.length < 200) {
                                                  break;
                                              } else {
                                                  buffer = '';
                                                  break;
                                              }
                                          }
                                      } else {
                                          console.log('Frame too short, waiting for more data');
                                          break;
                                      }
                                      
                                  } catch (parseError) {
                                      console.error('Error while processing stream message:', parseError);
                                      buffer = '';
                                      break;
                                  }
                              }
                              
                              console.log('Remaining buffer length:', buffer.length);
                          }
                          
                          // Start reading the stream
                          return readStreamChunk();
                      })
                      .catch(error => {
                          console.error('Stream fetch error:', error);
                          stream.emit('error', error);
                      });
                      
              } catch (error) {
                  setTimeout(() => stream.emit('error', error), 0);
              }
      
              return stream;
          };
      
          console.log('gRPC-Web shim loaded successfully');
      })();
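The shim above waits until a whole frame has arrived before deserializing it. As a separate illustration (not the shim's actual code), here is a minimal sketch of gRPC-Web frame splitting over raw bytes. Each frame is 1 flag byte, a 4-byte big-endian payload length, then the payload; a flag with the high bit (0x80) set marks the trailers frame. The function name `parseGrpcWebFrames` and the shape of its return value are assumptions made for this example. With the `grpc-web-text` format the body is base64-encoded, so it would need decoding to bytes first.

```javascript
// Hypothetical helper: split a byte buffer into complete gRPC-Web frames.
// Frame layout: 1 flag byte, 4-byte big-endian payload length, then the payload.
function parseGrpcWebFrames(bytes) {
  const HEADER_SIZE = 5;
  const frames = [];
  let offset = 0;

  while (bytes.length - offset >= HEADER_SIZE) {
    const view = new DataView(bytes.buffer, bytes.byteOffset + offset, HEADER_SIZE);
    const flags = view.getUint8(0);
    const length = view.getUint32(1, false); // false = big-endian

    if (bytes.length - offset - HEADER_SIZE < length) {
      break; // payload not fully received yet; keep these bytes for the next chunk
    }

    const start = offset + HEADER_SIZE;
    frames.push({
      isTrailer: (flags & 0x80) !== 0, // high bit set means trailers, not a message
      payload: bytes.subarray(start, start + length),
    });
    offset = start + length;
  }

  // Whatever is left over is an incomplete frame that needs more data.
  return { frames, remainder: bytes.subarray(offset) };
}
```

A caller would prepend `remainder` to the next network chunk and call the function again, which avoids throwing away a partially received frame.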

       

      Next comes the simple business logic for receiving the real-time data.

      `client.js`

      // gRPC-Web Chat Client Implementation
      class RealtimePushClient {
          constructor() {
              this.client = null;
              this.isConnected = false;
              this.serverUrl = 'https://localhost:5201';
              
              // Streaming-related state
              this.currentStream = null;
              this.streamMessageCount = 0;
              this.streamStartTime = null;
              
              this.initializeUI();
          }
      
          initializeUI() {
              const streamButton = document.getElementById('streamButton');
              const stopStreamButton = document.getElementById('stopStreamButton');
              const clearButton = document.getElementById('clearButton');
      
              streamButton.addEventListener('click', () => this.startStreamingChat());
              stopStreamButton.addEventListener('click', () => this.stopStreaming());
              clearButton.addEventListener('click', () => this.clearMessages());
      
              // Initialize the connection status
              this.updateConnectionStatus(false, 'Initializing...');
      
              // Try to connect when the page loads
              this.connect();
          }
      
          connect() {
              try {
                  // Initialize the gRPC-Web client
                  console.log('Initializing the real-time push client...');
                  
                  // Check that the required dependencies are available
                  if (typeof jspb === 'undefined') {
                      throw new Error('google-protobuf library is not loaded');
                  }
                  
                  if (typeof grpc === 'undefined' || !grpc.web) {
                      console.warn('grpc-web library is not fully loaded, retrying...');
                      setTimeout(() => this.connect(), 1000);
                      return;
                  }
                  
                  if (typeof proto === 'undefined' || !proto.chat || !proto.chat.ChatServiceClient) {
                      throw new Error('gRPC generated client code is not loaded');
                  }
      
                  // Create the gRPC-Web client
                  this.client = new proto.chat.ChatServiceClient(this.serverUrl, null, {
                      format: 'text',
                      withCredentials: false
                  });
                  
                  console.log('Real-time push client created');
                  this.updateConnectionStatus(true, 'Connected');
                  this.addMessage('System', 'Real-time push client is ready', 'system');
                  
              } catch (error) {
                  console.error('Connection initialization failed:', error);
                  this.updateConnectionStatus(false, 'Initialization failed');
                  this.addMessage('System', 'Initialization failed: ' + this.getErrorMessage(error), 'error');
              }
          }
      
          startStreamingChat() {
              if (!this.isConnected) {
                  this.addMessage('System', 'Not connected to the server; cannot start real-time push', 'error');
                  return;
              }
      
              if (!this.client) {
                  this.addMessage('System', 'gRPC client is not initialized', 'error');
                  return;
              }
      
              // Check whether a stream is already active
              if (this.currentStream) {
                  this.addMessage('System', 'Real-time push is already running', 'system');
                  return;
              }
      
              try {
                  // Build the real-time push request
                  const pushRequest = new proto.chat.RealtimePushRequest();
                  pushRequest.setClientId('web-client-' + Date.now());
                  pushRequest.setTimestamp(Math.floor(Date.now() / 1000));
      
                  console.log('Starting real-time push:', {
                      clientId: pushRequest.getClientId(),
                      timestamp: pushRequest.getTimestamp()
                  });
      
                  // Metadata sent with the streaming call
                  const metadata = {
                      'x-user-agent': 'grpc-web-realtime-client'
                  };
      
                  // Start the server-streaming call
                  const stream = this.client.startRealtimePush(pushRequest, metadata);
                  
                  if (!stream) {
                      throw new Error('Unable to create the real-time push connection');
                  }
      
                  // Keep a reference to the stream
                  this.currentStream = stream;
                  this.streamMessageCount = 0;
                  this.streamStartTime = Date.now();
      
                  // Update the UI to show that streaming is active
                  this.updateStreamingUI(true);
      
                  stream.on('data', (response) => {
                      if (response && typeof response.getData === 'function') {
                          this.streamMessageCount++;
                          
                          // Render the message with the real-time data styling
                          this.addRealtimeMessage(
                              `[${response.getDataType()}] ${response.getData()}`, 
                              this.streamMessageCount
                          );
                          
                          // Update the statistics
                          this.updateStreamStats();
                      }
                  });
      
                  stream.on('error', (error) => {
                      console.error('Real-time push error:', error);
                      this.addMessage('System', 'Real-time push error: ' + this.getErrorMessage(error), 'error');
                      this.stopStreaming();
                  });
      
                  stream.on('end', () => {
                      console.log('Real-time push ended');
                      this.addMessage('System', 'Real-time push has ended', 'system');
                      this.stopStreaming();
                  });
      
                  this.addMessage('System', 'Real-time data push started', 'system');
                  
              } catch (error) {
                  console.error('Failed to start real-time push:', error);
                  this.addMessage('System', 'Failed to start real-time push: ' + this.getErrorMessage(error), 'error');
              }
          }
      
          // Other method implementations (stopStreaming, clearMessages, updateStreamingUI and updateStreamStats are not shown)...
          updateConnectionStatus(connected, message = '') {
              const statusDiv = document.getElementById('status');
              const streamButton = document.getElementById('streamButton');
              
              this.isConnected = connected;
              
              if (connected) {
                  statusDiv.textContent = 'Status: Connected' + (message ? ' - ' + message : '');
                  statusDiv.className = 'status connected';
                  streamButton.disabled = false;
              } else {
                  statusDiv.textContent = 'Status: Disconnected' + (message ? ' - ' + message : '');
                  statusDiv.className = 'status disconnected';
                  streamButton.disabled = true;
              }
          }
      
          addMessage(sender, content, type) {
              const chatContainer = document.getElementById('chatContainer');
              const messageDiv = document.createElement('div');
              messageDiv.className = `message ${type}`;
              
              const timestamp = new Date().toLocaleTimeString();
              messageDiv.innerHTML = `
                  <div><strong>${sender}</strong> <small>${timestamp}</small></div>
                  <div>${content}</div>
              `;
      
              chatContainer.appendChild(messageDiv);
              chatContainer.scrollTop = chatContainer.scrollHeight;
          }
      
          addRealtimeMessage(content, count) {
              const chatContainer = document.getElementById('chatContainer');
              const messageDiv = document.createElement('div');
              messageDiv.className = 'message realtime';
              
              const timestamp = new Date().toLocaleTimeString();
              messageDiv.innerHTML = `
                  <div class="realtime-header">
                      <strong>Real-time data #${count}</strong> 
                      <small>${timestamp}</small>
                  </div>
                  <div class="realtime-content">${content}</div>
              `;
      
              chatContainer.appendChild(messageDiv);
              chatContainer.scrollTop = chatContainer.scrollHeight;
      
              // Keep only the last 100 messages to avoid memory problems
              const messages = chatContainer.querySelectorAll('.message');
              if (messages.length > 100) {
                  for (let i = 0; i < messages.length - 100; i++) {
                      messages[i].remove();
                  }
              }
          }
      
          getErrorMessage(error) {
              if (!error) return 'Unknown error';
              
              // Handle gRPC-Web specific errors
              if (error.code !== undefined) {
                  const grpcErrorCodes = {
                      0: 'OK',
                      1: 'CANCELLED - the operation was cancelled',
                      2: 'UNKNOWN - unknown error',
                      3: 'INVALID_ARGUMENT - invalid argument',
                      4: 'DEADLINE_EXCEEDED - the request timed out',
                      5: 'NOT_FOUND - not found',
                      6: 'ALREADY_EXISTS - already exists',
                      7: 'PERMISSION_DENIED - permission denied',
                      8: 'RESOURCE_EXHAUSTED - resource exhausted',
                      9: 'FAILED_PRECONDITION - precondition failed',
                      10: 'ABORTED - the operation was aborted',
                      11: 'OUT_OF_RANGE - out of range',
                      12: 'UNIMPLEMENTED - not implemented',
                      13: 'INTERNAL - internal error',
                      14: 'UNAVAILABLE - service unavailable',
                      15: 'DATA_LOSS - data loss',
                      16: 'UNAUTHENTICATED - not authenticated'
                  };
                  
                  const codeDescription = grpcErrorCodes[error.code] || `Unknown error code: ${error.code}`;
                  return `gRPC error: ${codeDescription}`;
              }
              
              return error.message || error.toString();
          }
      }
      
      // Initialize the real-time push client when the page loads
      document.addEventListener('DOMContentLoaded', () => {
          window.realtimePushClient = new RealtimePushClient();
      });
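The listing above calls `stopStreaming()`, `clearMessages()`, `updateStreamingUI()` and `updateStreamStats()` without showing their bodies. The following is only a sketch of what two of them might look like, written as standalone functions so they can run outside the browser. The names `computeStreamStats` and the `client` parameter are hypothetical, and the `cancel()` call assumes the stream object exposes it, as grpc-web's `ClientReadableStream` does. The two DOM-only helpers are left out.

```javascript
// Hypothetical sketches of helpers that client.js calls but does not show.

// Pure calculation a possible updateStreamStats() could display:
// message count, elapsed seconds, and messages per second.
function computeStreamStats(messageCount, startTimeMs, nowMs = Date.now()) {
  const elapsedSeconds = Math.max(0, (nowMs - startTimeMs) / 1000);
  const messagesPerSecond = elapsedSeconds > 0 ? messageCount / elapsedSeconds : 0;
  return { messageCount, elapsedSeconds, messagesPerSecond };
}

// Possible body for stopStreaming(): cancel the active stream exactly once.
// `client` is any object with a `currentStream` property, like RealtimePushClient.
function stopStreaming(client) {
  const stream = client.currentStream;
  if (!stream) {
    return false; // nothing running, e.g. a second call from the 'end' handler
  }
  client.currentStream = null; // clear first so re-entrant calls become no-ops
  if (typeof stream.cancel === 'function') {
    stream.cancel(); // assumption: the stream exposes cancel()
  }
  return true;
}
```

Clearing `currentStream` before cancelling matters because both the `error` and `end` handlers call `stopStreaming()`, so the function can be entered twice for the same stream.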

       

      Finally, create an HTML page.

      `index.html`

      <!DOCTYPE html>
      <html lang="en">
      <head>
          <meta charset="UTF-8">
          <meta name="viewport" content="width=device-width, initial-scale=1.0">
          <title>gRPC-Web Real-Time Data Push</title>
          <style>
              body {
                  font-family: Arial, sans-serif;
                  max-width: 800px;
                  margin: 0 auto;
                  padding: 20px;
                  background-color: #f5f5f5;
              }
              
              h1 {
                  color: #333;
                  text-align: center;
                  margin-bottom: 30px;
              }
              
              .chat-container {
                  border: 1px solid #ccc;
                  height: 400px;
                  overflow-y: auto;
                  padding: 10px;
                  margin-bottom: 20px;
                  background-color: #fff;
                  border-radius: 8px;
                  box-shadow: 0 2px 4px rgba(0,0,0,0.1);
              }
              
              .message {
                  margin-bottom: 10px;
                  padding: 8px;
                  border-radius: 5px;
                  border-left: 4px solid #ddd;
              }
      
              .system {
                  background-color: #fff3e0;
                  border-left-color: #ff9800;
                  text-align: center;
                  font-style: italic;
              }
              
              .error {
                  background-color: #ffebee;
                  border-left-color: #f44336;
                  color: #c62828;
                  text-align: center;
              }
              
              .realtime {
                  background-color: #e8f5e8;
                  border-left-color: #4caf50;
                  animation: fadeIn 0.3s ease-in;
              }
              
              .realtime-header {
                  font-weight: bold;
                  color: #2e7d32;
                  margin-bottom: 5px;
              }
              
              .realtime-content {
                  font-family: 'Courier New', monospace;
                  font-size: 0.9em;
                  color: #1b5e20;
              }
              
              .input-container {
                  display: flex;
                  gap: 10px;
                  margin-top: 20px;
              }
              
              button {
                  padding: 12px 24px;
                  border: none;
                  border-radius: 6px;
                  cursor: pointer;
                  font-size: 14px;
                  font-weight: bold;
                  transition: background-color 0.3s;
              }
              
              #streamButton {
                  background-color: #4caf50;
                  color: white;
              }
              
              #streamButton:hover:not(:disabled) {
                  background-color: #388e3c;
              }
              
              #streamButton:disabled {
                  background-color: #cccccc;
                  cursor: not-allowed;
                  opacity: 0.6;
              }
              
              #stopStreamButton {
                  background-color: #f44336;
                  color: white;
              }
              
              #stopStreamButton:hover {
                  background-color: #d32f2f;
              }
              
              #clearButton {
                  background-color: #757575;
                  color: white;
              }
              
              #clearButton:hover {
                  background-color: #616161;
              }
              
              .status {
                  margin-bottom: 15px;
                  padding: 10px;
                  border-radius: 6px;
                  font-weight: bold;
                  text-align: center;
              }
              
              .connected {
                  background-color: #c8e6c9;
                  color: #2e7d32;
                  border: 1px solid #4caf50;
              }
              
              .disconnected {
                  background-color: #ffcdd2;
                  color: #c62828;
                  border: 1px solid #f44336;
              }
              
              .stream-stats {
                  background-color: #f3e5f5;
                  padding: 10px;
                  margin: 10px 0;
                  border-radius: 6px;
                  font-size: 0.9em;
                  color: #4a148c;
                  border: 1px solid #9c27b0;
              }
              
              @keyframes fadeIn {
                  from { opacity: 0; transform: translateY(-10px); }
                  to { opacity: 1; transform: translateY(0); }
              }
          </style>
      </head>
      <body>
          <h1>gRPC-Web Real-Time Data Push System</h1>
          
          <div id="status" class="status disconnected">
              Status: Disconnected
          </div>
          
          <div id="chatContainer" class="chat-container">
              <div class="loading">Initializing client...</div>
          </div>
          
          <div class="input-container">
              <button id="streamButton">Start real-time push</button>
              <button id="stopStreamButton" style="display: none;">Stop push</button>
              <button id="clearButton">Clear messages</button>
          </div>
      
          <!-- Load the dependency library -->
          <script src="https://unpkg.com/google-protobuf@3.21.2/google-protobuf.js"></script>
          
          <!-- Local gRPC-Web compatibility shim -->
          <script src="./grpc-web-shim.js"></script>
          
          <!-- Browser-compatible generated gRPC-Web files -->
          <script src="./generated/chat_pb_browser.js"></script>
          <script src="./generated/chat_grpc_web_pb_browser.js"></script>
          
          <!-- Main client script -->
          <script src="./client.js"></script>
      </body>
      </html>

       

      Double-click index.html, or serve the directory with http.server, and the page starts receiving the pushed real-time data.


       

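      Compared with other push approaches, gRPC offers type safety, high performance, compressed transport and more, but front-end support is comparatively less friendly.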

       

      posted @ 2025-09-24 00:24  星仔007