SpringBoot_ActiveMQ
一、ActiveMQ介紹【消息隊列中間件】
1、但凡耗時長的功能都可以通過消息隊列異步交給其他服務完成
寫入ActiveMQ或讀取ActiveMQ
ActiveMQ就是一個容器
常用的RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等
2、安裝ActiveMQ
下載
新版:http://activemq.apache.org/components/artemis/download/
老版:http://activemq.apache.org/components/classic/download/
運行:activemq.bat
運行后訪問:http://localhost:8161/
用戶名和密碼都是:admin
3、queue與topic
a、點對點:Queue,不可重復消費
消息生產者生產消息發送到queue中,然后消息消費者從queue中取出并且消費消息。
消息被消費以后,queue中不再有存儲,所以消息消費者不可能消費到已經被消費的消息。
Queue支持存在多個消費者,但是對一個消息而言,只會有一個消費者可以消費、其它的則不能消費此消息了。
當消費者不存在時,消息會一直保存,直到有消費消費
b、發布/訂閱:Topic,可以重復消費
消息生產者(發布)將消息發布到topic中,同時有多個消息消費者(訂閱)消費該消息。
和點對點方式不同,發布到topic的消息會被所有訂閱者消費。
當生產者發布消息,不管是否有消費者。都不會保存消息
4、引用依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<!--消息隊列連接池-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.0</version>
</dependency>
二、queue 兩個微服務點對點通信
1、接收者和發送者雙方微服務添加以下配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#默認queue、true群發、false不群發、此配置只需要對接收者配置,發送者不需要下面的配置
#spring.jms.pub-sub-domain=true
2、接收者微服務、創建一個項目、編寫一個GetMsg接收消息的類、然后啟動、等待接受消息
@Component
public class GetMsg {
/**
* 監聽和讀取active.queue消息、destination和名必須和發送者名稱一致
*/
@JmsListener(destination = "active.queue")
public void readActiveTopic2(String message) {
System.out.println("1接受到:" + message);
}
}
3、發送者微服務、創建一個項目、編寫一個發送者的方法、測試發送消息
如果有多個接收者、只能被其中一個獲取、如果沒有接收者、會被存儲起來、等待接收者
@RestController
public class SendMsgController {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("/send")
public Object send(String msg) {
//把消息發送到點對點的active.queue中
jmsMessagingTemplate.convertAndSend(new ActiveMQQueue("active.queue"), msg);
return "ok";
}
}
三、topic 群發
1、接收者和發送者雙方微服務都需要修改配置
spring.activemq.broker-url=tcp://localhost:61616
spring.activemq.user=admin
spring.activemq.password=admin
#默認queue、true群發、false不群發、此配置只需要對接收者配置,發送者不需要下面的配置
spring.jms.pub-sub-domain=true
2、接收者微服務、創建一個項目、編寫一個GetMsg接收消息的類、然后啟動、等待接受消息【可以多個接受者微服務】
@Component
public class GetMsg {
/**
* 監聽和讀取topic消息、destination和名必須和發送者名稱一致
*/
@JmsListener(destination = "active.topic")
public void readActiveTopic2(String message) {
System.out.println("1接受到:" + message);
}
}
3、發送者微服務、創建一個項目、編寫一個發送者的方法、測試發送消息
如果當時有多個接收者、大家都可以接受到、如果沒有接收者、不會存儲起來、消息直接失效
@RestController
public class SendMsgController {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@RequestMapping("/send")
public Object send(String msg) {
//發送點對點消息消息
//jmsMessagingTemplate.convertAndSend(new ActiveMQQueue("active.queue"), msg);
//群發消息
jmsMessagingTemplate.convertAndSend(new ActiveMQTopic("active.topic"), msg);
return "ok";
}
}
四、讓接收者能既能接受點對點,亦能接受群發
1、在接收者微服務添加以下配置
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
@Configuration
@EnableJms
public class JmsConfig {
@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
factory.setConnectionFactory(connectionFactory);
return factory;
}
@Bean
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(false);
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
2、接受類的方法修改成以下方式
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
@Component
public class GetMsg {
/**
* 監聽和讀取active.queue消息
*/
@JmsListener(destination = "active.queue",containerFactory="queueListenerFactory")
public void readActiveTopic1(String message) {
System.out.println("點對點的----avtivemq001接受到:" + message);
}
/**
* 監聽和讀取active.queue消息
*/
@JmsListener(destination = "active.queue",containerFactory="topicListenerFactory")
public void readActiveTopic2(String message) {
System.out.println("群發的----avtivemq001接受到:" + message);
}
}
浙公網安備 33010602011771號