RabbitMQ 消費者
原文連接:http://www.rzrgm.cn/ysmc/p/16225142.html
項目需要引用 RabbitMQ.Client Nuget包
創建異步工廠
IAsyncConnectionFactory connectionFactory = new ConnectionFactory { HostName = _rabbitMqOptions.HostName, Port = _rabbitMqOptions.Port, UserName = _rabbitMqOptions.UserName, Password = _rabbitMqOptions.Password, VirtualHost = _rabbitMqOptions.VirtualHost, DispatchConsumersAsync = true };
訂閱
//創建連接 var connection = connectionFactory.CreateConnection(); //創建通道 var channel = connection.CreateModel(); channel.BasicQos(prefetchSize, prefetchCount, false); //事件基本消費者 var consumer = new AsyncEventingBasicConsumer(channel); //接收到消息事件 consumer.Received += async (ch, ea) => { try { var message = Encoding.UTF8.GetString(ea.Body.ToArray()); var requeue = await executeAsync(message); if (requeue) { //確認該消息已被消費 channel.BasicAck(ea.DeliveryTag, false); } else { channel.BasicNack(ea.DeliveryTag, false, true); } } catch (Exception) { channel.BasicAck(ea.DeliveryTag, false); } }; //啟動消費者 設置為手動應答消息 channel.BasicConsume(queueName, false, consumer);
單次或輪詢
private async Task SingleOrPolling(Func<string, Task<bool>> executeAsync, bool isSingle, string? queueName = null) { var connectionFactory = _rabbitMQBaseService.GetAsyncConnectionFactory(); using var connection = connectionFactory.CreateConnection(); using var channel = connection.CreateModel(); queueName = GetQueueName(queueName); while (true) { var response = channel.BasicGet(queueName, false); try { if (null != response) { var message = Encoding.UTF8.GetString(response.Body.ToArray()); var requeue = await executeAsync(message); if (requeue) { //確認該消息已被消費 channel.BasicAck(response.DeliveryTag, false); } else { channel.BasicNack(response.DeliveryTag, false, true); } } } catch (Exception) { channel.BasicAck(response.DeliveryTag, false); } if (isSingle) { break; } } }
本文來自博客園,作者:一事冇誠,轉載請注明原文鏈接:http://www.rzrgm.cn/ysmc/p/16225142.html

浙公網安備 33010602011771號