<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Storm常見模式——流聚合

      流聚合(stream join)是指將具有共同元組(tuple)字段的數據流(兩個或者多個)聚合形成一個新的數據流的過程。

      從定義上看,流聚合和SQL中表的聚合(table join)很像,但是二者有明顯的區別:table join的輸入是有限的,并且join的語義是非常明確的;而流聚合的語義是不明確的并且輸入流是無限的。

      數據流的聚合類型跟具體的應用有關。一些應用把兩個流發出的所有的tuple都聚合起來——不管多長時間;而另外一些應用則只會聚合一些特定的tuple。而另外一些應用的聚合邏輯又可能完全不一樣。而這些聚合類型里面最常見的類型是把所有的輸入流進行一樣的劃分,這個在storm里面用fields grouping在相同字段上進行grouping就可以實現。

      下面是對storm-starter(代碼見:https://github.com/nathanmarz/storm-starter)中有關兩個流的聚合的示例代碼剖析:

      先看一下入口類SingleJoinExample

      (1)這里首先創建了兩個發射源spout,分別是genderSpout和ageSpout:

              FeederSpout genderSpout = new FeederSpout(new Fields("id", "gender"));
              FeederSpout ageSpout = new FeederSpout(new Fields("id", "age"));
              
              TopologyBuilder builder = new TopologyBuilder();
              builder.setSpout("gender", genderSpout);
              builder.setSpout("age", ageSpout);

      其中genderSpout包含兩個tuple字段:id和gender,ageSpout包含兩個tuple字段:id和age(這里流聚合就是通過將相同id的tuple進行聚合,得到一個新的輸出流,包含id、gender和age字段)。

      (2)為了不同的數據流中的同一個id的tuple能夠落到同一個task中進行處理,這里使用了storm中的fileds grouping在id字段上進行分組劃分:

              builder.setBolt("join", new SingleJoinBolt(new Fields("gender", "age")))
                      .fieldsGrouping("gender", new Fields("id"))
                      .fieldsGrouping("age", new Fields("id"));

      從中可以看到,SingleJoinBolt就是真正進行流聚合的地方。下面我們來看看:

      (1)SingleJoinBolt構造時接收一個Fileds對象,其中傳進的是聚合后將要被輸出的字段(這里就是gender和age字段),保存到變量_outFileds中。

      (2)接下來看看完成SingleJoinBolt的構造后,SingleJoinBolt在真正開始接收處理tuple之前所做的準備工作(代碼見prepare方法):

      a)首先,將保存OutputCollector對象,創建TimeCacheMap對象,設置超時回調接口,用于tuple處理失敗時fail消息;緊接著記錄數據源的個數:

              _collector = collector;
              int timeout = ((Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
              _pending = new TimeCacheMap<List<Object>, Map<GlobalStreamId, Tuple>>(timeout, new ExpireCallback());
              _numSources = context.getThisSources().size();

      b)遍歷TopologyContext中不同數據源,得到所有數據源(這里就是genderSpout和ageSpout)中公共的Filed字段,保存到變量_idFields中(例子中就是id字段),同時將_outFileds中字段所在數據源記錄下來,保存到一張HashMap中_fieldLocations,以便聚合后獲取對應的字段值。

              Set<String> idFields = null;
              for(GlobalStreamId source: context.getThisSources().keySet()) {
                  Fields fields = context.getComponentOutputFields(source.get_componentId(), source.get_streamId());
                  Set<String> setFields = new HashSet<String>(fields.toList());
                  if(idFields==null) idFields = setFields;
                  else idFields.retainAll(setFields);
                  
                  for(String outfield: _outFields) {
                      for(String sourcefield: fields) {
                          if(outfield.equals(sourcefield)) {
                              _fieldLocations.put(outfield, source);
                          }
                      }
                  }
              }
              _idFields = new Fields(new ArrayList<String>(idFields));
              
              if(_fieldLocations.size()!=_outFields.size()) {
                  throw new RuntimeException("Cannot find all outfields among sources");
              }

      (3)好了,下面開始兩個spout流的聚合過程了(代碼見execute方法):

      首先,從tuple中獲取_idFields字段,如果不存在于等待被處理的隊列_pending中,則加入一行,其中key是獲取到的_idFields字段,value是一個空的HashMap<GlobalStreamId, Tuple>對象,記錄GlobalStreamId到Tuple的映射。

              List<Object> id = tuple.select(_idFields);
              GlobalStreamId streamId = new GlobalStreamId(tuple.getSourceComponent(), tuple.getSourceStreamId());
              if(!_pending.containsKey(id)) {
                  _pending.put(id, new HashMap<GlobalStreamId, Tuple>());            
              }

      從_pending隊列中,獲取當前GlobalStreamId streamId對應的HashMap對象parts中:

              Map<GlobalStreamId, Tuple> parts = _pending.get(id);

      如果streamId已經包含其中,則拋出異常,接收到同一個spout中的兩條一樣id的tuple,否則將該streamid加入parts中:

              if(parts.containsKey(streamId)) throw new RuntimeException("Received same side of single join twice");
              parts.put(streamId, tuple);

      如果parts已經包含了聚合數據源的個數_numSources時,從_pending隊列中移除這條記錄,然后開始構造聚合后的結果字段:依次遍歷_outFields中各個字段,從_fieldLocations中取到這些outFiled字段對應的GlobalStreamId,緊接著從parts中取出GlobalStreamId對應的outFiled,放入聚合后的結果中。

              if(parts.size()==_numSources) {
                  _pending.remove(id);
                  List<Object> joinResult = new ArrayList<Object>();
                  for(String outField: _outFields) {
                      GlobalStreamId loc = _fieldLocations.get(outField);
                      joinResult.add(parts.get(loc).getValueByField(outField));
                  }

      最后通過_collector將parts中存放的tuple和聚合后的輸出結果發射出去,并ack這些tuple已經處理成功。

                  _collector.emit(new ArrayList<Tuple>(parts.values()), joinResult);
                  
                  for(Tuple part: parts.values()) {
                      _collector.ack(part);
                  }
          }

      否則,繼續等待兩個spout流中這個streamid都到齊后再進行聚合處理。

      (4)最后,聲明一下輸出字段(代碼見declareOutputFields方法):

          declarer.declare(_outFields);

      posted on 2012-06-04 19:26  大圓那些事  閱讀(22494)  評論(2)    收藏  舉報

      導航

      主站蜘蛛池模板: 激情国产av做激情国产爱| 国产精品久久久久久人妻精品动漫 | 久久久久综合一本久道| 国产一区二区视频在线看| 中文字幕无码人妻aaa片| 亚洲熟妇熟女久久精品综合 | 少妇人妻真实偷人精品| 国产精品一区中文字幕| 国产360激情盗摄全集| 亚洲精品毛片一区二区| 天堂va蜜桃一区二区三区| 少妇被爽到高潮喷水久久欧美精品| 精品人妻少妇嫩草av专区| 忘忧草在线社区www中国中文| 国产在线播放专区av| 在线观看免费人成视频色| 日韩av一区二区三区在线| 男女爽爽无遮挡午夜视频| 亚洲gv猛男gv无码男同| 九色综合久99久久精品| 精品一区二区不卡免费| 久久综合97丁香色香蕉| 91人妻熟妇在线视频| 中文字幕av一区二区三区| 成人3d动漫一区二区三区| 久久久久国精品产熟女久色| 亚洲一区二区三区自拍麻豆| 成人免费乱码大片a毛片| 国产高清在线男人的天堂| 亚洲国产精品成人av网| 无码AV中文字幕久久专区| 国产精品综合一区二区三区| 欧美性xxxxx极品少妇| 亚洲综合网一区中文字幕| 国产精品成人午夜福利| 中文字幕乱码一区二区免费| 无码人妻少妇色欲av一区二区| аⅴ天堂中文在线网| 国产一区二区三区色视频| 不卡国产一区二区三区| 亚洲欧洲一区二区精品|