1 引言
- 什么是MQ
消息總線(Message Queue),是一種跨進(jìn)程、異步的通信機(jī)制,用于上下游傳遞消息。由消息系統(tǒng)來(lái)確保消息的可靠傳遞。
- 常用的MQ中間件有哪些
RabbitMQ:實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開(kāi)源消息代理軟件(亦稱面向消息的中間件)。RabbitMQ:服務(wù)器是用Erlang語(yǔ)言編寫(xiě)的,而群集和故障轉(zhuǎn)移是構(gòu)建在開(kāi)放電信平臺(tái)框架上的。所有主要的編程語(yǔ)言均有與代理接口通訊的客戶端庫(kù)。
Kafka:一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)。
ActiveMQ、RocketMQ等
- 安裝
安裝地址:https://www.rabbitmq.com/install-windows.html
先下載Erlang語(yǔ)言的安裝環(huán)境,再下Rabbit的服務(wù)版本
2 RabbitMQ基礎(chǔ)部分
- 基本概念
Connect:是RabbitMQ的socket鏈接,它封裝了socket協(xié)議相關(guān)部分邏輯。
Channel:是我們與RabbitMQ打交道的最重要的一個(gè)接口。如果每一次訪問(wèn)RabbitMQ都建立一個(gè)Connection,在消息量大的時(shí)候建立TCP Connection的開(kāi)銷將是巨大的,大減少了操作系統(tǒng)建立TCP connection的開(kāi)銷。
VirtualHost:管理各自的exchange,和bindings
Exchange:類似于數(shù)據(jù)通信網(wǎng)絡(luò)中的交換機(jī),提供消息路由策略。rabbitmq中,producer不是通過(guò)信道直接將消息發(fā)送給queue,而是先發(fā)送給Exchange。一個(gè)Exchange可以和多個(gè)Queue進(jìn)行綁定,producer在傳遞消息的時(shí)候,會(huì)傳遞一個(gè)ROUTING_KEY,Exchange會(huì)根據(jù)這個(gè)ROUTING_KEY按照特定的路由算法,將消息路由給指定的queue。
Queue:用于存儲(chǔ)消息隊(duì)列,并將它們轉(zhuǎn)發(fā)給消費(fèi)者
Producer:生產(chǎn)者
consumer:消費(fèi)者
- 工作模型

- 使用場(chǎng)景
應(yīng)用解耦、異步、流量削鋒、日志收集等等...

3 RabbitMQ核心部分
- 五大類型
helloWorld、work queues、publish/subscirbe(fanout)、routing(direct)、topic
- 代碼示例
/**********準(zhǔn)備工作:安裝包RabbitMQ.Client************/
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
//創(chuàng)建連接對(duì)象
var factory = new ConnectionFactory()
{
UserName = "kkbm",
Password = "kkbm1234",
HostName = "172.18.10.189",
Port = 5672, //RabbitMQ默認(rèn)的端口
};
using var conn = factory.CreateConnection();
var channel = conn.CreateModel();
//定義隊(duì)列
channel.QueueDeclare("workqueue", true, false, false, null);
//發(fā)送消息
for (int i = 0; i < 10; i++)
{
var message = "發(fā)送消息:" + i;
var body = Encoding.UTF8.GetBytes(message);
var properties = channel.CreateBasicProperties();
properties.Persistent = true;
channel.BasicPublish(exchange: "",
routingKey: "workqueue",
basicProperties: properties,
body: body);
Thread.Sleep(i * 100);
}
/**********準(zhǔn)備工作:安裝包RabbitMQ.Client************/
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using RabbitMQ.Client.Exceptions;
//創(chuàng)建連接對(duì)象工廠
var factory = new ConnectionFactory()
{
UserName = "kkbm",
Password = "kkbm1234",
HostName = "172.18.10.189",
Port = 5672,
};
var conn = factory.CreateConnection();
var channel = conn.CreateModel();
//綁定隊(duì)列
channel.QueueDeclare(queue: "workqueue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
var consumer = new EventingBasicConsumer(channel);
//接受消息
consumer.Received += (model, ea) =>
{
var body = ea.Body;
var message = Encoding.UTF8.GetString(body.ToArray());
WriteColorLine($"{DateTime.Now.ToString()}工作隊(duì)列接收到的消息:【{message}】",ConsoleColor.Blue);
//消息確認(rèn)
channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};
channel.BasicConsume(queue: "workqueue",
autoAck: false,
consumer: consumer);
4 RabbitMQ進(jìn)階
- 消息丟失
1)、消息發(fā)出后,中途網(wǎng)絡(luò)故障,服務(wù)器沒(méi)收到;
2)、消息發(fā)出后,服務(wù)器收到了,還沒(méi)持久化,服務(wù)器宕機(jī);
解決方案:確認(rèn)消息是否發(fā)到RabbitMQ Server,如果生產(chǎn)者收到ACK不需要處理,收到Nack需要重新發(fā)送消息。
- 消息重復(fù)
1)、消息消費(fèi)成功,事務(wù)已提交,簽收時(shí)結(jié)果服務(wù)器宕機(jī)或網(wǎng)絡(luò)原因?qū)е潞炇帐。顟B(tài)會(huì)由unack轉(zhuǎn)變?yōu)閞eady,重新發(fā)送給其他消費(fèi)方;
2)、消息消費(fèi)失敗,由于retry重試機(jī)制,重新入隊(duì)又將消息發(fā)送出去。
解決方案:消費(fèi)者做好冪等性處理
- 消息積壓
1)、消費(fèi)方的服務(wù)掛掉,導(dǎo)致一直無(wú)法消費(fèi)消息;
2)、消費(fèi)方的服務(wù)節(jié)點(diǎn)太少,導(dǎo)致消費(fèi)能力不足,從而出現(xiàn)積壓,這種情況極可能就是生產(chǎn)方的流量過(guò)大導(dǎo)致。
解決方案:增加消費(fèi)者節(jié)點(diǎn);持久化到數(shù)據(jù)庫(kù)后處理
浙公網(wǎng)安備 33010602011771號(hào)