aspnetcore使用websocket實時更新商品信息
先演示一下效果,再展示代碼邏輯。


中間幾次調用過程省略。。。
暫時只用到了下面四個項目

1.產品展示頁面中第一次通過接口去獲取數據庫的列表數據
/// <summary>
/// Returns catalog items: either the items named by <paramref name="ids"/>, or one page of the
/// whole catalog ordered by name.
/// </summary>
/// <param name="pageSize">Number of items per page; must not be negative.</param>
/// <param name="pageIndex">Zero-based page index; must not be negative.</param>
/// <param name="ids">
/// Optional id list. When non-empty, paging is ignored and the lookup is delegated to
/// <c>GetItemByIds</c>.
/// </param>
/// <returns>
/// 200 with the matching items (ids lookup) or with a <c>PaginatedViewModel</c> of
/// <c>ProductDto</c> (paged listing); 400 when the ids lookup yields nothing or a paging
/// argument is negative.
/// </returns>
[HttpGet]
[Route("items")]
[ProducesResponseType(typeof(PaginatedViewModel<ProductDto>), StatusCodes.Status200OK)]
[ProducesResponseType(typeof(IEnumerable<ProductDto>), StatusCodes.Status200OK)]
[ProducesResponseType(StatusCodes.Status400BadRequest)]
public async Task<IActionResult> Catalogs([FromQuery] int pageSize = 10, [FromQuery] int pageIndex = 0, string ids = null)
{
    if (!string.IsNullOrEmpty(ids))
    {
        var items = await GetItemByIds(ids);
        if (!items.Any())
        {
            return BadRequest("ids value invalid. Must be comma-separated list of numbers");
        }

        return Ok(items);
    }

    // Negative values would otherwise reach Skip/Take and fail inside the query,
    // surfacing as a server error instead of the documented 400.
    if (pageSize < 0 || pageIndex < 0)
    {
        return BadRequest("pageSize and pageIndex must not be negative");
    }

    var totalItems = await _catalogContext.Catalogs
        .LongCountAsync();

    var itemsOnPage = await _catalogContext.Catalogs
        .OrderBy(c => c.Name)
        .Skip(pageSize * pageIndex)
        .Take(pageSize)
        .ToListAsync();

    // Map entities to the string-based DTO that the product page consumes.
    var result = itemsOnPage.Select(x => new ProductDto(x.Id.ToString(), x.Name, x.Price.ToString(), x.Stock.ToString(), x.ImgPath));
    var model = new PaginatedViewModel<ProductDto>(pageIndex, pageSize, totalItems, result);
    return Ok(model);
}
2.在前端頁面會把當前頁面的產品列表id都發送到websocket中去
/**
 * Remembers the ids of the products currently shown and, when the WebSocket
 * connection is open, sends them to the server as a JSON array.
 *
 * @param {Array} ids - ids of the products on the current page.
 */
function updateAndSendProductIds(ids) {
    productIds = ids;

    // Nothing is sent unless the connection is currently open.
    const socketIsOpen = socket.readyState === WebSocket.OPEN;
    if (!socketIsOpen) {
        return;
    }

    const payload = JSON.stringify(productIds);
    socket.send(payload);
}
/**
 * Loads the current page of products from the catalog API, renders it,
 * reports the displayed ids over the WebSocket and refreshes the pager.
 * Failures are logged to the console.
 */
function fetchData() {
    const apiUrl = baseUrl + `/Catalog/items?pageSize=${pageSize}&pageIndex=${currentPage}`;

    // Renders one successful API response.
    const renderPage = (response) => {
        const items = response.data.data;
        displayProducts(baseUrl, items);

        // Tell the server which products are now on screen.
        const shownIds = items.map((item) => item.Id);
        updateAndSendProductIds(shownIds);

        // The total page count is derived from the item count in the response.
        const totalPages = Math.ceil(response.data.count / pageSize);
        displayPagination(totalPages);

        // Refresh the "current page / total pages" label.
        const pageLabel = document.getElementById('currentPage');
        pageLabel.textContent = `當前頁數: ${currentPage + 1} / 總頁數: ${totalPages}`;
    };

    // Logs any request or rendering failure.
    const reportFailure = (error) => {
        console.error('獲取數據失敗:', error);
    };

    axios.get(apiUrl)
        .then(renderPage)
        .catch(reportFailure);
}
3.websocket拿到id數據后,就可以精確地把當前頁面的產品查出來,再推送給product.html頁面:通過下面的ReceiveAsync方法接收html頁面發送的數據,再通過timer定時器每秒鐘調用一次Send方法,把查到的數據實時發送給頁面。當然,這些數據是不斷地從redis中查出來的。
using System.Net.WebSockets;
using System.Threading.Tasks;
using System;
using WsServer.Handler;
using WsServer.Manager;
using StackExchange.Redis;
using Microsoft.Extensions.Configuration;
using System.Collections.Generic;
using Catalogs.Domain.Catalogs;
using Catalogs.Domain.Dtos;
using System.Net.Sockets;
namespace WebScoket.Server.Services
{
/// <summary>
/// Pushes the latest product data (primarily stock, but the other fields are refreshed too)
/// to every connected WebSocket client once per second, reading the data from the Redis
/// hash "products".
/// </summary>
/// <remarks>
/// NOTE(review): the requested id list is one field shared by all connections, so the most recent
/// message from any client decides what every client receives. Tracking ids per connection would
/// need a per-socket send API, which is not visible from this class.
/// </remarks>
public class ProductListHandler : WebSocketHandler
{
    private const string ProductsHashKey = "products";
    private const string NoProductMessage = "NoProduct";

    private readonly System.Threading.Timer _timer;
    private readonly IDatabase _redisDb;

    // Ids of the products currently displayed, parsed from the latest client message.
    // Written by ReceiveAsync and read by the timer callback, hence volatile.
    private volatile string[] _productIds;

    // 1 while a push is running, so a timer tick skips itself instead of overlapping a slow push.
    private int _pushInProgress;

    /// <summary>
    /// Connects to Redis and starts the one-second push timer.
    /// </summary>
    /// <param name="webSocketConnectionManager">Connection manager forwarded to the base handler.</param>
    /// <param name="configuration">Configuration providing "DistributedRedis:ConnectionString".</param>
    /// <exception cref="InvalidOperationException">The Redis connection string is not configured.</exception>
    public ProductListHandler(WebSocketConnectionManager webSocketConnectionManager, IConfiguration configuration) : base(webSocketConnectionManager)
    {
        string connectionString = configuration["DistributedRedis:ConnectionString"]
            ?? throw new InvalidOperationException("未能獲取distributedredis連接字符串");
        ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(connectionString);
        _redisDb = redis.GetDatabase();
        _timer = new System.Threading.Timer(Send, null, TimeSpan.Zero, TimeSpan.FromSeconds(1));
    }

    /// <summary>
    /// Timer callback: starts a push unless the previous one is still running.
    /// </summary>
    private void Send(object state)
    {
        if (System.Threading.Interlocked.CompareExchange(ref _pushInProgress, 1, 0) != 0)
        {
            return;
        }

        // Fire-and-forget is safe here: PushProductsAsync handles its own exceptions.
        _ = PushProductsAsync();
    }

    /// <summary>
    /// Loads the requested products from Redis and broadcasts them as JSON, or broadcasts
    /// "NoProduct" when none of the requested ids is found.
    /// </summary>
    private async Task PushProductsAsync()
    {
        try
        {
            string[] productIds = _productIds;
            if (productIds == null)
            {
                // No client has reported a product list yet.
                return;
            }

            List<RedisValue> fields = new List<RedisValue>(productIds.Length);
            foreach (string productId in productIds)
            {
                // Skip JSON nulls as well as the literal text "null".
                if (string.IsNullOrEmpty(productId) || productId == "null")
                {
                    continue;
                }

                fields.Add(productId);
            }

            List<ProductDto> products = new List<ProductDto>(fields.Count);
            if (fields.Count > 0)
            {
                // One batched read instead of one round trip per id.
                RedisValue[] values = await _redisDb.HashGetAsync(ProductsHashKey, fields.ToArray());
                foreach (RedisValue value in values)
                {
                    string json = value;
                    if (string.IsNullOrEmpty(json))
                    {
                        continue;
                    }

                    // Catalog was adjusted because deserialization conflicted with its constructor.
                    Catalog catalog = System.Text.Json.JsonSerializer.Deserialize<Catalog>(json);
                    if (catalog == null)
                    {
                        continue;
                    }

                    products.Add(new ProductDto(catalog.Id.ToString(), catalog.Name, catalog.Price.ToString(), catalog.Stock.ToString(), catalog.ImgPath));
                }
            }

            string message = products.Count > 0
                ? System.Text.Json.JsonSerializer.Serialize(products)
                : NoProductMessage;
            await SendMessageToAllAsync(message);
        }
        catch (Exception ex)
        {
            // This runs on a thread-pool thread; an escaping exception would be lost or fatal.
            // No logger is available in this class, so report on stderr and retry next tick.
            Console.Error.WriteLine($"ProductListHandler push failed: {ex}");
        }
        finally
        {
            System.Threading.Volatile.Write(ref _pushInProgress, 0);
        }
    }

    /// <summary>
    /// Receives the JSON array of product ids that the page currently displays.
    /// A payload that is not a JSON string array is ignored and the previous list is kept.
    /// </summary>
    /// <param name="socket">The socket the message arrived on.</param>
    /// <param name="result">Receive result; its <c>Count</c> gives the payload length in bytes.</param>
    /// <param name="buffer">Buffer holding the UTF-8 payload.</param>
    public override Task ReceiveAsync(WebSocket socket, WebSocketReceiveResult result, byte[] buffer)
    {
        string payload = System.Text.Encoding.UTF8.GetString(buffer, 0, result.Count);
        try
        {
            // Parse once on receipt rather than on every timer tick; JSON null means "no ids".
            _productIds = System.Text.Json.JsonSerializer.Deserialize<string[]>(payload) ?? Array.Empty<string>();
        }
        catch (System.Text.Json.JsonException ex)
        {
            Console.Error.WriteLine($"ProductListHandler ignored a malformed id list: {ex.Message}");
        }

        return Task.CompletedTask;
    }
}
}
4.html頁面就可以拿到最新數據再去綁定到頁面
// Applies each server push to the product list.
socket.addEventListener('message', (event) => {
    // "NoProduct" is a plain-text sentinel, not JSON: clear the list and stop here,
    // otherwise JSON.parse below would throw on it.
    if (event.data === "NoProduct") {
        clearProductList();
        return;
    }

    // Parse the received product data; a malformed message is logged and skipped
    // so that one bad push does not break the listener.
    let productData;
    try {
        productData = JSON.parse(event.data);
    } catch (error) {
        console.error('Ignoring malformed product message:', error);
        return;
    }

    // Re-render the product list with the received data.
    displayProducts(baseUrl, productData);
});
整個流程就這么簡單,但是這里需要保持數據庫和redis的數據實時同步,否則頁面展示的就不是最新的數據,也就沒有意義了。
再回到Catalog.Service服務中。
/// <summary>
/// Requests a refresh of the Redis product cache by writing the
/// "delete_catalog_fromredis" message to the channel.
/// </summary>
private async Task DeleteCache()
{
    // Deleting the single hash field for one id directly is no longer necessary;
    // the channel message is sent instead.
    const string refreshSignal = "delete_catalog_fromredis";
    var writer = _channel.Writer;
    await writer.WriteAsync(refreshSignal);
}
在做更新、新增、刪除等動作的時候,就調用一下DeleteCache方法,通過channel往后臺服務發送一條消息;后臺服務收到后,會先刪除redis中的產品列表,再重新把sqlserver中的數據初始化同步到redis。
using System.Reflection;
using System.Threading.Channels;
using Catalogs.Infrastructure.Database;
using Microsoft.EntityFrameworkCore;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;

namespace Catalogs.WebApi.BackgroudServices
{
    /// <summary>
    /// Background service that mirrors the catalog table into the Redis hash "products":
    /// once at startup, and again each time "delete_catalog_fromredis" arrives on the channel.
    /// Remember: whenever a product is deleted or purchased, its cached entry must be refreshed.
    /// </summary>
    public class InitProductListToRedisService : BackgroundService
    {
        private const string ProductsHashKey = "products";
        private const string RefreshMessage = "delete_catalog_fromredis";

        private readonly IServiceScopeFactory _serviceScopeFactory;
        private readonly IDatabase _redisDb;
        private readonly Channel<string> _channel;
        private readonly ILogger _logger;

        /// <summary>
        /// Connects to Redis and stores the collaborators used for each refresh.
        /// </summary>
        /// <param name="serviceScopeFactory">Creates the scope from which the scoped <c>CatalogContext</c> is resolved.</param>
        /// <param name="configuration">Configuration providing "DistributedRedis:ConnectionString".</param>
        /// <param name="channel">Channel on which refresh requests arrive.</param>
        /// <param name="logger">Logger for refresh results.</param>
        /// <exception cref="InvalidOperationException">The Redis connection string is not configured.</exception>
        public InitProductListToRedisService(IServiceScopeFactory serviceScopeFactory, IConfiguration configuration, Channel<string> channel, ILogger<InitProductListToRedisService> logger)
        {
            _serviceScopeFactory = serviceScopeFactory;
            string connectionString = configuration["DistributedRedis:ConnectionString"]
                ?? throw new InvalidOperationException("未能獲取distributedredis連接字符串");
            ConnectionMultiplexer redis = ConnectionMultiplexer.Connect(connectionString);
            _redisDb = redis.GetDatabase();
            _channel = channel;
            _logger = logger;
        }

        /// <summary>
        /// Performs the initial load, then reloads on every refresh message until the host
        /// stops or the channel is completed.
        /// </summary>
        /// <param name="stoppingToken">Signalled when the host is shutting down.</param>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            try
            {
                await Init(stoppingToken);

                // ReadAllAsync ends cleanly when the channel completes and observes the
                // stopping token, so shutdown is not blocked on a pending read.
                await foreach (string msg in _channel.Reader.ReadAllAsync(stoppingToken))
                {
                    if (msg == RefreshMessage)
                    {
                        await Init(stoppingToken);
                    }
                }
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Normal shutdown.
            }
        }

        /// <summary>
        /// Replaces the whole Redis hash with the current contents of the catalog table.
        /// Failures are logged and swallowed so that the service keeps listening.
        /// </summary>
        /// <param name="cancellationToken">Cancels the database read.</param>
        private async Task Init(CancellationToken cancellationToken = default)
        {
            using var scope = _serviceScopeFactory.CreateScope();
            try
            {
                CatalogContext context = scope.ServiceProvider.GetRequiredService<CatalogContext>();
                var products = await context.Catalogs.ToListAsync(cancellationToken);

                HashEntry[] entries = new HashEntry[products.Count];
                for (int i = 0; i < entries.Length; i++)
                {
                    var product = products[i];
                    entries[i] = new HashEntry(product.Id.ToString(), System.Text.Json.JsonSerializer.Serialize(product));
                }

                // Delete and repopulate inside one MULTI/EXEC so readers never observe an
                // empty or half-filled hash. Queued tasks complete only after ExecuteAsync.
                ITransaction transaction = _redisDb.CreateTransaction();
                Task deleteTask = transaction.KeyDeleteAsync(ProductsHashKey);
                Task setTask = entries.Length > 0
                    ? transaction.HashSetAsync(ProductsHashKey, entries)
                    : Task.CompletedTask;

                bool committed = await transaction.ExecuteAsync();
                if (!committed)
                {
                    _logger.LogError("ProductLis stored in Redis Hash error.");
                    return;
                }

                await Task.WhenAll(deleteTask, setTask);
                _logger.LogInformation("ProductList is over stored in Redis Hash.");
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutdown is not a refresh failure; let ExecuteAsync end the loop.
                throw;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "ProductLis stored in Redis Hash error.");
            }
        }
    }
}
這里還有優化的空間:可以只針對products這個hash中的某個id去更新、刪除或新增一條數據。
示例代碼:
liuzhixin405/efcore-template (github.com)


浙公網安備 33010602011771號