<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Storm常見模式——求TOP N

      Storm的另一種常見模式是對流式數據進行所謂“streaming top N”的計算,它的特點是持續的在內存中按照某個統計指標(如出現次數)計算TOP N,然后每隔一定時間間隔輸出實時計算后的TOP N結果。

      流式數據的TOP N計算的應用場景很多,例如計算twitter上最近一段時間內的熱門話題、熱門點擊圖片等等。

      下面結合Storm-Starter中的例子,介紹一種可以很容易進行擴展的實現方法:首先,在多臺機器上并行的運行多個Bolt,每個Bolt負責一部分數據的TOP N計算,然后再有一個全局的Bolt來合并這些機器上計算出來的TOP N結果,合并后得到最終全局的TOP N結果。

      該部分示例代碼的入口是RollingTopWords類,用于計算文檔中出現次數最多的N個單詞。首先看一下這個Topology結構:

      Topology構建的代碼如下:

              TopologyBuilder builder = new TopologyBuilder();
              builder.setSpout("word", new TestWordSpout(), 5);
              builder.setBolt("count", new RollingCountObjects(60, 10), 4)
                       .fieldsGrouping("word", new Fields("word"));
              builder.setBolt("rank", new RankObjects(TOP_N), 4)
                       .fieldsGrouping("count", new Fields("obj"));
              builder.setBolt("merge", new MergeObjects(TOP_N))
                       .globalGrouping("rank");

      1)首先,TestWordSpout()Topology的數據源Spout,持續隨機生成單詞發出去,產生數據流“word”,輸出Fields“word”,核心代碼如下:

          public void nextTuple() {
              Utils.sleep(100);
              final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
              final Random rand = new Random();
              final String word = words[rand.nextInt(words.length)];
              _collector.emit(new Values(word));
        }
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("word"));
        }

      2)接下來,“word”流入RollingCountObjects這個Bolt中進行word count計算,為了保證同一個word的數據被發送到同一個Bolt中進行處理,按照“word”字段進行field grouping;在RollingCountObjects中會計算各個word的出現次數,然后產生“count”流,輸出“obj”“count”兩個Field,核心代碼如下

          public void execute(Tuple tuple) {
      
              Object obj = tuple.getValue(0);
              int bucket = currentBucket(_numBuckets);
              synchronized(_objectCounts) {
                  long[] curr = _objectCounts.get(obj);
                  if(curr==null) {
                      curr = new long[_numBuckets];
                      _objectCounts.put(obj, curr);
                  }
                  curr[bucket]++;
                  _collector.emit(new Values(obj, totalObjects(obj)));
                  _collector.ack(tuple);
              }
          }
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("obj", "count"));
          }

      3)然后,RankObjects這個Bolt按照“count”流的“obj”字段進行field grouping;在Bolt內維護TOP N個有序的單詞,如果超過TOP N個單詞,則將排在最后的單詞踢掉,同時每個一定時間(2秒)產生“rank”流,輸出“list”字段,輸出TOP N計算結果到下一級數據流“merge”流,核心代碼如下:

          public void execute(Tuple tuple, BasicOutputCollector collector) {
              Object tag = tuple.getValue(0);
              Integer existingIndex = _find(tag);
              if (null != existingIndex) {
                  _rankings.set(existingIndex, tuple.getValues());
              } else {
                  _rankings.add(tuple.getValues());
              }
              Collections.sort(_rankings, new Comparator<List>() {
                  public int compare(List o1, List o2) {
                      return _compare(o1, o2);
                  }
              });
              if (_rankings.size() > _count) {
                  _rankings.remove(_count);
              }
              long currentTime = System.currentTimeMillis();
              if(_lastTime==null || currentTime >= _lastTime + 2000) {
                  collector.emit(new Values(new ArrayList(_rankings)));
                  _lastTime = currentTime;
              }
          }
      
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("list"));
          }

      4)最后,MergeObjects這個Bolt按照“rank”流的進行全局的grouping,即所有上一級Bolt產生的“rank”流都流到這個“merge”流進行;MergeObjects的計算邏輯和RankObjects類似,只是將各個RankObjectsBolt合并后計算得到最終全局的TOP N結果,核心代碼如下:

          public void execute(Tuple tuple, BasicOutputCollector collector) {
              List<List> merging = (List) tuple.getValue(0);
              for(List pair : merging) {
                  Integer existingIndex = _find(pair.get(0));
                  if (null != existingIndex) {
                      _rankings.set(existingIndex, pair);
                  } else {
                      _rankings.add(pair);
                  }
      
                  Collections.sort(_rankings, new Comparator<List>() {
                      public int compare(List o1, List o2) {
                          return _compare(o1, o2);
                      }
                  });
      
                  if (_rankings.size() > _count) {
                      _rankings.subList(_count, _rankings.size()).clear();
                  }
              }
      
              long currentTime = System.currentTimeMillis();
              if(_lastTime==null || currentTime >= _lastTime + 2000) {
                  collector.emit(new Values(new ArrayList(_rankings)));
                  LOG.info("Rankings: " + _rankings);
                  _lastTime = currentTime;
              }
          }
      
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("list"));
          }

      關于上述例子的幾點說明:

      (1) 為什么要有RankObjectsMergeObjects兩級的Bolt來計算呢?

      其實,計算TOP N的一個最簡單的思路是直接使用一個Bolt(通過類似于RankObjects的類實現)來做全局的求TOP N操作。

      但是,這種方式的明顯缺點在于受限于單臺機器的處理能力。

      (2) 如何保證計算結果的正確性?

      首先通過field grouping將同一個word的計算放到同一個Bolt上處理;最后有一個全局的global grouping匯總得到TOP N

      這樣可以做到最大可能并行性,同時也能保證計算結果的正確。

      (3) 如果當前計算資源無法滿足計算TOP N,該怎么辦?

      這個問題本質上就是系統的可擴展性問題,基本的解決方法就是盡可能做到在多個機器上的并行計算過程,針對上面的Topology結構:

      a) 可以通過增加每一級處理單元Bolt的數量,減少每個Bolt處理的數據規模;

      b) 可以通過增加一級或多級Bolt處理單元,減少最終匯總處理的數據規模。

      本文參考代碼見:https://github.com/nathanmarz/storm-starter

      posted on 2012-06-16 15:08  大圓那些事  閱讀(10269)  評論(5)    收藏  舉報

      導航

      主站蜘蛛池模板: 国产成人午夜福利在线播放| 无码一区二区三区AV免费| 久久精品国产福利一区二区| 中文字幕国产精品资源| 被灌满精子的波多野结衣| 奇米四色7777中文字幕| 亚洲国产精品无码一区二区三区| 日本深夜福利在线观看| 一区二区不卡国产精品| 四虎国产精品永久地址99| 亚洲欧美成人aⅴ在线| 亚洲精品日韩在线观看| 久久夜色撩人精品国产小说| 久久久久久99av无码免费网站| 欧美一区二区三区欧美日韩亚洲 | 国产午夜精品理论大片| 亚洲精品中文字幕尤物综合| 男女激情一区二区三区| 人妻无码不卡中文字幕系列| 国产一级av在线播放| 91精品蜜臀国产综合久久| 日韩加勒比一本无码精品| 狠狠色婷婷久久综合频道日韩| 九九热精品免费视频| 国产精品三级中文字幕| 国产精品午夜福利在线观看| 日本阿v片在线播放免费| 国产l精品国产亚洲区| 性色在线视频精品| 国产精品久久久国产盗摄| 视频一区视频二区在线视频| 国产亚洲一二三区精品| 亚洲日韩国产中文其他| 久久婷婷五月综合97色直播| 另类专区一区二区三区| 国产熟女精品一区二区三区 | 精品国产乱弄九九99久久| 亚洲欧美中文日韩V日本| 国产精品剧情亚洲二区| 国产亚洲中文字幕久久网| 亚洲 欧美 综合 在线 精品|