<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      .NET Core中RabbitMQ初識

      .NET Core中RabbitMQ初識

      前提

      前段時間上班無事,上網沖浪看到了消息隊列RabbitMQ,就想著學習一下,網上看了點資料在嗶哩嗶哩上看的到codeman講的一個rabbitmq的視頻,就跟著仔細學習一下,敲一下代碼。視頻地址: rabbitmq視頻

      RabbitMq介紹

      什么是消息隊列

      MQ全稱為Message Queue,即消息隊列。“消息隊列”是在消息的傳輸過程中保存消息的容器。它是典型的:生產者、消費者模型。生產者不斷向消息隊列中生產消息,消費者不斷的從隊列中獲取消息。因為消息的生產和消費都是異步的,而且只關心消息的發送和接收,沒有業務邏輯的侵入,這樣就實現了生產者和消費者的解耦。

      image-20230713224815414

      應用場景

      削峰填谷

      在一個時間段很多用戶同時進行請求我們的A系統,我的MQ容器就可以用來存儲請求按照每秒多少的請求進行發送,減輕服務器的壓力。

      ? image-20230713224326738

      • 使用了MQ之后,限制消息消費的速度為3000,這樣一來,高峰就被“削”掉了,但是因為消息積壓,在高峰期過后一段時間內,消費消息的速度還是會維持在3000,直到消費完擠壓的消息,這就叫做“填谷”。

      • 使用MQ后,可以提供系統穩定性。

      image-20230713224647673

      異步提速

      • 在不使用MQ的情況下我們正常用戶通過訂單系統進行下單,我們需要900多ms,這就會出現用戶的體驗不好。

        image-20230713225203237

      • 在使用MQ的情況出現了總耗時只要25ms就給到了用戶回應

        這樣提升了用戶體驗感

        image-20230713225418594

      所有的問題當你解決一個問題就會出現另外的問題,外部依賴多系統的穩定性就越差,MQ但凡掛了,系統就會出問題,后面就會使用mq集群來解決這一問題。

      消息模型

      點對點模式

      image-20230713230146477

      image-20230714205804589

      在上圖的模型中,有以下概念:

      • Producer:生產者,也就是要發送消息的程序
      • Consumer:消費者:消息的接受者,會一直等待消息到來。
      • Queue:消息隊列。可以緩存消息;生產者向其中投遞消息,消費者從其中取出消息。
      • 點對點模式只會有一個消費者進行消費
      代碼附上

      image-20230713231805945

      新增兩個項目一個生產者 Z.RabbitMq.Producer,一個消費者Z.RabbitMQ.Consumer01

      1. 項目 Z.RabbitMq.Producer新增HelloProducer
      • public class HelloProducer
            {
                public static void HelloWorldShow()
                {
                    var factory = new ConnectionFactory();
                    factory.HostName = "127.0.0.1";
                    factory.Port = 5672;
                    factory.UserName = "admin";
                    factory.Password = "admin";
                    factory.VirtualHost = "my_vhost";
        
                    // 獲取TCP 長連接
                    using (var connection = factory.CreateConnection())
                    {
                        // 創建通信“通道”,相當于TCP中的虛擬連接
                        using (var channel = connection.CreateModel())
                        {
                            /*
                             * 創建隊列,聲明并創建一個隊列,如果隊列已存在,則使用這個隊列
                             * 第一個參數:隊列名稱ID
                             * 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失
                             * 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
                             * 第四個:是否自動刪除,false代表連接停掉后不自動刪除這個隊列
                             * 其他額外參數為null
                             */
                            channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null);
                            Console.ForegroundColor = ConsoleColor.Red;
                            string message = "hello CodeMan 666";
                            var body = Encoding.UTF8.GetBytes(message);
        
                            /*
                             * exchange:交換機,暫時用不到,在進行發布訂閱時才會用到
                             * 路由key
                             * 額外的設置屬性
                             * 最后一個參數是要傳遞的消息字節數組
                             */
                            channel.BasicPublish("", RabbitConstant.QUEUE_HELLO_WORLD, null, body);
                            Console.WriteLine($"producer消息:{message}已發送");
                        }
                    }
                }
            }
        
      1. 項目 Z.RabbitMQ.Consumer01新增HelloConsumer
      • public class HelloConsumer
            {
                public static void HelloWorldShow()
                {
                    var factory = new ConnectionFactory();
                    factory.HostName = "127.0.0.1";
                    factory.Port = 5672;//5672是RabbitMQ默認的端口號
                    factory.UserName = "admin";
                    factory.Password = "admin";
                    factory.VirtualHost = "my_vhost";
        
                    using (var connection = factory.CreateConnection())
                    {
                        using (var channel = connection.CreateModel())
                        {
                            /*
                             * 創建隊列,聲明并創建一個隊列,如果隊列已存在,則使用這個隊列
                             * 第一個參數:隊列名稱ID
                             * 第二個參數:是否持久化,false對應不持久化數據,MQ停掉數據就會丟失
                             * 第三個參數:是否隊列私有化,false則代表所有的消費者都可以訪問,true代表只有第一次擁有它的消費者才能一直使用
                             * 第四個:是否自動刪除,false代表連接停掉后不自動刪除這個隊列
                             * 其他額外參數為null
                             */
                            //RabbitConstant.QUEUE_HELLO_WORLD 對應的生產者一樣名稱 "helloworld.queue"
                            channel.QueueDeclare(RabbitConstant.QUEUE_HELLO_WORLD, true, false, false, null);
                            Console.ForegroundColor = ConsoleColor.Cyan;
        
                            EventingBasicConsumer consumers = new EventingBasicConsumer(channel);
                            // 觸發事件
                            consumers.Received += (model, ea) =>
                            {
                                var body = ea.Body.ToArray();
                                var message = Encoding.UTF8.GetString(body);
        
                                // false只是確認簽收當前的消息,設置為true的時候則代表簽收該消費者所有未簽收的消息
                                channel.BasicAck(ea.DeliveryTag, false);
                                Console.WriteLine($"Consumer01接收消息:{message}");
                            };
                            /*
                             * 從MQ服務器中獲取數據
                             * 創建一個消息消費者
                             * 第一個參數:隊列名
                             * 第二個參數:是否自動確認收到消息,false代表手動確認消息,這是MQ推薦的做法
                             * 第三個參數:要傳入的IBasicConsumer接口
                             *
                             */
                            
                            //RabbitConstant.QUEUE_HELLO_WORLD ==  helloworld.queue
                            channel.BasicConsume(RabbitConstant.QUEUE_HELLO_WORLD, false, consumers);
                            Console.WriteLine("Press [Enter] to exit");
                            Console.Read();
                        }
                    }
                }
            }
        

      work消息模型

      工作隊列或者競爭消費者模式

      image-20230714205741999

      work queues與入門程序相比,多了一個消費端,兩個消費端共同消費同一個隊列中的消息,但是一個消息只能被一個消費者獲取。

      接下來我們來模擬這個流程:

      P:生產者:任務的發布者

      C1:消費者1:領取任務并且完成任務,假設完成速度較慢(模擬耗時)

      C2:消費者2:領取任務并且完成任務,假設完成速度較快

      代碼附上

      新增一個工具類用來獲取rabbitmq的連接信息

      public class RabbitUtils
      {
          public static ConnectionFactory GetConnection()
          {
              var factory = new ConnectionFactory();
              factory.HostName = "127.0.0.1";
              factory.Port = 5672;//5672是RabbitMQ默認的端口號
              factory.UserName = "admin";
              factory.Password = "admin";
              factory.VirtualHost = "my_vhost";
              return factory;
          }
      }
      
      • 消費者1(C1)在剛剛的 Z.RabbitMQ.Consumer01新增SmsReceive

        Program.cs中的main函數中進行調用 SmsReceive.Sender();

        消費者1 延遲30ms接受到信息

        public class SmsReceive
        {
          public static void Sender()
          {
              //使用工具類創建連接
              var connection = RabbitUtils.GetConnection().CreateConnection();
        
              var channel = connection.CreateModel();
        
              channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
              // 如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者
              // basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個消息后(確認后),在從隊列中獲取一個新的
              channel.BasicQos(0, 1, false);
        
              var consumer = new EventingBasicConsumer(channel);
        
              consumer.Received += (model, ea) =>
              {
                  var body = ea.Body.ToArray();
                  var message = Encoding.UTF8.GetString(body);
                  Thread.Sleep(30);
                  Console.WriteLine($"SmsSender-發送短信成功:{message}");
                  channel.BasicAck(ea.DeliveryTag, false);
              };
        
              channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer);
              Console.WriteLine("Press [Enter] to exit");
              Console.Read();
          }
        }
        
      • 消費者2(C2)在剛剛的 Z.RabbitMQ.Consumer02新增SmsReceive

        image-20230714210550189

        消費者1 延遲60ms接受到信息

        public class SmsReceive
        {
            public static void Sender()
            {
                var connection = RabbitUtils.GetConnection().CreateConnection();
                var channel = connection.CreateModel();
        
                channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
                // 如果不寫basicQos(1),則自動MQ會將所有請求平均發送給所有消費者
                // basicQos,MQ不再對消費者一次發送多個請求,而是消費者處理完一個消息后(確認后),在從隊列中獲取一個新的
                channel.BasicQos(0, 1, false);//處理完一個取一個
        
                var consumer = new EventingBasicConsumer(channel);
        
                consumer.Received += (model, ea) =>
                {
                    var body = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(body);
                    Thread.Sleep(60);
                    Console.WriteLine($"SmsSender-發送短信成功:{message}");
                    channel.BasicAck(ea.DeliveryTag, false);
                };
        
                channel.BasicConsume(RabbitConstant.QUEUE_SMS, false, consumer);
                Console.WriteLine("Press [Enter] to exit");
                Console.Read();
            }
        }
        
      • 生產者Z.RabbitMq.Producer中創建SmsSender類在main函數進行調用

        • 發送100條車票訂閱的消息
        public class SmsSender
        {
            public static void Sender()
            {
                using (var connection = RabbitUtils.GetConnection().CreateConnection())
                {
                    using (var channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(RabbitConstant.QUEUE_SMS, true, false, false, null);
                        for (int i = 0; i < 100; i++)
                        {
                            Sms sms = new Sms("乘客" + i, "139000000" + i, "您的車票已預定成功");
                            string jsonSms = JsonConvert.SerializeObject(sms);
                            var body = Encoding.UTF8.GetBytes(jsonSms);
                            channel.BasicPublish("", RabbitConstant.QUEUE_SMS, null, body);
                            Console.WriteLine($"正在發送內容:{jsonSms}");
                        }
                        Console.WriteLine("發送數據成功");
                    }
                }
            }
        }
        

      運行結構如下

      image-20230714211516566

      能者多勞
      • 消費者1比消費者2的效率要快,一次任務的耗時較短
      • 消費者2大量時間處于空閑狀態,消費者1一直忙碌

      通過channel.BasicAck(ea.DeliveryTag, false);來完成能者多勞的效果,在完成上一次請求之后再去取下一條消息,這就會出現服務器快的消費的更多,慢的消費的更少。

      發布訂閱模式

      Publish/subscribe(交換機類型:Fanout,也稱為廣播 )

      image-20230714204841106

      image-20230714212152060

      和前面兩種模式不同:

      • 聲明Exchange,不再聲明Queue
      • 發送消息到Exchange,不再發送到Queue,通過exchange發送到queue上
      消費者1收到的天氣

      項目.RabbitMq.Consumer01 創建WeatherFanout使用exchange(交換機)

      public class WeatherFanout
      {
          public static void Weather()
          {
              using (var connection = RabbitUtils.GetConnection().CreateConnection())
              {
                  using (var channel = connection.CreateModel())
                  {
                      channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER, ExchangeType.Fanout);
                      // 聲明隊列信息
                      channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
                      /*
                               * queueBind 用于將隊列與交換機綁定
                               * 參數1:隊列名
                               * 參數2:交換機名
                               * 參數3:路由Key(暫時用不到)
                               */
                      channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER, "");
      
                      channel.BasicQos(0, 1, false);
      
                      var consumer = new EventingBasicConsumer(channel);
      
                      consumer.Received += ((model, ea) =>
                                            {
                                                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                                                Console.WriteLine($"百度收到的氣象信息:{message}");
                                                channel.BasicAck(ea.DeliveryTag, false);
                                            });
      
                      channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
                      Console.WriteLine("Press [Enter] to exit");
                      Console.Read();
                  }
              }
          }
      }
      
      消費者2收到的天氣

      項目.RabbitMq.Consumer02 創建WeatherFanout使用exchange(交換機)

      代碼與消費者01一樣

      生產者發送天氣

      生產者把消息推送到交換機上

      public class WeatherFanout
      {
          public static void Weather()
          {
              using (var connection = RabbitUtils.GetConnection().CreateConnection())
              {
                  using (var channel = connection.CreateModel())
                  {
                      string message = "20度";
                      var body = Encoding.UTF8.GetBytes(message);
                      channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER, "", null, body);
                      Console.WriteLine("天氣信息發送成功!");
                  }
              }
          }
      }
      

      最后得到效果

      image-20230714213059332

      Routing 路由模型

      image-20230714213633197

      P:生產者,向Exchange發送消息,發送消息時,會指定一個routing key。

      X:Exchange(交換機),接收生產者的消息,然后把消息遞交給 與routing key完全匹配的隊列

      C1:消費者,其所在隊列指定了需要routing key 為 error 的消息

      C2:消費者,其所在隊列指定了需要routing key 為 info、error、warning 的消息

      • 隊列與交換機的綁定,不能是任意綁定,而是要指定一個RoutingKey

      • 消息的發送方在向Exchange發送消息時,也必須指定消息的RoutingKey

      • Exchange不再把消息交給每一個綁定的隊列,而是根據消息的RoutingKey進行判斷,只有隊列的RoutingKey與消息的RoutingKey完全一致,才會接收消息

      生產者
       public class WeatherDirect
       {
           public static void Weather()
           {
               Dictionary<string, string> area = new Dictionary<string, string>();
               area.Add("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
               area.Add("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
               area.Add("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
               area.Add("us.cal.lsj.20210525", "美國加州洛杉磯20210525天氣數據");
      
               using (var connection = RabbitUtils.GetConnection().CreateConnection())
               {
                   using (var channel = connection.CreateModel())
                   {
                       foreach (var item in area)
                       {
                           channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_ROUTING, item.Key,
                                                null, Encoding.UTF8.GetBytes(item.Value));
                       }
      
                       Console.WriteLine("氣象信息發送成功!");
                   }
               }
           }
       }
      
      消費者1

      接受百度路由的路由消息

      public class WeatherDirect
      {
          public static void Weather()
          {
              using (var connection = RabbitUtils.GetConnection().CreateConnection())
              {
                  using (var channel = connection.CreateModel())
                  {
                      channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
                      channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
                      /*
                          * queueBind 用于將隊列與交換機綁定
                          * 參數1:隊列名
                          * 參數2:交換機名
                          * 參數3:路由Key(暫時用不到)
                          */
                      channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hunan.changsha.20210525");
                      channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");
      
                      channel.BasicQos(0, 1, false);
      
                      var consumer = new EventingBasicConsumer(channel);
      
                      consumer.Received += ((model, ea) =>
                                            {
                                                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                                                Console.WriteLine($"百度收到的氣象信息:{message}");
                                                channel.BasicAck(ea.DeliveryTag, false);
                                            });
      
                      channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
                      Console.WriteLine("Press [Enter] to exit");
                      Console.Read();
                  }
              }
          }
      }
      
      消費者2

      接受新浪的路由信息

      public class WeatherDirect
      {
          public static void Weather()
          {
              using (var connection = RabbitUtils.GetConnection().CreateConnection())
              {
                  using (var channel = connection.CreateModel())
                  {
                      channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_ROUTING, ExchangeType.Direct);
                      // 聲明隊列信息
                      channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
                      /*
                           * queueBind 用于將隊列與交換機綁定
                           * 參數1:隊列名
                           * 參數2:交換機名
                           * 參數3:路由Key
                           */
                      channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.xiangyang.20210525");
                      channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "us.cal.lsj.20210525");
                      channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_ROUTING, "china.hubei.wuhan.20210525");
      
                      channel.BasicQos(0, 1, false);
      
                      var consumer = new EventingBasicConsumer(channel);
      
                      consumer.Received += ((model, ea) =>
                                            {
                                                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                                                Console.WriteLine($"新浪收到的氣象信息:{message}");
                                                channel.BasicAck(ea.DeliveryTag, false);
                                            });
      
                      channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
                      Console.WriteLine("Press [Enter] to exit");
                      Console.Read();
                  }
              }
          }
      }
      
      最后得到的效果
      • 新浪接收對應新浪的routingkey的信息
      • 百度接收對應百度的routingkey的信息

      image-20230714214750549

      Topics 通配符模式

      image-20230714215026047

      routingkey支持通配符匹配格式
      • 通配符格式
        • Topic類型與Direct相比,都是可以根據RoutingKey把消息路由到不同的隊列。只不過
        • Topic類型Exchange可以讓隊列在綁定RoutingKey的時候使用通配符
        • RoutingKey一般都是由一個或多個單詞組成,多個單詞之間以“.”分隔,例如:item.insert
        • 通配符規則:#匹配一個或多個詞,*恰好匹配一個詞,例如item.#能夠匹配item.insert.user或者item.insert,item.只能匹配item.insert或者item.user
      生產者
      public class WeatherTopic
      {
          public static void Weather()
          {
              Dictionary<string, string> area = new Dictionary<string, string>();
              area.Add("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
              area.Add("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
              area.Add("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
              area.Add("us.cal.lsj.20210525", "美國加州洛杉磯20210525天氣數據");
      
              using (var connection = RabbitUtils.GetConnection().CreateConnection())
              {
                  using (var channel = connection.CreateModel())
                  {
                      foreach (var item in area)
                      {
                          channel.BasicPublish(RabbitConstant.EXCHANGE_WEATHER_TOPIC, item.Key,
                                               null, Encoding.UTF8.GetBytes(item.Value));
                      }
      
                      Console.WriteLine("氣象信息發送成功!");
                  }
              }
          }
      }
      
      消費者1

      獲取交換機中通配符為china.#的信息

      • ("china.hunan.changsha.20210525", "中國湖南長沙20210525天氣數據");
      • ("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據");
      • ("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據");
      public class WeatherTopic
      {
          public static void Weather()
          {
              using (var connection = RabbitUtils.GetConnection().CreateConnection())
              {
                  using (var channel = connection.CreateModel())
                  {
                      channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
                      // 聲明隊列信息
                      channel.QueueDeclare(RabbitConstant.QUEUE_BAIDU, true, false, false, null);
                      /*
                           * queueBind 用于將隊列與交換機綁定
                           * 參數1:隊列名
                           * 參數2:交換機名
                           * 參數3:路由Key(暫時用不到)
                           */
                      channel.QueueBind(RabbitConstant.QUEUE_BAIDU, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.#");
      
                      channel.BasicQos(0, 1, false);
      
                      var consumer = new EventingBasicConsumer(channel);
      
                      consumer.Received += ((model, ea) =>
                                            {
                                                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                                                Console.WriteLine($"百度收到的氣象信息:{message}");
                                                channel.BasicAck(ea.DeliveryTag, false);
                                            });
      
                      channel.BasicConsume(RabbitConstant.QUEUE_BAIDU, false, consumer);
                      Console.WriteLine("Press [Enter] to exit");
                      Console.Read();
                  }
              }
          }
      }
      
      消費者2

      獲取交換機中通配符為china.hubei.*.20210525的信息

      • ("china.hubei.wuhan.20210525", "中國湖北武漢20210525天氣數據")
      • ("china.hubei.xiangyang.20210525", "中國湖北襄陽20210525天氣數據")
      public class WeatherTopic
      {
          public static void Weather()
          {
              using (var connection = RabbitUtils.GetConnection().CreateConnection())
              {
                  using (var channel = connection.CreateModel())
                  {
                      /*
                           * 生產者發送消息
                           * 隊列名稱
                           * 交換機名稱
                           * 路由key
                           *
                           */
                      channel.ExchangeDeclare(RabbitConstant.EXCHANGE_WEATHER_TOPIC, ExchangeType.Topic);
                      // 聲明隊列信息
                      channel.QueueDeclare(RabbitConstant.QUEUE_SINA, true, false, false, null);
                      /*
                           * queueBind 用于將隊列與交換機綁定
                           * 參數1:隊列名
                           * 參數2:交換機名
                           * 參數3:路由Key(暫時用不到)
                           */
                      channel.QueueBind(RabbitConstant.QUEUE_SINA, RabbitConstant.EXCHANGE_WEATHER_TOPIC, "china.hubei.*.20210525");
      
                      channel.BasicQos(0, 1, false);
      
                      var consumer = new EventingBasicConsumer(channel);
      
                      consumer.Received += ((model, ea) =>
                                            {
                                                var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                                                Console.WriteLine($"新浪收到的氣象信息:{message}");
                                                channel.BasicAck(ea.DeliveryTag, false);
                                            });
      
                      channel.BasicConsume(RabbitConstant.QUEUE_SINA, false, consumer);
                      Console.WriteLine("Press [Enter] to exit");
                      Console.Read();
                  }
              }
          }
      }
      
      最后得到的效果
      • 百度獲取china.#的信息
      • 新浪獲取china.hubei.*.20210525的信息

      image-20230714220155754

      RPC

      image-20230714220336915

      基本概念:
      • Callback queue 回調隊列,客戶端向服務器發送請求,服務器端處理請求后,將其處理結果保存在一個存儲體中。而客戶端為了獲得處理結果,那么客戶在向服務器發送請求時,同時發送一個回調隊列地址reply_to。

      • Correlation id 關聯標識,客戶端可能會發送多個請求給服務器,當服務器處理完后,客戶端無法辨別在回調隊列中的響應具體和那個請求時對應的。為了處理這種情況,客戶端在發送每個請求時,同時會附帶一個獨有correlation_id屬性,這樣客戶端在回調隊列中根據correlation_id字段的值就可以分辨此響應屬于哪個請求。

      流程說明:
      • 當客戶端啟動的時候,它創建一個匿名獨享的回調隊列。
      • 在 RPC 請求中,客戶端發送帶有兩個屬性的消息:一個是設置回調隊列的 reply_to 屬性,另一個是設置唯一值的 correlation_id 屬性。
      • 將請求發送到一個 rpc_queue 隊列中。
      • 服務器等待請求發送到這個隊列中來。當請求出現的時候,它執行他的工作并且將帶有執行結果的消息發送給 reply_to 字段指定的隊列。
      • 客戶端等待回調隊列里的數據。當有消息出現的時候,它會檢查 correlation_id 屬性。如果此屬性的值與請求匹配,將它返回給應用

      分享幾題面試題

      RabbitMQ中消息可能有的幾種狀態?

      1. alpha: 消息內容(包括消息體、屬性和 headers) 和消息索引都存儲在內存中 。

        1. beta: 消息內容保存在磁盤中,消息索引保存在內存中。
        2. gamma: 消息內容保存在磁盤中,消息索引在磁盤和內存中都有 。
        3. delta: 消息內容和索引都在磁盤中 。
      2. 死信隊列?

        DLX,全稱為 Dead-Letter-Exchange,死信交換器,死信郵箱。當消息在一個隊列中變成死信 (dead message) 之后,它能被重新被發送到另一個交換器中,這個交換器就是 DLX,綁定 DLX 的隊列就稱之 為死信隊列。

      3. 導致的死信的幾種原因?

        1. 消息被拒(Basic.Reject /Basic.Nack) 且 requeue = false。
        2. 消息TTL過期。
        3. 隊列滿了

      到這里就結束,大家如果需要看視頻學習就是點最上面的鏈接就行了

      源碼:github

      posted @ 2023-07-14 22:41  做夢的努力者  閱讀(429)  評論(3)    收藏  舉報
      主站蜘蛛池模板: 性做久久久久久久久| 人妻少妇精品视频专区| 午夜av高清在线观看| 精品久久久久久无码专区不卡| 亚洲精品久久一区二区三区四区| 国产综合精品一区二区在线| 国产99视频精品免费视频36| 亚洲国产一区二区三区久| 国产精品一区在线蜜臀| 四虎亚洲精品高清在线观看| 亚洲精品日韩久久精品| 久久精品国产一区二区蜜芽| 思思99热精品在线| 性一交一乱一乱一视频| 九九热在线这里只有精品| 欧美不卡无线在线一二三区观| 亚洲av成人一区在线| 中文字幕一区有码视三区| 精品国产美女福到在线不卡| 亚洲av无码专区在线亚| 亚洲精品无码乱码成人| 99精品国产一区在线看| 日本一区二区三区专线| 国产精品一区二区国产主播| 亚洲乱码一二三四区| 337p粉嫩大胆噜噜噜| 国产精品一码二码三码四码| 精品国产一区二区三区av性色| 免费看欧美全黄成人片| 少妇被粗大的猛烈进出| 久久香蕉国产线看观看猫咪av| 亚洲欧美精品一中文字幕| 欧美人成精品网站播放| 久久人人97超碰人人澡爱香蕉| 久久综合97丁香色香蕉| 久久天堂无码av网站| 两个人的视频www免费| 国产精品国产精品偷麻豆| 人妻饥渴偷公乱中文字幕| 午夜福利国产片在线视频| 老司机精品影院一区二区三区 |