<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Rabbitmq的使用

      rabbitmq的使用

      1. 使用場景及它的特點介紹

      image

      2. mq的5種常用消息模型

      2.1 隊列模型—-1 對 1

      image

      image

      2.2 隊列模型 — 1(生產者)對多(消費者)

      特點:
      	1.當有多個消費者時,無論消費者處理的性能是否相同,生產者的消費會平均分配給每一個消費者
      	2.每個消費者處理的消息是否存在重復? 不會重復
      	解釋:為什么開啟多個消費者時,會出現有的消費者雖然處理的慢,但是也會收到相同的消息的個數?
      		rabbitmq有消息默認的分配機制:平均分配(有多少個消費者,都將平均分配要處理的消息數)
      優化: 能者多勞
      	在消費處理消息時,可以設置由隊列每次分配給消費者的消息數量,不要一次性全分完
      

      2.3 隊列模式的代碼實現

      2.3.1 生產的核心代碼

      import cn.itsource.mq.utils.ConnectionUtil;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      public class Producer {
      	private static final String QUEUE_NAME = "queue_workqueue";
      	public static void main(String[] args) throws Exception {
      		//1.創建連接
      		Connection connection = ConnectionUtil.getConnection();
      		//2.生產者與服務端之間建立通道
      		Channel channel = connection.createChannel();
      		for (int i = 0; i < 20; i++) {
      			/**
      			 * 發送消息到隊列
      			 * @param exchange 交換機名稱
      			 * @param routingKey 發送到哪個隊列(這個參數很容易搞錯:沒有交換機時,這個參數必須填隊列名稱;有交換機的時候,就填路由)
      			 * @param props 消息的其他屬性
      			 * @param body 消息體
      			 */
       //在實際開發中,我們也會將發送的內容,以字符串進行傳輸。但是涉及到對象類型,會將其先轉為json字符串。
      			String message = "queue_workqueue: 這是一個消息!" + i;
      			System.out.println(message);
      			//3. 調用API進行消息的發送
      			channel.basicPublish("",QUEUE_NAME,null,message.getBytes("utf-8"));
      		}
      		//5.關閉連接
      		connection.close();
      	}
      }
      

      2.3.2 消費者的代碼實現

      import cn.itsource.mq.utils.ConnectionUtil;
      import com.rabbitmq.client.*;
      import java.io.IOException;
      public class Consumer01 {
      	//隊列的名稱,必須要與接收的消息生產者,設置的隊列名相同
      	private static final String QUEUE_NAME = "queue_workqueue";
      	public static void main(String[] args) throws Exception {
      		//1.創建連接
      		Connection connection = ConnectionUtil.getConnection();
      		//2.生產者與服務端之間建立通道
      		Channel channel = connection.createChannel();
      		//3.聲明隊列:因為生產者那邊已經聲明過隊列了,所以這邊就不需要聲明隊列
      		/**
      		 * 3.聲明隊列
      		 * @param queue 隊列名稱
      		 * @param durable 是否持久化
      		 * @param exclusive 是否為專用隊列
      		 * @param autoDelete 是否自動刪除
      		 * @param arguments 其他參數
      		 */
      		channel.queueDeclare(QUEUE_NAME, true, false, false, null);
      		//設置消費者每次預提取1個消息【這是一個提高消息處理效率的參數。表示每次接收幾個消息】
      		channel.basicQos(1);
      		//4. 采用匿名內部類 寫一個DefaultConsumer的子類,子類中重寫handleDelivery方法
      		DefaultConsumer consumer = new DefaultConsumer(channel) {
      			@Override
      			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      				try {
      					//接收消息
      					String message = new String(body, "utf-8");
      					Thread.sleep(500);
      					System.out.println("消費者收到消息:" + message);
      					long deliveryTag = envelope.getDeliveryTag();
      					/**
      					【如果采用默認的 自動確認ACK機制 ,則可省略】
      					 * 正常情況下的手動回執
      					 * @param deliveryTag 處理消息的標識
      					 * @param multiple 是否自動批量處理(自動處理隊列中當前消息,及之前的所有消息) false表示只處理當前消息
      					 */
      		  //注意:當ACK采用手動確認機制時,確認消息的成功發送的代碼,一定要放在當前方法體的最后一行
      					channel.basicAck(deliveryTag, false);
      				} catch (Exception e) {
      					e.printStackTrace();
      				}
      			}
      		};
      		/**
      		 * 5.監聽隊列
      		 *  一旦被監聽的隊列中有新的消息,就自動調用consumer對象的handleDelivery方法來接收消息
      		 * @param queue 隊列名稱
      		 * @param autoAck 是否自動回執 true表示自動回執,false表示手動回執
      		 * @param callback 接收消息的回調方法Consumer
      		 */
      		channel.basicConsume(QUEUE_NAME, false, consumer);
      	}
      }
      

      2.4 訂閱模型的代碼實現

      2.4.1 訂閱模型分3種

      1. fanout類型 : 1.不需要設置routekey,生產者的消息,會統一分別發給每一個消費者
      2. direct : 1. 設置routekey,且生產者在發送消息時,也要指定routekey,且消息在過濾時,需要完全匹配生產指定的routekey
      3. topic  : 1. 在設置toutekey時,可以引用【通配符】 ;2.通配符分2種:*:單個匹配;#:多個匹配
      
      • fanout模型的效果圖
        image

      • direct效果圖
        image

      • topic效果圖
        image

      2.4.2 生產者的代碼實現

      import cn.itsource.mq.utils.ConnectionUtil;
      import com.rabbitmq.client.BuiltinExchangeType;
      import com.rabbitmq.client.Channel;
      import com.rabbitmq.client.Connection;
      public class Producer {
      	private static final String EXCHANGE_NAME = "direct_exchange";
      	public static void main(String[] args) throws Exception {
      		//1.創建連接
      		Connection connection = ConnectionUtil.getConnection();
      		//2.生產者與服務端之間建立通道
      		Channel channel = connection.createChannel();
      		/**
      		 * 3.聲明交換機
      		 * @param exchange 交換機名稱
      		 * @param type 交換機類型
      		 * @param durable 是否持久化
      		 */
      		channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, true);
      		/**
      		 * 4.發送消息到隊列
      		 * @param exchange 交換機名稱
      		 * @param routingKey 發送到哪個隊列(這個參數很容易搞錯:沒有交換機時,這個參數必須填隊列名稱;有交換機的時候,就填路由)
      		 * @param props 消息的其他屬性
      		 * @param body 消息體
      		 */
      		String message = "這是一個消息!" + System.currentTimeMillis();
      		System.out.println(message);
      		//要指定 路由key  : routekey。設置后,對應的消費者,只要在監聽指定的路由key的消息,才會收取到 
      		channel.basicPublish(EXCHANGE_NAME,"email",null,message.getBytes("utf-8"));
      		//5.關閉連接
      		connection.close();
      	}
      }
      

      2.4.3 消費者的代碼實現

      import cn.itsource.mq.utils.ConnectionUtil;
      import com.rabbitmq.client.*;
      import java.io.IOException;
      public class ConsumerEMAIL {
      	private static final String QUEUE_NAME_EMAIL = "queue_direct_email";
      	private static final String EXCHANGE_NAME = "direct_exchange";
      	public static void main(String[] args) throws Exception {
      		//1.創建連接
      		Connection connection = ConnectionUtil.getConnection();
      		//2.生產者與服務端之間建立通道
      		Channel channel = connection.createChannel();
      		/**
      		 * 3.聲明隊列
      		 * @param queue 隊列名稱
      		 * @param durable 是否持久化
      		 * @param exclusive 是否為專用隊列
      		 * @param autoDelete 是否自動刪除
      		 * @param arguments 其他參數
      		 */
      		channel.queueDeclare(QUEUE_NAME_EMAIL,true,false,false,null);
      		/**
      			在綁定到 指定的交換機時,要同時指定接收什么類型的 routekey消息
      		 * 4.將隊列綁定到交換機
      		 * @param queue 隊列名稱
      		 * @param exchange 交換機名稱
      		 * @param routingKey 路由設置
      		 * @param arguments 其他參數
      		 */
      		channel.queueBind(QUEUE_NAME_EMAIL,EXCHANGE_NAME,"email",null);
      		//設置消費者每次預提取1個消息
      		channel.basicQos(1);
      		//采用匿名內部類 寫一個DefaultConsumer的子類,子類中重寫handleDelivery方法
      		DefaultConsumer consumer = new DefaultConsumer(channel){
      			@Override
      			public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
      				try {
      					//接收消息
      					String message = new String(body,"utf-8");
      					Thread.sleep(2000);
      					System.out.println("消費者收到消息:"+message);
      					/**
      					 * 正常情況下的手動回執
      					 * @param deliveryTag 處理消息的標識
      					 * @param multiple 是否自動批量處理(自動處理隊列中當前消息,及之前的所有消息) false表示只處理當前消息
      					 */
      					channel.basicAck(envelope.getDeliveryTag(),false);
      				} catch (Exception e) {
      					e.printStackTrace();
      				}
      			}
      		};
      		/**
      		 * 4.監聽隊列
      		 *  一旦被監聽的隊列中有新的消息,就自動調用consumer對象的handleDelivery方法來接收消息
      		 * @param queue 隊列名稱
      		 * @param autoAck 是否自動回執 true表示自動回執,false表示手動回執
      		 * @param callback 接收消息的回調方法Consumer
      		 */
      		channel.basicConsume(QUEUE_NAME_EMAIL, false, consumer);
      	}
      }
      

      3. springboot整合mq

      • springboot整合mq時,在企業開發中,都會將生產者和消費者分開集成到 2個工程中

      3.1 整合生產者

      3.1.1 導入pom依賴

      <!--spirngboot集成rabbitmq-->
      <dependency>
      	<groupId>org.springframework.boot</groupId>
      	<artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      

      3.1.2 配置yml

      spring:
        rabbitmq:
      	  host: 127.0.0.1
      	  port: 5672
      	  username: guest
      	  password: guest
      	  virtualHost: /
      	  listener:
      		simple:
      		  acknowledge-mode: manual #手動簽收
      		  prefetch: 1     #預提取1條消息
      	  publisher-confirms: true #消息發送到交換機失敗回調
      	  publisher-returns: true  #消息發送到隊列失敗回調
      	  template:
      		mandatory: true # 必須設置成true 消息路由失敗通知監聽者,而不是將消息丟棄
      

      3.1.3 配置啟動類的注解

      • 不需要在啟動類添加開啟的注解,但是需要添加幾個@Bean的配置

      • 配置bean

          public static final String EXCHANGE_NAME = "springboot-rabbitmq-exchange";
          public static final String QUEUE_NAME_SMS = "springboot-rabbitmq-queue-sms";
          public static final String QUEUE_NAME_EMAIL = "springboot-rabbitmq-queue-email";
          /**
           * 聲明交換機
           * @return
           */
          @Bean(EXCHANGE_NAME)
          public Exchange EXCHANGE_NAME(){
          	return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
          }
          /**
           * 聲明隊列:sms
           * @return
           */
          @Bean(QUEUE_NAME_SMS)
          public Queue QUEUE_NAME_SMS(){
          	return QueueBuilder.durable(QUEUE_NAME_SMS).build();
          }
          /**
           * 聲明隊列:email
           * @return
           */
          @Bean(QUEUE_NAME_EMAIL)
          public Queue QUEUE_NAME_EMAIL(){
          	return QueueBuilder.durable(QUEUE_NAME_EMAIL).build();
          }
          /**
           * sms隊列綁定到交換機
           *  需要參數有兩個辦法:
           *      1)直接在方法內部調用其他方法獲取對象
           *      2)直接方法參數中寫變量,Spring會自動從Spring容器取出對象進行依賴注入
           * @param queue
           * @param exchange
           * @return
           */
          @Bean
          public Binding BINDING_QUEUE_NAME_SMS(@Qualifier(QUEUE_NAME_SMS)Queue queue, Exchange exchange){
          	return BindingBuilder.bind(queue).to(exchange).with("user.#.sms").noargs();
          }
          /**
           * email隊列綁定到交換機
           * @param queue
           * @param exchange
           * @return
           */
          @Bean
          public Binding BINDING_QUEUE_NAME_EMAIL(@Qualifier(QUEUE_NAME_EMAIL)Queue queue, Exchange exchange){
          	return BindingBuilder.bind(queue).to(exchange).with("user.#.email").noargs();
          }
        

      3.1.4 測試

      • 先定義一個controller,調用RabbitmqTemplate方法。

      在瀏覽器中,調用一次下面的消息發送的方法,就到 rabbitmq服務器中,檢查是否生成了對應的exchange和queue的內容

      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.RestController;
      @RestController
      public class ProducerController {
      	@Autowired
      	private RabbitTemplate rabbitTemplate;
      	@GetMapping("/sendMsg")
      	public void sendMsg(String msg) {
      		//發送一個消息給mq服務器
      		rabbitTemplate.convertAndSend(Contants.EXCHANGE_NAME, "user.email", msg);
      	}
      }
      
      • 檢查rabbitmq服務器,是否會生成對應的exchange和queue的數據

      image

      image

      3.2 整合消費者

      3.2.1 導入pom依賴

      <!--spirngboot集成rabbitmq-->
      <dependency>
      	<groupId>org.springframework.boot</groupId>
      	<artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
      

      3.2.2 配置yml

      spring:
        rabbitmq:
      	  host: 127.0.0.1
      	  port: 5672
      	  username: guest
      	  password: guest
      	  virtualHost: /
      	  listener:
      		simple:
      		  acknowledge-mode: manual #手動簽收
      		  prefetch: 1     #預提取1條消息
      	  publisher-confirms: true #消息發送到交換機失敗回調
      	  publisher-returns: true  #消息發送到隊列失敗回調
      	  template:
      		mandatory: true # 必須設置成true 消息路由失敗通知監聽者,而不是將消息丟棄
      

      3.2.3 配置啟動注解或bean

      3.2.4 測試

      • 消費者的核心代碼

          import com.rabbitmq.client.Channel;
          import org.springframework.amqp.core.Message;
          import org.springframework.amqp.rabbit.annotation.RabbitListener;
          import org.springframework.stereotype.Component;
          import java.io.IOException;
          @Component
          public class ConsumerListener {
          	public static final String QUEUE_NAME_SMS = "springboot-rabbitmq-queue-sms";
          	public static final String QUEUE_NAME_EMAIL = "springboot-rabbitmq-queue-email";
          	/**
          	 * 監聽器:監聽一個或者多個隊列
          	 *  被監聽的隊列中一旦有了新的消息,就自動執行此方法來處理消息
          	 * @param msg
          	 * @param message
          	 * @param channel
          	 */
          	@RabbitListener(queues = {QUEUE_NAME_SMS})
          	public void accept_sms(String msg, Message message, Channel channel){
          		try {
          			System.out.println("SMS消費者收到消息:" + msg);
          			//成功接收消息
          			channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
          		} catch (IOException e) {
          			e.printStackTrace();
          		}
          	}
          	/**
          	 * 監聽器:監聽一個或者多個隊列
          	 *  被監聽的隊列中一旦有了新的消息,就自動執行此方法來處理消息
          	 * @param msg
          	 * @param message
          	 * @param channel
          	 */
          	@RabbitListener(queues = {QUEUE_NAME_EMAIL})
          	public void accept_email(String msg, Message message, Channel channel){
          		try {
          			System.out.println("EMAIL消費者收到消息:" + msg);
          			//成功接收消息
          			channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
          		} catch (IOException e) {
          			e.printStackTrace();
          		}
          	}
          }
        
      • 啟動消費者的工程后,不需要做任何事,只要生產者發送成功一條消息,消費者就應該能接收到 消息內容,如果接收不到 ,說明 環境 配置失敗

      posted @ 2024-07-04 21:16  二價亞鐵  閱讀(173)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 清纯唯美人妻少妇第一页| 天门市| 亚洲中文字幕一区二区| 国产成人亚洲日韩欧美| 久久精品色一情一乱一伦| 欧美乱码伦视频免费| 亚洲综合网国产精品一区| 国产精品成人久久电影| 国产目拍亚洲精品二区| 国产亚洲久久久久久久| 久久天天躁夜夜躁狠狠ds005| 在线视频一区二区三区色| 精品久久久久久无码免费| 热久久这里只有精品国产| 国产一级av在线播放| 日韩国产成人精品视频| 日韩视频中文字幕精品偷拍| 亚洲在线一区二区三区四区| 久久亚洲精品11p| 伊人精品成人久久综合| 99久久精品国产亚洲精品| 国产日韩入口一区二区| 精品午夜久久福利大片| 国产精品人伦一区二区三| 国产精品亚洲av三区色| 国产免费午夜福利蜜芽无码| 仙桃市| 日韩av中文字幕有码| 在线免费播放av观看| 国产色无码专区在线观看| 久久国产免费观看精品3| 五月婷婷激情第四季| 成人午夜大片免费看爽爽爽| 亚洲综合天堂一区二区三区| 小雪被老外黑人撑破了视频| 亚洲美女厕所偷拍美女尿尿| av一本久道久久波多野结衣| 国产av精品一区二区三区| 精品偷拍一区二区三区| AV在线亚洲欧洲日产一区二区 | 国产情侣激情在线对白|