關于MQ
一、為什么要用MQ?
比如存在訂單系統,用戶進行下單,下單完成之后調用物流系統、調用通知系統。

這樣有沒有什么問題?
1.比如物流系統是第三方的,可能響應慢,或者宕機,這時當你采用rpc接口調用物流系統時會失敗,而導致用戶下單也失敗了。也就是耦合的,非高可用的。
所謂的MQ就是接收生產者的消息,進行存儲,這時如果物流系統是消費者,會不斷的拿消息進行消費。在分布式系統中,MQ完成消息的流轉,MQ的特點是穩定、高效、存儲機制,確保整個系統的異步解耦,削峰平谷。
采用消息中間件進行重新架構:

這樣做的好處就是,當用戶下單完畢之后,訂單系統會往MQ中發消息,整個用戶下單的流程就結束了。然后物流系統、通知系統會從MQ中拿消息進行消費。
生活中常見的例子是充話費,很多年前在微信上沖玩花費馬上就到賬了,那時候可能微信還是通過調電信、或者移動接口的形式進行充話費。這種方式,如果電信的接口有問題,或者網絡有問題,等等,這時候話費就充值失敗了。體驗非常不好。現在都是不立馬返回充話費的結果,改成了異步機制,沖完話費發消息到MQ中,結束。電信來消費這個消息,如果電信宕機了,沒有關系。等電信恢復了,再次消費成功。

2.MQ的第二個作用,流量削峰。
當遇到秒殺、大促活動時,用戶進行下單,比如tps達到1w/s,按照原來的方式,調用RPC接口的形式,接口最大并發量為1000tps,電信和移動的充話費接口最大tps只能為1000。如果不采用MQ,很多請求就會失敗了,或者超時了。假如超時時間為30s,1w的充話費請求并發量過來,而電信接口只能承載1000tps,勢必很多請求就會超時,用戶充值請求30s后收不到響應,就會充值失敗了。
但是中間加入MQ作為消息中間件的話,可以接收這1w個請求,MQ可以存儲消息,再慢慢進行消費。達到流量削峰的目的。
二、RabbitMQ整體及路由機制深度剖析
老的:IBM --MQ (收費) -> ActiveMQ Java開發,沒落了
主流:RabbitMQ、Kafka、RocketMQ
新的:Pulsar

RabbitMQ后端控制端:

Exchanges:

DirectExchange:

convertAndSend方法是void類型的,如果調用這個方法,執行到一半失敗了,網絡斷了怎么辦?怎么才能知道我有沒有發成功呢?
RabbitMQ:AMQP的最經典的實現。
RabbitMQ如何解決消息的丟失:
1、生產者發送到Exchange丟失 --> 引入發送者確認機制 connectionFacory.setPublisherConfirms(true)
2、消息正確的路由 --> 消息沒有正確的路由導致消息丟失 --> 加入失敗者通知 template.setMandatory(true)
3、在Queue中丟失了 -->mq宕機、斷電 --> 持久化
4、消費者也有可能丟失 -->盡量使用手動確認機制,來確保在消費端不丟失

消息發送成功,消費者消費成功,console打印:

路由失敗console打印:

代碼實現:
RabbitMQ的配置類:
package com.cy.mq.config;
import com.cy.mq.service.rabbit.Receiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* RabbitMQ的配置類
*/
@Configuration
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String addresses;
@Value("5672")
private String port;
@Value("guest")
private String username;
@Value("guest")
private String password;
@Value("/")
private String virtualHost;
@Autowired
private Receiver receiver;
/**
* 連接工廠
* @return
*/
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses + ":" + port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
//如果要進行消息的回調,這里必須要設置為true
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
/**
* rabbitAdmin封裝對RabbitMQ的管理操作
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
return new RabbitAdmin(connectionFactory);
}
/**
* 使用template給生產者,消費者,方便發消息
*/
@Bean
public RabbitTemplate newRabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
//發送確認的回調方法的設置
template.setConfirmCallback(confirmCallback());
//開啟路由失敗通知
template.setMandatory(true);
//路由失敗的回調--這里只關注路由失敗的
template.setReturnCallback(returnCallback());
return template;
}
@Bean
public DirectExchange DirectExchange() {
return new DirectExchange("DirectExchange");
}
@Bean
public FanoutExchange FanoutExchange() {
return new FanoutExchange("FanoutExchange");
}
/**
* 聲明死信交換器(Fanout交換器)
*/
@Bean
public FanoutExchange DlxExchange() {
return new FanoutExchange("exchange-dlx");
}
@Bean
public Queue queue1() {
return new Queue("queue1");
}
/**
* 聲明消息過期隊列 --隊列ttl
*/
@Bean
public Queue queueTTL() {
Map<String, Object> arguments = new HashMap<>();
arguments.put("x-message-ttl", 30*1000);
arguments.put("x-dead-letter-exchange", "exchange-dlx");
arguments.put("x-dead-letter-routing-key", "*");
return new Queue("queue_ttl", true, false, false, arguments);
}
/**
* 聲明專門存放死信消息的隊列
*/
@Bean
public Queue queueDLX() {
return new Queue("queue_dlx");
}
/**
* 綁定關系 綁定直連direct交換器
*/
@Bean
public Binding bindingDirectExchange() {
return BindingBuilder.bind(queue1())
.to(DirectExchange())
.with("lijin.mq");
}
/**
* 生產者發送確認
*/
@Bean
public RabbitTemplate.ConfirmCallback confirmCallback() {
return new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack) {
System.out.println("發送者確認發送給mq-(Exchange)成功");
} else {
System.out.println("發送者發送給mq-(Exchange)失敗,考慮重發:" + cause);
}
}
};
}
/**
* 失敗通知
*/
@Bean
public RabbitTemplate.ReturnCallback returnCallback() {
return new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
System.out.println("無法路由的消息,需要考慮另外處理");
System.out.println("Returned replyText:" + replyText);
System.out.println("Returned exchange:" + exchange);
System.out.println("Returned routingKey:" + routingKey);
String msgJson = new String(message.getBody());
System.out.println("Returned message:" + msgJson);
}
};
}
/**
* 手動消費者確認
*/
@Bean
public SimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
//綁定了這個隊列
container.setQueues(queue1());
//手動提交
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
container.setMessageListener(receiver);
return container;
}
}
發送MQ消息:
package com.cy.mq.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @description com.cy.mq.controller
* @author: chengyu
* @date: 2025-05-11 16:10
*/
@RestController
@RequestMapping("/rabbit")
public class RabbitProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 普通直接交換器的測試
* DirectExchange: 交換器的名字
* lijin:路由鍵
* sendMsg:消息內容,可以是字符串,可以是json串等。
*/
@GetMapping("/direct")
public String direct() {
String sendMsg = "direct msg:" + System.currentTimeMillis();
this.rabbitTemplate.convertAndSend("DirectExchange", "lijin.mq", sendMsg);
return "發送direct消息成功!";
}
}
RabbitMQ監聽類:package com.cy.mq.service.rabbit;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* RabbitMQ監聽類
*/
@Component
@RabbitListener(queues = "queue1")
public class Consumer1 {
@RabbitHandler
public void process(String msg) {
int i = 0;
System.out.println("Consumer1-Receiver: " + msg);
//業務代碼,異常 怎么辦? 調用其他接口 返回值0代表處理失敗
}
}
Receiver:
package com.cy.mq.service.rabbit;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* 消費Queue
*/
@Component
public class Receiver implements ChannelAwareMessageListener {
/**
*
* @param message
* @param channel
* @throws Exception
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
String msg = new String(message.getBody());
try {
System.out.println("Receiver>>>>消息已消費");
//參數1:消息的唯一性 參數2:是否批量處理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
System.out.println(e.getMessage());
//參數3:消息是否重新發送
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
System.out.println("Receiver>>>>拒絕消息,要求MQ重新派發");
throw e;
}
}
}
application.yml:
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
virtual-host: /
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>mq-springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>mq-springboot</name>
<description>mq project for Spring Boot</description>
<properties>
<java.version>1.8</java.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<spring-boot.version>2.6.13</spring-boot.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- springboot引入rabbitMq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<configuration>
<mainClass>com.MqSpringbootApplication</mainClass>
<skip>true</skip>
</configuration>
<executions>
<execution>
<id>repackage</id>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
--
浙公網安備 33010602011771號