<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      關于MQ

      一、為什么要用MQ?

      比如存在訂單系統,用戶進行下單,下單完成之后調用物流系統、調用通知系統。
      這樣有沒有什么問題?
      1.比如物流系統是第三方的,可能響應慢,或者宕機,這時當你采用rpc接口調用物流系統時會失敗,而導致用戶下單也失敗了。也就是耦合的,非高可用的。
      所謂的MQ就是接收生產者的消息,進行存儲,這時如果物流系統是消費者,會不斷的拿消息進行消費。在分布式系統中,MQ完成消息的流轉,MQ的特點是穩定、高效、存儲機制,確保整個系統的異步解耦,削峰平谷。
       
      采用消息中間件進行重新架構:
      這樣做的好處就是,當用戶下單完畢之后,訂單系統會往MQ中發消息,整個用戶下單的流程就結束了。然后物流系統、通知系統會從MQ中拿消息進行消費。
       
      生活中常見的例子是充話費,很多年前在微信上沖玩花費馬上就到賬了,那時候可能微信還是通過調電信、或者移動接口的形式進行充話費。這種方式,如果電信的接口有問題,或者網絡有問題,等等,這時候話費就充值失敗了。體驗非常不好。現在都是不立馬返回充話費的結果,改成了異步機制,沖完話費發消息到MQ中,結束。電信來消費這個消息,如果電信宕機了,沒有關系。等電信恢復了,再次消費成功。
      2.MQ的第二個作用,流量削峰。
      當遇到秒殺、大促活動時,用戶進行下單,比如tps達到1w/s,按照原來的方式,調用RPC接口的形式,接口最大并發量為1000tps,電信和移動的充話費接口最大tps只能為1000。如果不采用MQ,很多請求就會失敗了,或者超時了。假如超時時間為30s,1w的充話費請求并發量過來,而電信接口只能承載1000tps,勢必很多請求就會超時,用戶充值請求30s后收不到響應,就會充值失敗了。
      但是中間加入MQ作為消息中間件的話,可以接收這1w個請求,MQ可以存儲消息,再慢慢進行消費。達到流量削峰的目的。
       

      二、RabbitMQ整體及路由機制深度剖析

      老的:IBM --MQ (收費) -> ActiveMQ Java開發,沒落了
      主流:RabbitMQ、Kafka、RocketMQ
      新的:Pulsar
       
      RabbitMQ后端控制端:
       
      Exchanges:
       
       
      DirectExchange:
       
       
      convertAndSend方法是void類型的,如果調用這個方法,執行到一半失敗了,網絡斷了怎么辦?怎么才能知道我有沒有發成功呢?
       
      RabbitMQ:AMQP的最經典的實現。
      RabbitMQ如何解決消息的丟失:
      1、生產者發送到Exchange丟失 --> 引入發送者確認機制 connectionFacory.setPublisherConfirms(true)
      2、消息正確的路由 --> 消息沒有正確的路由導致消息丟失 --> 加入失敗者通知 template.setMandatory(true)
      3、在Queue中丟失了 -->mq宕機、斷電 --> 持久化
      4、消費者也有可能丟失 -->盡量使用手動確認機制,來確保在消費端不丟失
       
       
       
      消息發送成功,消費者消費成功,console打印:
       
       
      路由失敗console打印:
       
       
      代碼實現:
      RabbitMQ的配置類:
      package com.cy.mq.config;
      
      import com.cy.mq.service.rabbit.Receiver;
      import org.springframework.amqp.core.AcknowledgeMode;
      import org.springframework.amqp.core.Binding;
      import org.springframework.amqp.core.BindingBuilder;
      import org.springframework.amqp.core.DirectExchange;
      import org.springframework.amqp.core.FanoutExchange;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.core.Queue;
      import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
      import org.springframework.amqp.rabbit.connection.ConnectionFactory;
      import org.springframework.amqp.rabbit.connection.CorrelationData;
      import org.springframework.amqp.rabbit.core.RabbitAdmin;
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      
      import java.util.HashMap;
      import java.util.Map;
      
      /**
       * RabbitMQ的配置類
       */
      @Configuration
      public class RabbitConfig {
      
          @Value("${spring.rabbitmq.host}")
          private String addresses;
      
          @Value("5672")
          private String port;
      
          @Value("guest")
          private String username;
      
          @Value("guest")
          private String password;
      
          @Value("/")
          private String virtualHost;
      
          @Autowired
          private Receiver receiver;
      
          /**
           * 連接工廠
           * @return
           */
          @Bean
          public ConnectionFactory connectionFactory() {
              CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
              connectionFactory.setAddresses(addresses + ":" + port);
              connectionFactory.setUsername(username);
              connectionFactory.setPassword(password);
              connectionFactory.setVirtualHost(virtualHost);
              //如果要進行消息的回調,這里必須要設置為true
              connectionFactory.setPublisherConfirms(true);
              return connectionFactory;
          }
      
          /**
           * rabbitAdmin封裝對RabbitMQ的管理操作
           */
          @Bean
          public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
              return new RabbitAdmin(connectionFactory);
          }
      
          /**
           * 使用template給生產者,消費者,方便發消息
           */
          @Bean
          public RabbitTemplate newRabbitTemplate() {
              RabbitTemplate template = new RabbitTemplate(connectionFactory());
              //發送確認的回調方法的設置
              template.setConfirmCallback(confirmCallback());
              //開啟路由失敗通知
              template.setMandatory(true);
              //路由失敗的回調--這里只關注路由失敗的
              template.setReturnCallback(returnCallback());
              return template;
          }
      
          @Bean
          public DirectExchange DirectExchange() {
              return new DirectExchange("DirectExchange");
          }
          @Bean
          public FanoutExchange FanoutExchange() {
              return new FanoutExchange("FanoutExchange");
          }
      
          /**
           * 聲明死信交換器(Fanout交換器)
           */
          @Bean
          public FanoutExchange DlxExchange() {
              return new FanoutExchange("exchange-dlx");
          }
      
          @Bean
          public Queue queue1() {
              return new Queue("queue1");
          }
      
          /**
           * 聲明消息過期隊列 --隊列ttl
           */
          @Bean
          public Queue queueTTL() {
              Map<String, Object> arguments = new HashMap<>();
              arguments.put("x-message-ttl", 30*1000);
              arguments.put("x-dead-letter-exchange", "exchange-dlx");
              arguments.put("x-dead-letter-routing-key", "*");
              return new Queue("queue_ttl", true, false, false, arguments);
          }
      
          /**
           * 聲明專門存放死信消息的隊列
           */
          @Bean
          public Queue queueDLX() {
              return new Queue("queue_dlx");
          }
      
          /**
           * 綁定關系 綁定直連direct交換器
           */
          @Bean
          public Binding bindingDirectExchange() {
              return BindingBuilder.bind(queue1())
                      .to(DirectExchange())
                      .with("lijin.mq");
          }
      
          /**
           * 生產者發送確認
           */
          @Bean
          public RabbitTemplate.ConfirmCallback confirmCallback() {
              return new RabbitTemplate.ConfirmCallback() {
                  @Override
                  public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                      if (ack) {
                          System.out.println("發送者確認發送給mq-(Exchange)成功");
                      } else {
                          System.out.println("發送者發送給mq-(Exchange)失敗,考慮重發:" + cause);
                      }
                  }
              };
          }
      
          /**
           * 失敗通知
           */
          @Bean
          public RabbitTemplate.ReturnCallback returnCallback() {
              return new RabbitTemplate.ReturnCallback() {
                  @Override
                  public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                      System.out.println("無法路由的消息,需要考慮另外處理");
                      System.out.println("Returned replyText:" + replyText);
                      System.out.println("Returned exchange:" + exchange);
                      System.out.println("Returned routingKey:" + routingKey);
                      String msgJson = new String(message.getBody());
                      System.out.println("Returned message:" + msgJson);
                  }
              };
          }
      
          /**
           * 手動消費者確認
           */
          @Bean
          public SimpleMessageListenerContainer messageContainer() {
              SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
              //綁定了這個隊列
              container.setQueues(queue1());
              //手動提交
              container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
              container.setMessageListener(receiver);
              return container;
          }
      
      }
      發送MQ消息:
      package com.cy.mq.controller;
      
      import org.springframework.amqp.rabbit.core.RabbitTemplate;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.web.bind.annotation.GetMapping;
      import org.springframework.web.bind.annotation.RequestMapping;
      import org.springframework.web.bind.annotation.RestController;
      
      /**
       * @description com.cy.mq.controller
       * @author: chengyu
       * @date: 2025-05-11 16:10
       */
      @RestController
      @RequestMapping("/rabbit")
      public class RabbitProducer {
      
          @Autowired
          private RabbitTemplate rabbitTemplate;
      
          /**
           * 普通直接交換器的測試
           * DirectExchange: 交換器的名字
           * lijin:路由鍵
           * sendMsg:消息內容,可以是字符串,可以是json串等。
           */
          @GetMapping("/direct")
          public String direct() {
              String sendMsg = "direct msg:" + System.currentTimeMillis();
              this.rabbitTemplate.convertAndSend("DirectExchange", "lijin.mq", sendMsg);
              return "發送direct消息成功!";
          }
      }
      RabbitMQ監聽類:
      package com.cy.mq.service.rabbit;
      
      import org.springframework.amqp.rabbit.annotation.RabbitHandler;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;
      
      /**
       * RabbitMQ監聽類
       */
      @Component
      @RabbitListener(queues = "queue1")
      public class Consumer1 {
      
          @RabbitHandler
          public void process(String msg) {
              int i = 0;
              System.out.println("Consumer1-Receiver: " + msg);
              //業務代碼,異常 怎么辦? 調用其他接口 返回值0代表處理失敗
      
          }
      }
      Receiver:
      package com.cy.mq.service.rabbit;
      
      import com.rabbitmq.client.Channel;
      import org.springframework.amqp.core.Message;
      import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
      import org.springframework.stereotype.Component;
      
      /**
       * 消費Queue
       */
      @Component
      public class Receiver implements ChannelAwareMessageListener {
      
          /**
           *
           * @param message
           * @param channel
           * @throws Exception
           */
          @Override
          public void onMessage(Message message, Channel channel) throws Exception {
              String msg = new String(message.getBody());
      
              try {
                  System.out.println("Receiver>>>>消息已消費");
                  //參數1:消息的唯一性  參數2:是否批量處理
                  channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
              } catch (Exception e) {
                  System.out.println(e.getMessage());
                  //參數3:消息是否重新發送
                  channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
                  System.out.println("Receiver>>>>拒絕消息,要求MQ重新派發");
                  throw e;
              }
          }
      
      }

       

      application.yml:
      spring:
        rabbitmq:
          host: 127.0.0.1
          port: 5672
          username: guest
          password: guest
          virtual-host: /
      pom.xml
       <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
          <groupId>com.example</groupId>
          <artifactId>mq-springboot</artifactId>
          <version>0.0.1-SNAPSHOT</version>
          <name>mq-springboot</name>
          <description>mq project for Spring Boot</description>
          <properties>
              <java.version>1.8</java.version>
              <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
              <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
              <spring-boot.version>2.6.13</spring-boot.version>
          </properties>
          <dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter</artifactId>
              </dependency>
      
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-web</artifactId>
              </dependency>
      
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-test</artifactId>
                  <scope>test</scope>
              </dependency>
      
              <!-- springboot引入rabbitMq -->
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-amqp</artifactId>
              </dependency>
      
          </dependencies>
      
          <dependencyManagement>
              <dependencies>
                  <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-dependencies</artifactId>
                      <version>${spring-boot.version}</version>
                      <type>pom</type>
                      <scope>import</scope>
                  </dependency>
              </dependencies>
          </dependencyManagement>
      
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-compiler-plugin</artifactId>
                      <version>3.8.1</version>
                      <configuration>
                          <source>1.8</source>
                          <target>1.8</target>
                          <encoding>UTF-8</encoding>
                      </configuration>
                  </plugin>
                  <plugin>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-maven-plugin</artifactId>
                      <version>${spring-boot.version}</version>
                      <configuration>
                          <mainClass>com.MqSpringbootApplication</mainClass>
                          <skip>true</skip>
                      </configuration>
                      <executions>
                          <execution>
                              <id>repackage</id>
                              <goals>
                                  <goal>repackage</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      
      </project>
       
       
       
       
       
       
       
       
       
       
       
       
       
       
       
       
       
       
       
      --
      posted on 2025-05-11 19:16  有點懶惰的大青年  閱讀(28)  評論(0)    收藏  舉報

      主站蜘蛛池模板: 久久精品欧美日韩精品| 91亚洲国产成人久久蜜臀| 亚洲AV永久无码天堂网一线| 永康市| 国产成本人片无码免费| 久久精品一本到99热免费| 中文字幕av一区二区三区人妻少妇| 精品少妇无码一区二区三批| 国产成人亚洲精品自产在线 | 四虎在线播放亚洲成人| 一区二区三区无码免费看| 中文字幕无码人妻aaa片| 日韩精品区一区二区三vr| 麻豆蜜桃av蜜臀av色欲av| 99re在线视频观看| 野花韩国高清电影| 精品人妻一区二区三区蜜臀| 激情亚洲一区国产精品| 国产丝袜在线精品丝袜不卡| 国产综合色在线精品| 精品 无码 国产观看| 日韩一区二区三区日韩精品| 精品视频国产狼友视频| 国产精品一区在线蜜臀| 乌兰浩特市| 婷婷四虎东京热无码群交双飞视频 | 专干老肥熟女视频网站| 亚洲精品麻豆一二三区| 亚洲高潮喷水无码AV电影| 亚洲色成人一区二区三区| 蜜臀久久精品亚洲一区| 蜜臀午夜一区二区在线播放| 国内精品一区二区在线观看| 中文字幕亚洲人妻系列| 国产成人久久蜜一区二区| 亚洲高清国产拍精品熟女| 亚洲欧美日韩综合久久| 国产一区二区不卡视频在线| 91网站在线看| 亚洲精品国产无套在线观| 日日噜久久人妻一区二区|