<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      哎呀,我老大寫Bug啦——記一次MessageQueue的優化

        MessageQueue,顧名思義消息隊列,在系統開發中也是用的比較多的一個中間件吧。我們這里主要用它來做日志管理和訂單管理的,記得老老大(恩,是的,就是老老大,因為他已經跳槽了)還在的時候,當時也是為了趕項目進度,他也參與開發了,那時候我才剛剛入職,他負責寫后端這塊,我來了就把他手上的任務接過來了,(接著接著……就辭職了)。

      之后我們的開發仍然有條不紊的開發著,直到今年的一月份吧,才上線開始運行,然后就出現了常規狀態,上線之后就開始爆炸,

                                                                                           

      這個頁面打不開呀,那個內容沒東西呀,第三方登錄問題呀,支付問題呀,臨時再改需求呀……(該來的都來了),加班、debug、測試、再debug……,然后經過幾天的修復,終于完成了跟自己電腦一樣穩定的運行,組員們都美滋滋的,今晚加個雞腿才行。

                                                                                          

      都說禍不單行,古人是不會騙我們的,Bug怎么會修得完呢?天真,要是Bug能修得完還要我們來干啥,好景不長,果然,過了一周之后,組員突然群里叫喳喳,

      what is it ? 

       

       

      來了,今天的主角登場了,我也要開始加班了。

      RabbitMQ

        這個是今天要說的東西,基礎概念什么的不是今天要說的重點,重點是:

       

      RabbitMQ內存使得整個服務器瀕臨癱瘓,遠程登錄服務器都差點擠不進去的狀態,別看截圖目前才1.3G,吃個午飯回來,就2.3G了,可怕不可怕?咋回事?

      老板喊你回來加班啦

        先不管了,線上優先解決,手動先Reset回收資源以釋放空間,這個只是臨時的辦法,然后檢查一下rabbitMQ的配置有沒有問題,路徑在

       C:\Users\Administrator\AppData\Roaming\RabbitMQ 

      完全是默認的配置,完全ojbk啊,那到底咋回事?繼續檢查,想想不如從項目開始吧,然后查看項目中的代碼,都是從來自【MessageLib】的組件調用

      好了,叫我老老大要這個組件的代碼,他把git的地址就發給我,我把項目down下來,

      這個封裝的組件內容不多,主要的文件一目了然,其實就是用到這個兩個組件來進行的二次封裝來調用

      主要的代碼是在【MessageQueue.cs】文件里,展示一下當時的代碼情況:

      using System;
      using System.Collections.Generic;
      using System.Linq;
      using System.Text;
      using System.Threading.Tasks;
      using MessageLib.ClassBean;
      using EasyNetQ;
      using System.Threading;
      
      namespace MessageLib
      {
          public static class MessageQueue
          {
              public static IBus bus = MQBusBuilder.CreateMessageBus();
              //消息隊列
              private static Queue<Item> NoticQueue = new Queue<Item>(5000);
              //日志隊列
              private static Queue<Item> LogQueue = new Queue<Item>(5000);
              //隊列數目發布數量
              private static int max_count_to_pulish = 1000;
      
              /// <summary>
              /// 可供外部使用的消息入列操作
              /// </summary>
              public static void push(Item item)
              {
                  if (item.type == ItemType.notic)
                  {
                      NoticQueue.Enqueue(item);
                  }
      
                  if (item.type == ItemType.log)
                  {
                      LogQueue.Enqueue(item);
                  }
              }
      
              /// <summary>
              /// 監聽后需要調用的發布接口
              /// </summary>
              private static void Pulish(object source, System.Timers.ElapsedEventArgs e)
              {
                  if (NoticQueue.Count > 0 || LogQueue.Count > 0)
                  {
                      if (bus == null || !bus.IsConnected)
                      {
                          bus = MQBusBuilder.CreateMessageBus();
                      }
      
                      if (bus.IsConnected)
                      {
                          Send(ItemType.notic);
                          Send(ItemType.log);
                      }
                  }
              }
      
              /// <summary>
              /// 程序自運行并開始監聽
              /// </summary>
              public static void Run()
              {
                  System.Timers.Timer timer = new System.Timers.Timer();
                  timer.Interval = 1000;
                  timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到達時間的時候執行事件;    
                  timer.AutoReset = true;//設置是執行一次(false)還是一直執行(true);    
                  timer.Enabled = true;//是否執行System.Timers.Timer.Elapsed事件;    
              }
      
              /// <summary>
              /// 啟動線程異步調用
              /// </summary>
              /// <param name="channelType"></param>
              private static void Send(string channelType)
              {
                  Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));
                  thread.IsBackground = true;
                  thread.Start(channelType);
              }
      
              /// <summary>
              /// 調用發布日志及提醒兩個接口
              /// </summary>
              /// <param name="channel"></param>
              private static void PublishAction(object channel)
              {
                  PublisLog();
                  PublisNotic();
              }
      
              /// <summary>
              /// 日志消息發送至RabbitMQ指定exchange、Queue
              /// </summary>
              private static void PublisLog()
              {
                  string channelName = ItemType.log;
                  try
                  {
                      var routingKey = channelName;
                      var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
                      var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}",channelName), "direct");
                      var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);
                      while (LogQueue.Count > 0)
                      {
                          Item item = LogQueue.Dequeue();
                          if (item != null)
                          {
                              var properties = new MessageProperties();
                              var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
                              Message.Properties.AppId = item.appid;
                              bus.Advanced.Publish(exchange, routingKey, false, Message);
                          }
      
                      }
                  }
                  catch (Exception ex)
                  {
                      throw ex;
                  }
              }
      
              /// <summary>
              /// 提醒消息發送至RabbitMQ指定exchange、Queue
              /// </summary>
              private static void PublisNotic()
              {
                  string channelName = ItemType.notic;
                  var routingKey = channelName;
                  var mqqueue = bus.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
                  var exchange = bus.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), "direct");
                  var binding = bus.Advanced.Bind(exchange, mqqueue, routingKey);
                  while(NoticQueue.Count > 0)
                  {
                      Item item = NoticQueue.Dequeue();
                      if (item != null)
                      {
                          var properties = new MessageProperties();
                          var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
                          Message.Properties.AppId = item.appid;
                          bus.Advanced.Publish(exchange, routingKey, false, Message);
                      }
                  }
              }
          }
      }
      View Code

      然后我就發現了這一段代碼!

              /// <summary>
              /// 程序自運行并開始監聽
              /// </summary>
              public static void Run()
              {
                  System.Timers.Timer timer = new System.Timers.Timer();
                  timer.Interval = 1000;
                  timer.Elapsed += new System.Timers.ElapsedEventHandler(Pulish);//到達時間的時候執行事件;    
                  timer.AutoReset = true;//設置是執行一次(false)還是一直執行(true);    
                  timer.Enabled = true;//是否執行System.Timers.Timer.Elapsed事件;    
              }
              /// <summary>
              /// 啟動線程異步調用
              /// </summary>
              /// <param name="channelType"></param>
              private static void Send(string channelType)
              {
                  Thread thread = new Thread(new ParameterizedThreadStart(PublishAction));
                  thread.IsBackground = true;
                  thread.Start(channelType);
              }

        老老大寫Bug了,當Run()起來之后,隊列中【NoticQueue】有內容,就開始推送消息,發送消息Send(),每來一次推送new一個線程并設置為后臺線程,然后發送消息。好了,明白了,這里的線程很混亂,因為線程操作不當,new了N多個頻道,并且沒有主動回收,這也難怪內存暴漲呢。并且要是Run()調用多次,后果更加不堪設想。

      加班改起來

        開始動手吧,業務主要推送有普通消息、錯誤消息和通知消息,那么將隊列與線程組裝一起,新增一個類QueueTask.cs:

      using System;
      using System.Collections.Generic;
      using System.Linq;
      using System.Text;
      using System.Threading;
      using System.Threading.Tasks;
      using MessageLib.Core;
      using MessageLib.Core.ClassBean;
      using EasyNetQ;
      using EasyNetQ.Topology;
      using System.Linq.Expressions;
      
      namespace MessageLib.Core
      {
          public class QueueTask
          {
              private Queue<Item> QueueData = new Queue<Item>(5000);
              //隊列數目發布數量
              private int max_count_to_pulish = 1000;
              public  bool isRunning = false;
              private string itemType = ItemType.info;
              private string MessageRouter = ItemType.info;
      
              public QueueTask(string itemType,string MessageRouter)
              {
                  this.itemType = itemType;
                  this.MessageRouter = MessageRouter;
              }
      
              /// <summary>
              /// 可供外部使用的消息入列操作
              /// </summary>
              public void Push(Item item, IBus IBus)
              {
                  QueueData.Enqueue(item);
                  if (!isRunning)
                      Run(IBus);
              }
      
              public void Run(IBus IBus)
              {
                  if (!isRunning)
                  {
                      Timer timerNotic = new Timer(PulishMsg, IBus, 1000, 1000);
                      isRunning = true;
                  }
              }
      
              private void PulishMsg(object state)
              {
                  IBus IBus = state as IBus;
                  if (QueueData.Count > 0)
                  {
                      PublisMsg(itemType, IBus);
                  }
              }
      
              private void PublisMsg(object channel, IBus BusInstance)
              {
                  try
                  {
                      string channelName = channel as string;
                      if (QueueData.Count > 0)
                      {
                          var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
                          var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);
                          var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);
      
                          while (QueueData.Count > 0)
                          {
                              Item item = QueueData.Dequeue();
                              if (item != null)
                              {
                                  var properties = new MessageProperties();
                                  var Message = new Message<string>(Newtonsoft.Json.JsonConvert.SerializeObject(item));
                                  Message.Properties.AppId = item.appid;
                                  BusInstance.Advanced.Publish(exchange, mqqueue.Name, false, Message);
                              }
                          }
                      }
                  }
                  catch (Exception ex)
                  {
                      Console.WriteLine("PublisMsg error:" + ex.Message);
                  }
              } 
      
              public void Read<T>(IBus BusInstance,Action<Item> dealAction) where T : Item
              {
                  try
                  {
                      string channelName = itemType;
                      var mqqueue = BusInstance.Advanced.QueueDeclare(string.Format("Queue.{0}", channelName));
                      var exchange = BusInstance.Advanced.ExchangeDeclare(string.Format("Exchange.{0}", channelName), ExchangeType.Direct);
                      var binding = BusInstance.Advanced.Bind(exchange, mqqueue, mqqueue.Name);
      
                      var Consume = BusInstance.Advanced.Consume(mqqueue, registration => Task.Run(() =>
                      {
                          registration.Add<string>((message, info) => 
                          {
                              Item data = Newtonsoft.Json.JsonConvert.DeserializeObject<T>(message.Body);
                              dealAction(data);
                          });
                      }));
                  }
                  catch (Exception ex)
                  {
                      Console.WriteLine("Read error:" + ex.Message);
                  }
              }
          }
      }

       

      然后,在MessageQueue.cs修改為單例模式:

          public static class MessageQueue
          {
              /*Install-Package EasyNetQ-dotnet-core -Version 2.0.2-radicalgeek-netc0001 -Pre*/
      
              private static IBus bus = null;
              public static bool isRunning = false;
      
              //消息隊列
              private static QueueTask NoticQueue = null;
              //日志隊列
              private static QueueTask LogQueue = null;
              //自定義
              private static QueueTask InfoQueue = null;
      
              #region 同步鎖
              private static readonly object obj = new object();
              #endregion
      
              public static void Init(string Connection, string routeKey)
              {
                  if (NoticQueue == null)
                      NoticQueue = new QueueTask(ItemType.notic, ItemType.notic);
                  if (LogQueue == null)
                      LogQueue = new QueueTask(ItemType.error, ItemType.error);
                  if (InfoQueue == null)
                      InfoQueue = new QueueTask(ItemType.info, routeKey);
                  if (string.IsNullOrEmpty(MQBusBuilder.Connnection))
                      MQBusBuilder.Connnection = Connection;
              }
      
              public static IBus BusInstance
              {
                  get
                  {
                      if (bus == null)
                      {
                          lock (obj)
                          {
                              if (bus == null|| !bus.IsConnected)
                              {
                                  bus = MQBusBuilder.CreateMessageBus();
                              }
                          }
                      }
                      return bus;
                  }
              }
      
      
              /// <summary>
              /// 可供外部使用的消息入列操作
              /// </summary>
              public static void PushAndRun(Item item)
              {
                  if (string.IsNullOrWhiteSpace(MQBusBuilder.Connnection) || BusInstance == null)
                      return;
                  if (item.type == ItemType.notic)
                  {
                      NoticQueue.Push(item, BusInstance);
                  }
                  if (item.type == ItemType.error)
                  {
                      LogQueue.Push(item, BusInstance);
                  }
                  if (item.type == ItemType.info)
                  {
                      InfoQueue.Push(item, BusInstance);
                  }
              }
      
              public static void Read(string itemType, Action<Item> dealAction)
              {
                  if (itemType == ItemType.notic)
                  {
                      NoticQueue.Read<NoticItem>(BusInstance, dealAction);
                  }
                  if (itemType == ItemType.error)
                  {
                      LogQueue.Read<ErrorItem>(BusInstance, dealAction);
                  }
                  if (itemType == ItemType.info)
                  {
                      InfoQueue.Read<Message>(BusInstance, dealAction);
                  }
              }
          }
      View Code

      每次推送消息的時候,每個QueueTask就自己維護自己的線程和隊列了,當調用推送之后,就開始運作起來。恩,應該沒問題了。然后就發布nuget,再更新項目,然后發布。觀察一段時間,恩,完美。

       

      事件二

        事情過后,B端開始搞起來了,然后涉及到訂單系統,跟老大(不是老老大,老老大那時候已經跑了)商量之后確定使用消息隊列來做訂單的事件的拓展,然后就直接美滋滋的調用好之前寫的了,沒想到啊,這次是線程漲!因為訂單是從B端推送過來的,B端肯定沒事,訂單后臺訂閱消息之后,讀取過程中出現的線程增多,然后看看之前寫的Read()方法,感覺沒啥問題啊,每運行完一次,就多了一個線程,這個神奇了啊,那么源代碼擼起來。

      翻來覆去,看到這個Consume方法,繼承的是IDisposable接口,得勒,知道咋回事了。

      Consume.Dispose(); 多個消費者的情況下,用完請記得主動釋放啊。

      這回真的可以浪了。

       

      總結

        遇到問題,冷靜下來,耐得了寂寞才行。線上的問題優先解決,然后再慢慢Debug,解決不了,看源碼,再解決不了,降級處理,歡迎共同探討。同時也感謝一下技術群里的兄弟給的一些建議,并幫忙查找資料,還好EasyNetQ是開源了,不然也打算說先不用了,畢竟一開始沒什么用戶量,所以沒必要整那么麻煩,加班加點的弄這個問題。不過最終都完美的解決了,心里還是挺美滋滋的,程序猿隨之而來的成就感。

        別看我們在工位上默不作聲,我們可能在拯救世界呢!老板,該加工資啦!

                                                                                                   

       補充

      2018-12-25  鑒于大伙私信我想看看原來的bug修復后的情況,畢竟是公司代碼不適合完全開源,我單獨把例子源碼做過修改的發布出來,思路都差不多的,對比一下文章中原來的有問題的代碼就可以了吧。因為都已經修復掉了,修改后的在這里。??

      MessageLib.Core git 

      本文已獨家授權給腳本之家(ID:jb51net)公眾號發布

      posted @ 2018-11-07 15:05  山治先生  閱讀(20145)  評論(98)    收藏  舉報
      主站蜘蛛池模板: 一本久道久久综合中文字幕| 国产成人精选视频在线观看不卡 | 免费特黄夫妻生活片| 国产精品视频一区二区亚瑟| 狠狠色丁香婷婷综合尤物| 国内精品久久久久影院网站 | 少妇上班人妻精品偷人| 国产福利萌白酱在线观看视频| 日韩深夜福利视频在线观看| 天天做天天爱夜夜爽导航| 亚洲国产日韩一区三区| 四虎国产精品成人免费久久| 亚洲第一成人网站| 97人妻免费碰视频碰免| 国产精品中文字幕二区| 稷山县| 天堂网在线.www天堂在线资源 | 国产av黄色一区二区三区| 亚洲国产精品成人av网| 爱啪啪av导航| 日本夜爽爽一区二区三区| 日韩中文字幕精品人妻| 惠州市| 日韩欧美aⅴ综合网站发布| 亚洲AV成人片不卡无码| 日韩人妻无码精品久久| gogogo高清在线播放免费| 国产成人精品亚洲精品日日| 亚洲免费一区二区av| 九九热精品在线观看视频| 国产v亚洲v天堂无码久久久| 国产精品无码一区二区在线观一 | 在线免费成人亚洲av| 男女啪啪永久免费观看网站| 国产精品va无码一区二区| 好男人视频在线播放| 精品久久人人妻人人做精品| 97se亚洲综合自在线| 国产不卡av一区二区| 国产精品亚洲精品日韩已满十八小| 国产嫩草精品网亚洲av|