Rabbit MQ的幾種模式
RabbitMQ是實(shí)現(xiàn)了高級(jí)消息隊(duì)列協(xié)議(AMQP)的開(kāi)源消息代理軟件(亦稱面向消息的中間件)。
官網(wǎng)文檔:https://www.rabbitmq.com/getstarted.html
Rabbit MQ有幾種工作方式:
簡(jiǎn)單模式:一個(gè)生產(chǎn)者,一個(gè)消費(fèi)者
work模式:一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者,每個(gè)消費(fèi)者獲取到的消息唯一,平均消費(fèi)。
訂閱模式:一個(gè)生產(chǎn)者發(fā)送的消息會(huì)被多個(gè)消費(fèi)者獲取。
路由模式:發(fā)送消息到交換機(jī)并且要指定路由key ,消費(fèi)者將隊(duì)列綁定到交換機(jī)時(shí)需要指定路由key
topic模式:將路由鍵和某模式進(jìn)行匹配,此時(shí)隊(duì)列需要綁定在一個(gè)模式上,“#”匹配一個(gè)詞或多個(gè)詞,“*”只匹配一個(gè)詞。
rpc模式:客戶端向一個(gè)隊(duì)列中發(fā)送消息,并注冊(cè)一個(gè)回調(diào)的隊(duì)列用于接收服務(wù)端返回的消息,該消息需要聲明一個(gè)叫做correaltionId的屬性,
該屬性將是該次請(qǐng)求的唯一標(biāo)識(shí)。服務(wù)端在接受到消息(在需要時(shí)可以驗(yàn)證correaltionId)后,處理消息,并將消息發(fā)送到客戶端注冊(cè)的回調(diào)隊(duì)列中。
1、簡(jiǎn)單模式
配置: public final static String SIMPLE_QUEUE = "simpleQueue"; @Bean public Queue simpleQueue() { return new Queue(SIMPLE_QUEUE, true); } 生產(chǎn)者: rabbitTemplate.convertAndSend(RabbitConfig.SIMPLE_QUEUE, msg); 消費(fèi)者:
@RabbitListener(queues = RabbitConfig.SIMPLE_QUEUE) public void simpleListen(String msg) { System.out.println("simple隊(duì)列 接收到消息:" + msg); }
2、work模式
一個(gè)生產(chǎn)者,多個(gè)消費(fèi)者,每個(gè)消費(fèi)者獲取到的消息唯一。一條消息只能被其中一個(gè)消費(fèi)掉,相互爭(zhēng)奪資源。
配置: public final static String WORK_QUEUE = "workQueue"; @Bean public Queue workQueue() { return new Queue(WORK_QUEUE, true); } 生產(chǎn)者: public void sendWorkQueueMq(String msg) { rabbitTemplate.convertAndSend(RabbitConfig.WORK_QUEUE, msg); logger.info("發(fā)送消息:{}", msg); } 消費(fèi)者: @RabbitListener(queues = RabbitConfig.WORK_QUEUE) public void workListen1(String msg) { System.out.println("work模式1 接收到消息:" + msg); } @RabbitListener(queues = RabbitConfig.WORK_QUEUE) public void workListen2(String msg) { System.out.println("work模式2 接收到消息:" + msg); }
3、發(fā)布/訂閱模式
一個(gè)生產(chǎn)者發(fā)送的消息會(huì)被多個(gè)消費(fèi)者獲取
配置: public final static String FANOUT_QUEUE_ONE = "fanout_queue_one"; public final static String FANOUT_QUEUE_TWO = "fanout_queue_two"; public final static String FANOUT_EXCHANGE = "fanout_exchange"; // fanout 廣播者模式隊(duì)列 @Bean public Queue fanoutQueueOne() { return new Queue(FANOUT_QUEUE_ONE, true); } @Bean public Queue fanoutQueueTwo() { return new Queue(FANOUT_QUEUE_TWO, true); } // fanout 交換器 @Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(FANOUT_EXCHANGE); } // 廣播模式綁定 @Bean public Binding fanoutExchangeBingingOne() { return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange()); } @Bean public Binding fanoutExchangeBingingTwo() { return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange()); } 生產(chǎn)者: public void sendFanoutExchangeMq(String msg) { rabbitTemplate.convertAndSend(RabbitConfig.FANOUT_EXCHANGE, "", msg); logger.info("發(fā)送消息:{}", msg); } 消費(fèi)者: @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_ONE) public void fanoutListen1(String msg) { System.out.println("fanout模式1 接收到消息:" + msg); } @RabbitListener(queues = RabbitConfig.FANOUT_QUEUE_TWO) public void fanoutListen2(String msg) { System.out.println("fanout模式2 接收到消息:" + msg); }
4、路由模式
發(fā)送消息到交換機(jī)并且要指定路由key ,消費(fèi)者將隊(duì)列綁定到交換機(jī)時(shí)需要指定路由key。那么消息只會(huì)發(fā)送到相應(yīng)key相同的隊(duì)列,接著監(jiān)聽(tīng)該隊(duì)列的消費(fèi)者消費(fèi)消息。
配置: public final static String DIRECT_QUEUE_ONE = "direct_queue_one"; public final static String DIRECT_QUEUE_TWO = "direct_queue_two"; public final static String DIRECT_EXCHANGE = "direct_exchange"; // direct 路由模式隊(duì)列 @Bean public Queue directQueueOne() { return new Queue(DIRECT_QUEUE_ONE, true); } @Bean public Queue directQueueTwo() { return new Queue(DIRECT_QUEUE_TWO, true); } // direct 交換器 @Bean public DirectExchange directExchange() { return new DirectExchange(DIRECT_EXCHANGE); } //路由模式綁定 @Bean public Binding directExchangeBingingOne() { return BindingBuilder.bind(directQueueOne()).to(directExchange()).with("orange"); } @Bean public Binding directExchangeBingingTwo() { return BindingBuilder.bind(directQueueTwo()).to(directExchange()).with("black"); } 生產(chǎn)者: public void sendDirectExchangeMq(String routingKey, String msg) { rabbitTemplate.convertAndSend(RabbitConfig.DIRECT_EXCHANGE,"orange" , msg); logger.info("發(fā)送消息:{}", msg); } 消費(fèi)者: @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_ONE) public void directListenOne(String msg) { System.out.println("direct模式1 接收到消息:" + msg); } @RabbitListener(queues = RabbitConfig.DIRECT_QUEUE_TWO) public void directListenTwo(String msg) { System.out.println("direct模式2 接收到消息:" + msg); } 如上代碼,只有routingKey 為orange的能收到消息
5、topic模式
將路由鍵和某模式進(jìn)行匹配,此時(shí)隊(duì)列需要綁定在一個(gè)模式上,“#”匹配一個(gè)詞或多個(gè)詞,“*”只匹配一個(gè)詞。
配置: public final static String TOPIC_QUEUE_ONE = "topic_queue_one"; public final static String TOPIC_QUEUE_TWO = "topic_queue_two"; public final static String TOPIC_EXCHANGE = "topic_exchange"; public final static String TOPIC_ROUTINGKEY_ONE = "common.key"; public final static String TOPIC_ROUTINGKEY_TWO = "*.key"; // topic 訂閱者模式隊(duì)列 @Bean public Queue topicQueueOne() { return new Queue(TOPIC_QUEUE_ONE, true); } @Bean public Queue topicQueueTwo() { return new Queue(TOPIC_QUEUE_TWO, true); } // topic 交換器 @Bean public TopicExchange topExchange() { return new TopicExchange(TOPIC_EXCHANGE); } // 訂閱者模式綁定 @Bean public Binding topicExchangeBingingOne() { return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE); } @Bean public Binding topicExchangeBingingTwo() { return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO); } 生產(chǎn)者: public void sendTopicExchangeMq(String routingKey, String msg) { rabbitTemplate.convertAndSend(RabbitConfig.TOPIC_EXCHANGE, "common.key", msg); logger.info("發(fā)送消息:{}", msg); } 消費(fèi)者: @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_ONE) public void topicListenOne(String msg) { System.out.println("topic模式1 接收到消息:" + msg); } @RabbitListener(queues = RabbitConfig.TOPIC_QUEUE_TWO) public void topicListenTwo(String msg) { System.out.println("topic模式2 接收到消息:" + msg); } 根據(jù)routingKey匹配對(duì)應(yīng)的才能收到消息
6、rpc模式
客戶端向一個(gè)隊(duì)列中發(fā)送消息,并注冊(cè)一個(gè)回調(diào)的隊(duì)列用于接收服務(wù)端返回的消息,該消息需要聲明一個(gè)叫做correaltionId的屬性,
該屬性將是該次請(qǐng)求的唯一標(biāo)識(shí)。服務(wù)端在接受到消息(在需要時(shí)可以驗(yàn)證correaltionId)后,處理消息,并將消息發(fā)送到客戶端注冊(cè)的回調(diào)隊(duì)列中。
配置: public final static String RPC_SIMPLE_QUEUE_ONE = "rpcSimpleQueue_one"; public final static String RPC_SIMPLE_QUEUE_TWO = "rpcSimpleQueue_two"; // rpc簡(jiǎn)單模式隊(duì)列 @Bean public Queue rpcSimpleQueueOne() { return new Queue(RPC_SIMPLE_QUEUE_ONE, true); } @Bean public Queue rpcSimpleQueueTwo() { return new Queue(RPC_SIMPLE_QUEUE_TWO, true); } @Value("${spring.rabbitmq.addresses}") private String host; @Value("${spring.rabbitmq.username}") private String username; @Value("${spring.rabbitmq.password}") private String password; @Autowired ConnectionFactory connectionFactory; @Autowired RabbitTemplate rabbitTemplate; @Bean(name = "connectionFactory") public ConnectionFactory connectionFactory() { CachingConnectionFactory connectionFactory = new CachingConnectionFactory(); connectionFactory.setAddresses(host); connectionFactory.setUsername(username); connectionFactory.setPassword(password); connectionFactory.setVirtualHost("/"); return connectionFactory; } public RabbitTemplate getRabbitTemplate() { rabbitTemplate.setReplyAddress(RPC_SIMPLE_QUEUE_TWO); rabbitTemplate.setReplyTimeout(2000); return rabbitTemplate; } @Bean(name = "replyMessageListenerContainer") public SimpleMessageListenerContainer createReplyListenerContainer() { SimpleMessageListenerContainer listenerContainer = new SimpleMessageListenerContainer(); listenerContainer.setConnectionFactory(connectionFactory); listenerContainer.setQueueNames(RPC_SIMPLE_QUEUE_TWO); listenerContainer.setMessageListener(getRabbitTemplate()); return listenerContainer; } 生產(chǎn)者: public Message sendRpcSimpleQueueMq(Message msg) { rabbitTemplate.setReplyAddress(RabbitConfig.RPC_SIMPLE_QUEUE_TWO); rabbitTemplate.setReplyTimeout(2000); Message message = rabbitTemplate.sendAndReceive(RabbitConfig.RPC_SIMPLE_QUEUE_ONE, msg); logger.info("發(fā)送消息:{}", msg); return message; } 消費(fèi)者: @RabbitListener(queues = RabbitConfig.RPC_SIMPLE_QUEUE_ONE) public void rpcSimpleListenOne(Message msg) { System.out.println("rpc simple 1隊(duì)列 接收到消息:" + msg); rabbitTemplate.send(RabbitConfig.RPC_SIMPLE_QUEUE_TWO, con("回復(fù)消息:" + new String(msg.getBody()), msg.getMessageProperties().getCorrelationId())); } public Message con(String s, String id) { MessageProperties mp = new MessageProperties(); byte[] src = s.getBytes(Charset.forName("UTF-8")); mp.setCorrelationId(id); mp.setContentType("application/json"); mp.setContentEncoding("UTF-8"); mp.setContentLength((long) s.length()); return new Message(src, mp); }
、問(wèn)題處理

浙公網(wǎng)安備 33010602011771號(hào)