<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Disruptor-簡單使用

      前言

      Disruptor是一個高性能的無鎖并發框架,其主要應用場景是在高并發、低延遲的系統中,如金融領域的交易系統,游戲服務器等。其優點就是非常快,號稱能支撐每秒600萬訂單。需要注意的是,Disruptor是單機框架,對標JDK中的Queue,而非可用于分布式系統的MQ

      本文基于Disruptor v3.4.*版本

      Demo

      既然是簡單使用,這階段只需要關注:

      • 生產者
      • 消費者:EventHandler
      • 消息的傳遞:消息的載體Event

      簡單例子

      首先,我們定義消息的載體Event,生產者向消費者傳遞的消息通過Event承載

      class LongEvent {
          private long value;
      
          public void set(long value) {
              this.value = value;
          }
          @Override
          public String toString() {
              return "LongEvent{" + "value=" + value + '}';
          }
      }
      

      然后定義Event生產工廠,這用于初始化Event

      EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
          @Override
          public LongEvent newInstance() {
              return new LongEvent();
          }
      };
      

      接下來就可以構建Disruptor了,以下是完整代碼

      // 消息載體(event)
      static class LongEvent {
          private long value;
      
          public void set(long value) {
              this.value = value;
          }
          @Override
          public String toString() {
              return "LongEvent{" + "value=" + value + '}';
          }
      }
      
      // 發布消息的轉換器
      public static void translate(LongEvent event, long sequence, ByteBuffer buffer)
      {
          event.set(buffer.getLong(0));
      }
      
      public static void main(String[] args) throws Exception {
      
          // event生產工廠,初始化RingBuffer的時候使用
          EventFactory<LongEvent> factory = new EventFactory<LongEvent>() {
              @Override
              public LongEvent newInstance() {
                  return new LongEvent();
              }
          };
      
          // 指定RingBuffer的大小(必須是2的n次方)
          int bufferSize = 1024;
      
          // 構造Disruptor(默認使用多生產者模式、BlockingWaitStrategy阻塞策略)
          Disruptor<LongEvent> disruptor = new Disruptor<>(LongEvent::new, bufferSize, DaemonThreadFactory.INSTANCE);
          //  Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE, ProducerType.MULTI, new BlockingWaitStrategy());
          // 設置消費者
          EventHandler<LongEvent> handler = (event, sequence, endOfBatch) -> {
              System.out.println("Event: " + event);
          };
          disruptor.handleEventsWith(handler);
      
          // 啟動disruptor,啟動所有需要運行的線程
          disruptor.start();
      
          RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
          ByteBuffer bb = ByteBuffer.allocate(8);
          for (long i = 0; i < 100; i++) {
              bb.putLong(i);
              // 發布事件
              ringBuffer.publishEvent(LongEventMain::translate, bb);
          }
      }
      

      消費者組合(多使用場景)

      Disruptor不僅可以當高性能的隊列使用,還支持消費者的串行、并行消費等

      以下只展示關鍵代碼(設置消費者),其余部分參考上一節的簡單demo

      1. 單鏈串行

        Untitled

        disruptor.handleEventsWith(handlerA).then(handlerB);
        
      2. 并行

        Untitled

        disruptor.handleEventsWith(handlerA, handlerB);
        
      3. 鏈內串行,多鏈并行

        Untitled

        disruptor.handleEventsWith(handlerA).then(handlerC);
        disruptor.handleEventsWith(handlerB).then(handlerD);
        
      4. 菱形(C、D都執行完才到E)

        Untitled

        disruptor.handleEventsWith(handlerA).then(handlerC);
        disruptor.handleEventsWith(handlerB).then(handlerD);
        disruptor.after(handlerC, handlerD).then(handlerE);
        
        
      5. 分組(AB都執行完才到CD)

        Untitled

        disruptor.handleEventsWith(handlerA, handlerB).then(handlerC, handlerD);
        
      6. 分組不重復消費

        組內競爭,組外串行:每個消息在每個分組中只有一個消費者能消費成功,如果就是分組A中只有HandlerA2能得到數據,分組B中只有HandlerB1獲得

        Untitled

        // 注意:此處的handler實現的是WorkHandler接口
        disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3)
                        .then(handlerB1, handlerB2, handlerB3);
        
      7. 分組不重復消費(菱形)

        Untitled

        // handlerA、handlerB實現WorkHandler接口
        // handlerC 實現EventHandler或WorkHandler接口均可
        disruptor.handleEventsWithWorkerPool(handlerA1, handlerA2, handlerA3)
                        .then(handlerB1, handlerB2, handlerB3)
                        .then(handlerC);
        

        等待策略

        消費者速度比生產者快時,需要等待。因此就有了不同的等待策略以適應不同場景

        • BlockingWaitStrategy

          默認策略。使用鎖和 Condition 的等待、喚醒機制。速度慢,但節省CPU資源并且在不同部署環境中能提供更加一致的性能表現。

        • YieldingWaitStrategy

          二段式,一階段自旋100次,二階段執行Thread.yield,需要低延遲的場景可使用此策略

        • SleepingWaitStrategy

          三段式,一階段自旋,二階段執行Thread.yield,三階段睡眠

        • BusySpinWaitStrategy

          性能最高的策略,與 YieldingWaitStrategy 一樣在低延遲場景使用,但是此策略要求消費者數量低于 CPU 邏輯內核總數

        其他小技巧

        1. 清除消息載體 Event 中的數據

          如果 Event 中存在大對象,應該在消費者鏈的末尾,添加一個清除數據的消費者,以幫助jvm垃圾回收。demo中的 LongEvent 是 private long value; 所以沒必要添加。

      總結

      本文介紹了 Disruptor 的簡單使用,以及復雜場景下消費者的配置。下篇開坑 Disruptor 源碼解析。


      參考資料

      Disruptor官方文檔

      posted @ 2023-04-10 15:38  王谷雨  閱讀(785)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 久久免费看少妇免费观看| 亚洲人成人无码www| 极品粉嫩小泬无遮挡20p| 亚洲中文一区二区av| 四虎成人精品在永久免费| 国产免费午夜福利片在线| 2021亚洲国产精品无码| 97色伦97色伦国产| 亚洲欧美牲交| 亚洲男人第一无码av网站| 中文日产幕无线码一区中文| 亚洲成人av在线资源网| 加勒比中文字幕无码一区| 又黄又爽又色的免费网站| 人妻丰满熟妇av无码处处不卡| 国产91精品一区二区亚洲| 91中文字幕一区在线| 国产精品久久久久久久专区| 日本精品一区二区不卡| 国产v亚洲v天堂无码久久久| 在线观看中文字幕国产码| 亚洲国产成人精品女久久| 人妻有码中文字幕在线| 丰满少妇高潮无套内谢| 国产日女人视频在线观看| 亚洲ΑV久久久噜噜噜噜噜| 东北妇女精品bbwbbw| 国产成人精品亚洲日本片| 欧美人与动牲交精品| 中文字幕人妻在线精品| 四虎精品视频永久免费| 色综合热无码热国产| 久久av无码精品人妻出轨| 日本中文一二区有码在线| 亚洲人成网站77777在线观看| аⅴ天堂中文在线网 | 中国china露脸自拍性hd| 国产gaysexchina男外卖| 亚洲熟女乱色一区二区三区| 国产成人午夜福利院| 丝袜美腿视频一区二区三区|