<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      rabbitMq

       

      1,新建exchange  新建queue
         在exchange中綁定queue并設置routing_key
      2,消息發送,發送到exchange,routingKey  
         不同的queue會存放自己綁定routingkey的消息
         消息監聽,監聽queue 
      3,各個消費者監聽自己的queue, 每個queue可以存放同樣的消息,就是廣播模式了
      4, exchange類型有:
      direct直連類型  在exchange中queue需要一一逐個綁定routingKey  test.aa test.bb
      topic主題類型  queue只需要模糊匹配綁定就行  test.#
      

       

      1,多個服務器實例,多個消費者,每個消費者又可以多線程,并發消費同一隊列
      2,多個監聽消費者,不同隊列,同一條數據每個消費者都會消費到,廣播  (消費發送到exchange交換機、routingkey上,不同的queue會存放自己綁定routingkey的消息)
      
      fanout 可由direct替代,它是一種廣播式的交換器(散開分列),表示分發,所有的消費者得到同樣的隊列信息,發布/訂閱,會將接收到的消息發送給所有綁定到該交換器的隊列,
      而不考慮路由鍵(routing key)。這意味著無論消息發布時指定了什么路由鍵,Fanout 交換器都會忽略這些信息,并簡單地將消息復制并分發給所有的綁定隊列。
      direct可以建多個queue監聽同一個routingkey來實現分發 可來替代fanout

      concurrency 多線程消費 

      Purge 隊列清理
      
          @Autowired
          private RabbitTemplate rabbitTemplate;
          public String mq(String name) {
              log.info("消息生產:{}", name);
              rabbitTemplate.convertAndSend("queuexmh", name);
              /**
               * one_q one_x 映射到 queue_x1, two_q映射到 queue_x2
               */
              rabbitTemplate.convertAndSend("exchange_direct_xmh","one_q", "queue_x1"+name);
              rabbitTemplate.convertAndSend("exchange_direct_xmh","one_x", "queue_x1"+name);
              rabbitTemplate.convertAndSend("exchange_direct_xmh","two_q", "queue_x2"+name);
      
              /**
               * one.t1 映射到 queue_x1, two.t1 two.t2 映射到 queue_x2
               * 后臺做了關聯 two.# 映射到 queue_x2, one.# 映射到 queue_x1
               */
              rabbitTemplate.convertAndSend("exchange_topic_xmh","two.t1", "queue_x2"+name);
              rabbitTemplate.convertAndSend("exchange_topic_xmh","two.t2", "queue_x2"+name);
              rabbitTemplate.convertAndSend("exchange_topic_xmh","one.t1", "queue_x1"+name);
              return "hello"+name;
          }
      
      @Component
      public class RabbitConsumer {
          //指定queue消費
          @RabbitListener(queues = "queue_x1",concurrency = "1")
          public void queueX1(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag)  throws IOException {
             try {
                 System.out.println("queue_x1收到消息:" + message);
                 //設置為手動確認時生效 手動確認消息(第二個參數false表示不批量確認)
                 //channel.basicAck(deliveryTag, false);
             } catch (Exception e) {
                 // 處理失敗時拒絕消息(requeue=false表示不重回隊列)
                 channel.basicNack(deliveryTag, false, false);
             }
          }
          //指定queue消費
          @RabbitListener(queues = "queue_x2",concurrency = "1")
          public void queueX2(Message message) {
              System.out.println("queue_x2收到消息:" + message);
          }
          //指定queue消費
          @RabbitListener(queues = "${rabbitmq.queue.sendSms}",concurrency = "5",containerFactory =
                  "rabbitListenerContainerFactory")
          public void onMessage(Message message) {
      }
      
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter-amqp</artifactId>
                  <version>2.6.8</version>
              </dependency>
      
      spring:
        rabbitmq:
          port: 5672
          host: 192.168.243.128:5672
          username: admin
          password: admin
          virtual-host: xmh
          publisher-confirm-type: correlated
          publisher-returns: true
          listener:
            direct:
              acknowledge-mode: auto
            simple:
              acknowledge-mode: auto
              retry:
                enabled: true
      
      @Configuration
      public class RabbitConfig {
          @Value("${spring.rabbitmq.host}")
          private String addresses;
          @Value("${spring.rabbitmq.username}")
          private String username;
          @Value("${spring.rabbitmq.password}")
          private String password;
          @Value("${spring.rabbitmq.virtual-host}")
          private String virtualHost;
          // 構建mq實例工廠
          @Bean
          public ConnectionFactory connectionFactory(){
              CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
              connectionFactory.setAddresses(addresses);
              connectionFactory.setUsername(username);
              connectionFactory.setPassword(password);
              connectionFactory.setVirtualHost(virtualHost);
              return connectionFactory;
          }
          @Bean
          public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
              SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
              factory.setConnectionFactory(connectionFactory);
              // 若設置為手動確認,必須手動調用ack   AUTO 自動確認 默認值
              factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
              factory.setMessageConverter(new Jackson2JsonMessageConverter());
              return factory;
          }
          @Bean
          public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
              return new RabbitAdmin(connectionFactory);
          }
          @Bean
          @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) //每次注入的時候回自動創建一個新的bean實例
          public RabbitTemplate rabbitTemplate(){
              return new RabbitTemplate(connectionFactory());
          }
      

      自動綁定

      package com.xmh.config;
      import org.springframework.amqp.core.*;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      @Configuration
      public class RabbitMQConfig {
          // 交換機名稱
          public static final String EXCHANGE_NAME = "xmh.exchange";
          // 隊列名稱
          public static final String QUEUE_NAME = "xmh.queue";
          public static final String QUEUE_NAME2 = "xmh2.queue";
          // 路由鍵
          public static final String ROUTING_KEY = "xmh.routing.key";
          public static final String ROUTING_KEY2 = "xmh2.routing.key";
          /**
           * 聲明交換機
           * 這里使用的是Direct類型交換機,也可以根據需要使用Topic、Fanout等類型
           */
          @Bean
          public DirectExchange exampleExchange() {
              // durable: 是否持久化 autoDelete: 是否自動刪除
              return new DirectExchange(EXCHANGE_NAME, true, false);
          }
          /**
           * 聲明隊列
           */
          @Bean
          public Queue exampleQueue() {
              // durable: 是否持久化 exclusive: 是否排他 autoDelete: 是否自動刪除
              return QueueBuilder.durable(QUEUE_NAME).build();
          }
          @Bean
          public Queue exampleQueue2() {
              // durable: 是否持久化 exclusive: 是否排他 autoDelete: 是否自動刪除
              return QueueBuilder.durable(QUEUE_NAME2).build();
          }
          /**
           * 綁定交換機和隊列
           */
          @Bean
          public Binding binding(Queue exampleQueue, DirectExchange exampleExchange) {
              // 將隊列通過路由鍵綁定到交換機
              return BindingBuilder.bind(exampleQueue).to(exampleExchange).with(ROUTING_KEY);
          }
          @Bean
          public Binding binding2(Queue exampleQueue2, DirectExchange exampleExchange) {
              // 將隊列通過路由鍵綁定到交換機
              return BindingBuilder.bind(exampleQueue2).to(exampleExchange).with(ROUTING_KEY2);
          }
      }
      
          rabbitTemplate.convertAndSend(EXCHANGE_NAME,ROUTING_KEY2, ROUTING_KEY2+name);
      
          //指定queue消費
          @SneakyThrows
          @RabbitListener(queues = QUEUE_NAME2,concurrency = "1")
          public void queueX2(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
              String msg = new String(message.getBody(),"UTF-8");
              System.out.println(QUEUE_NAME2+"收到消息:"+msg+":"+ message);
          }
      
      RabbitMq 消費失敗,重試機制
      
      自定義異常業務處理
      步驟1 設置自動確認,自定義表記錄失敗數據(包含業務數據、失敗次數)觸發定時任務重新投放該消息,執行3次,每次間隔30分鐘
      執行成功,刪除重發任務
      執行失敗,第一次失敗執行步驟1,否則記錄失敗次數,達到一定次數刪除定時任務
      
      channel.basicReject(deliveryTag, true);// 拒絕消息并重回隊列
      

       

        

      
      image
      

       

       安裝rabbitmq

      https://www.rabbitmq.com/release-information
      rabbitmq erlang CentOS
      
      tar -zxf otp_src_28.0.2.tar.gz
      修改 Centos 鏡像地址 參考 http://www.rzrgm.cn/xingminghui111/articles/17514510.html
      sudo yum install -y ncurses-devel
      sudo yum install -y gcc gcc-c++ openssl-devel libssh-devel unixODBC-devel
      ./configure && make && make install
      erl -version
      
      rpm -i --nodeps rabbitmq-server-4.0.9-1.el8.noarch.rpm
      
      service rabbitmq-server start
      service rabbitmq-server status 查看服務狀態
      
      配置管理員賬號:
      rabbitmqctl add_user admin admin
      rabbitmqctl set_user_tags admin administrator # 設置用戶為管理員
      # 授予所有權限
      rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
      
      # 啟用管理插件
      rabbitmq-plugins enable rabbitmq_management
      # 重啟服務使配置生效
      systemctl restart rabbitmq-server
      關閉防火墻  
      sudo systemctl stop firewalld
      sudo systemctl disable firewalld
      
      后臺管理 http://192.168.243.128:15672/
      

      # 停止服務   自啟動
      systemctl stop rabbitmq-server     sudo systemctl enable rabbitmq-server

       

        

       其他

      @Service
      public class OrderItemServiceImpl  {
          /**
           * https://blog.csdn.net/aetawt/article/details/128957417
           * 監聽方式獲取消息 消息只消費一次,其他消費者消費后,本消費者不再消費
           * @RabbitHandler:標注在方法上
           * @RabbitListener: 標注在類、方法上
           * 使用 @RabbitHandler +  @RabbitListener 接受不同類型的消息
           */
          @RabbitListener(queues = {MqConstant.SEARCH_DATA_QUEUE})
          public void recieveOrderMessage(Message message,  Channel channel) throws IOException {
              System.out.println("收到了消息了--->" + message + " ====》內容:" + new String(message.getBody()));
              System.out.println("渠道數量:" + channel.getChannelNumber());
      
              MessageProperties messageProperties = message.getMessageProperties();
              System.out.println("消息處理完成---------------------------------");
              //消息順序,自增
              long deliveryTag = messageProperties.getDeliveryTag();
              System.out.println(deliveryTag);
              //回復,簽收消息, fasle表示只簽收當前消息,true簽收所有
              channel.basicAck(deliveryTag, false);
          }
      
      
      /**
       * RabbitTemplate 拉取的方式獲取消息
       */
      @Component
      public class RabbitListenerConfig {
          @Autowired
          private RabbitTemplate rabbitTemplate;
          @PostConstruct
          public void receive(){
              Thread thread = new Thread(){
                public void run(){
                    while (true){
                        System.out.println("*************ready");
                        String message = (String)rabbitTemplate.receiveAndConvert(MqConstant.SEARCH_DATA_QUEUE,1000*2);
                        System.out.println("*************receive:"+message);
                    }
                }
              };
              thread.start();
          }
      
      
      自定義多線程處理
      @RabbitListener(queues = {MqConstant.DEAL_TOBE_HANDLE})//,containerFactory = "batchQueueRabbitListenerContainerFactory"
          @Bean("batchQueueRabbitListenerContainerFactory")
          public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
              SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
              factory.setMessageConverter(new Jackson2JsonMessageConverter());
              //確認方式,manual為手動ack.
              factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
              //每次處理數據數量,提高并發量
              //factory.setPrefetchCount(250);
              //設置線程數
              //factory.setConcurrentConsumers(30);
              //最大線程數
              //factory.setMaxConcurrentConsumers(50);
              /* setConnectionFactory:設置spring-amqp的ConnectionFactory。 */
              factory.setConnectionFactory(connectionFactory);
              factory.setConcurrentConsumers(1);
              factory.setPrefetchCount(1);
              //factory.setDefaultRequeueRejected(true);
              //使用自定義線程池來啟動消費者。
              factory.setTaskExecutor(taskExecutor());
              return factory;
          }
      
          @Bean("correctTaskExecutor")
          @Primary
          public TaskExecutor taskExecutor() {
              ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
              // 設置核心線程數
              executor.setCorePoolSize(100);
              // 設置最大線程數
              executor.setMaxPoolSize(100);
              // 設置隊列容量
              executor.setQueueCapacity(0);
              // 設置線程活躍時間(秒)
              executor.setKeepAliveSeconds(300);
              // 設置默認線程名稱
              executor.setThreadNamePrefix("thread-rabbitmq");
              // 設置拒絕策略rejection-policy:當pool已經達到max size的時候, 調用者執行
              executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
              // 等待所有任務結束后再關閉線程池
              executor.setWaitForTasksToCompleteOnShutdown(true);
              return executor;
          }
      

        

      posted @ 2023-08-24 17:29  XUMT111  閱讀(25)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 摸丰满大乳奶水www免费| 强奷漂亮人妻系列老师| 中国熟女仑乱hd| 无码日韩精品一区二区三区免费| 熟女蜜臀av麻豆一区二区| 国产精品中文字幕综合| 国产精品视频午夜福利| 亚洲一区二区色情苍井空| 无码专区 人妻系列 在线| 亚洲va在线∨a天堂va欧美va| 国产在线自拍一区二区三区| 最新亚洲av日韩av二区| 梓潼县| 精品国产乱码久久久久app下载| 永久黄网站色视频免费直播| 日本公妇乱偷中文字幕| 男女一级国产片免费视频| 郧西县| 国产午夜福利免费入口| 国产精品福利自产拍在线观看 | 久久香蕉国产线看观看怡红院妓院| 狠狠爱五月丁香亚洲综| 亚洲自拍偷拍一区二区三区| 欧美喷潮最猛视频| 92国产精品午夜福利免费| 国产精品久久久久久人妻精品| 四川丰满少妇无套内谢| 精品一区二区三区在线观看l| 国产精品高清一区二区不卡 | 丁香五月激情图片| 成全影视大全在线观看| 国产偷国产偷亚洲清高网站| 精品国产中文字幕在线看| 亚洲理论在线A中文字幕| 色婷婷亚洲精品综合影院| 丁香五月网久久综合| 人人妻人人做人人爽夜欢视频| av永久免费网站在线观看| 鹤岗市| 玩弄放荡人妻少妇系列| 亚洲码欧洲码一二三四五|