<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Storm常見模式——分布式RPC

      本文翻譯自:https://github.com/nathanmarz/storm/wiki/Distributed-RPC,作為學習Storm DRPC的資料,轉載必須以超鏈接形式標明文章原始出處及本文翻譯鏈接

      分布式RPC(distributed RPC,DRPC)用于對Storm上大量的函數調用進行并行計算過程。對于每一次函數調用,Storm集群上運行的拓撲接收調用函數的參數信息作為輸入流,并將計算結果作為輸出流發射出去。

      DRPC本身算不上Storm的特性,它是通過Storm的基本元素:streams,spouts,bolts,topologies而衍生的一個模式。DRPC可以單獨作為一個獨立于Storm的庫發布,但由于其重要性還是和Storm捆綁在了一起。

      總體概述

      DRPC通過DRPC Server來實現,DRPC Server的整體工作過程如下:

      1. 接收到一個RPC調用請求;
      2. 發送請求到Storm上的拓撲;
      3. 從Storm上接收計算結果;
      4. 將計算結果返回給客戶端。

      以上過程,在client客戶端看來,一個DRPC調用看起來和一般的RPC調用沒什么區別。下面代碼是client通過DRPC調用“reach”函數,參數為“http://twitter.com”:

      DRPCClient client = new DRPCClient("drpc-host", 3772);
      String result = client.execute("reach", "http://twitter.com");

      DRPC內部工作流程如下:

      1. Client向DRPC Server發送被調用執行的DRPC函數名稱及參數。
      2. Storm上的topology通過DRPCSpout實現這一函數,從DPRC Server接收到函數調用流;
      3. DRPC Server會為每次函數調用生成唯一的id;
      4. Storm上運行的topology開始計算結果,最后通過一個ReturnResults的Bolt連接到DRPC Server,發送指定id的計算結果;
      5. DRPC Server通過使用之前為每個函數調用生成的id,將結果關聯到對應的發起調用的client,將計算結果返回給client。

      LinearDRPCTopologyBuilder

      Storm提供了一個topology builder——LinearDRPCTopologyBuilder,它可以自動完成幾乎所有的DRPC步驟。包括:

      1. 構建spout
      2. DRPC Server返回結果;
      3. Bolt提供函數用于對tuples進行聚集。

      下面是一個簡單的例子,這個DRPC拓撲只是簡單的在輸入參數后追加!后返回:

      public static class ExclaimBolt extends BaseBasicBolt {
          public void execute(Tuple tuple, BasicOutputCollector collector) {
              String input = tuple.getString(1);
              collector.emit(new Values(tuple.getValue(0), input + "!"));
          }
      
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("id", "result"));
          }
      }
      
      public static void main(String[] args) throws Exception {
          LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation");
          builder.addBolt(new ExclaimBolt(), 3);
          // ...
      }

      由上述例子可見,我們只需很少的工作即可完成拓撲。當創建LinearDRPCTopologyBuilder的時候,需要指定拓撲中DRPC函數的名稱exclamation。一個DRPC Server可以協調多個函數,每個函數有不同的函數名稱。拓撲中的第一個bolt的輸入是個字段:第一個是請求的id號;第二個是請求的參數。

      LinearDRPCTopologyBuilder同時需要最后一個bolt發射一個包含兩個字段的輸出流:第一個字段是請求id;第二個字段是計算結果。因此,所有的中間tuples必須包含請求id作為第一個字段。

      例子中,ExclaimBolt在輸入tuple的第二個字段后面追加“!”LinearDRPCTopologyBuilder負責處理其余的協調工作:與DRPC Server建立連接,發送結果給DRPC Server

      本地模式DRPC

      DRPC可以以本地模式運行,下面的代碼是如何在本地模式運行上面的例子:

      LocalDRPC drpc = new LocalDRPC();
      LocalCluster cluster = new LocalCluster();
      
      cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc));
      
      System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello"));
      
      cluster.shutdown();
      drpc.shutdown();

      首先創建一個LocalDRPC對象,該對象在本地模擬一個DRPC Server,正如LocalCluster在本地模擬一個Storm集群一樣。然后創建一個LocalCluster對象在本地模式下運行拓撲。LinearDRPCTopologyBuilder含有單獨的方法用于創建本地拓撲和遠程拓撲。

      本地模式下,LocalDRPC并不綁定任何端口,因此Storm的拓撲需要了解要通訊的對象——這就是為什么createLocalTopology方法需要以LocalDRPC對象作為輸入。

      加載完拓撲之后,通過對LocalDRPC調用execute方法,就可以執行DRPC函數調用了。

      遠程模式DRPC

      在實際的Storm集群上運行DRPC也一樣很簡單。只需完成以下步驟:

      1. 啟動DRPC Server(s);
      2. 配置DRPC Server(s)地址;
      3. 向Storm集群提交DRPC拓撲。

      首先,通過storm腳本啟動DRPC Server:

      bin/storm drpc

      然后,在Storm集群中配置DRPC Server地址,這就是DRPCSpout讀取函數調用請求的地方。這一步的配置可以通過storm.yaml文件或者拓撲的配置來完成。通過storm.yaml文件的配置方式如下:

      drpc.servers:
        - "drpc1.foo.com"
        - "drpc2.foo.com"

      最后,通過StormSubmitter啟動DRPC拓撲。為了以遠程模式運行上面的例子,代碼如下:

      StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());

      createRemoteTopology被用于為Storm集群創建合適的拓撲。

      一個復雜的例子

      上面的exclamation只是一個簡單的DRPC例子。下面通過一個復雜的例子介紹如何在Storm集群內進行DRPC——計算Twitter上每個URL的到達度(reach),也就是每個URL暴露給的不同人的個數。

      為了完成這一計算,需要完成以下步驟:

      1. 獲取所有點選了(tweet)該URL的人;
      2. 獲取步驟1中所有人的關注者(followers,粉絲);
      3. 對所有關注者followers進行去重;
      4. 對步驟3中的關注者人數進行求和。

      一個簡單的URL到達度計算可能涉及成千上萬次數據庫調用以及數以百萬的followers記錄,計算量非常大。有了Storm,將很容易實現這一計算過程。單機上可能需要運行幾分鐘才能完成,在Storm集群上,即使是最難計算的URL也只需要幾秒鐘。

      這個例子的代碼在storm-starter:點擊這里。這里是如何創建拓撲的代碼:

      LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach");
      builder.addBolt(new GetTweeters(), 3);
      builder.addBolt(new GetFollowers(), 12)
              .shuffleGrouping();
      builder.addBolt(new PartialUniquer(), 6)
              .fieldsGrouping(new Fields("id", "follower"));
      builder.addBolt(new CountAggregator(), 2)
              .fieldsGrouping(new Fields("id"));
      拓撲的執行分為以下四步:
      1. GetTweeters:獲取所有tweet了指定URL的用戶列表,這個Bolt將輸入流[id, url]轉換成輸出流[id, tweeter],每個url元組被映射為多個tweeter元組。
      2. GetFollowers:獲取步驟1中所有用戶列表的followers,這個Bolt將輸入流[id, twetter]轉換成輸出流[id, follower],當某個人同時是多個人的關注者follower,而且這些人都tweet了指定的URL,那么將產生重復的follower元組。
      3. PartialUniquer:將所有followers按照follower id分組,使得同一個follower在同一個task中被處理。這個Bolt接收follower并進行去重計數。
      4. CountAggregator:從各個PartialUniquer中接收各部分的計數結果,累加后完成到達度計算。

      下面是PartialUniquer這個Bolt的代碼實現:

      public class PartialUniquer extends BaseBatchBolt {
          BatchOutputCollector _collector;
          Object _id;
          Set<String> _followers = new HashSet<String>();
          
          @Override
          public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
              _collector = collector;
              _id = id;
          }
      
          @Override
          public void execute(Tuple tuple) {
              _followers.add(tuple.getString(1));
          }
          
          @Override
          public void finishBatch() {
              _collector.emit(new Values(_id, _followers.size()));
          }
      
          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
              declarer.declare(new Fields("id", "partial-count"));
          }
      }

      PartialUniquer通過繼承BaseBatchBolt實現了IBatchBolt接口,batch bolt提供了API用于將一批tuples作為整體來處理。每個請求id會創建一個新的batch bolt實例,同時Storm負責這些實例的清理工作。

      PartialUniquer接收到一個follower元組時執行execute方法,將follower添加到請求id對應的HashSet集合中。

      Batch bolt同時提供了finishBatch方法用于當這個task已經處理完所有的元組時調用。PartialUniquer發射一個包含當前task所處理的follower ids子集去重后個數的元組。

      在內部實現上,CoordinatedBolt用于檢測指定的bolt是否已經收到指定請求id的所有tuples元組。CoordinatedBolt使用direct streams管理實現這一協作過程。

      拓撲的其他部分易于理解。到達度的每一步的計算過程都是并行進行的,通過DRPC實現也是非常容易的。

      Non-linear DRPC拓撲

      LinearDRPCTopologyBuilder只能處理線性的”DRPC拓撲——正如到達度這樣可以通過一系列步驟序列來完成的計算。不難想象,DRPC調用中包含有更復雜的帶有分支和合并Bolt的拓撲。目前,必須自己直接使用CoordinatedBolt來完成這種非線性拓撲的計算。

      LinearDRPCTopologyBuilder工作過程

      • DRPCSpout發射[args, return-info],其中return-info包含DRPC Server的主機和端口號,以及DRPC Server為該次請求生成的唯一id號;
      • 構造一個Storm拓撲包含以下部分:
        • DRPCSpout
        • PrepareRequest(生成一個請求id,為return info創建一個流,為args創建一個流)
        • CoordinatedBolt wrappers以及direct groupings
        • JoinResult(將結果與return info拼接起來)
        • ReturnResult(連接到DRPC Server,返回結果)
      • LinearDRPCTopologyBuilder是建立在Storm基本元素之上的高層抽象。

      高級進階

      • KeyedFairBolt用于組織同一時刻多請求的處理過程;
      • 如何直接使用CoordinatedBolt

       

      posted on 2012-07-02 20:27  大圓那些事  閱讀(18611)  評論(1)    收藏  舉報

      導航

      主站蜘蛛池模板: 中文字幕无码av不卡一区| 福利视频一区二区在线| 亚洲一区二区三区四区| 麻豆国产成人AV在线播放| 少妇激情a∨一区二区三区| 女人下边被添全过视频的网址| 办公室强奷漂亮少妇视频| 国产无套粉嫩白浆在线| 神马久久亚洲一区 二区| 明水县| 亚洲一区二区三区十八禁| 韩国三级在线 中文字幕 无码| 成人无套少萝内射中出| 少妇高潮激情一区二区三| 黄页网址大全免费观看| 亚洲国产制服丝袜高清在线| 丁香婷婷色综合激情五月| 亚洲av激情一区二区三区| 亚洲av日韩在线资源| 四虎在线成人免费观看| 国产在线无码不卡播放| 亚洲综合一区二区三区不卡| 在线播放亚洲成人av| 日韩精品福利一区二区三区| 推油少妇久久99久久99久久| 国产色无码专区在线观看| 亚洲精品精华液一区二区| 国产性色的免费视频网站| 亚洲理论在线A中文字幕| 国产亚洲精品第一综合麻豆 | 亚洲 日韩 在线精品| 一卡二卡三卡四卡视频区| 亚洲一区三区三区成人久| 人人做人人爽人人爱| 国产亚洲真人做受在线观看| 国产欧美日韩高清在线不卡| 兰坪| 久久亚洲日韩精品一区二区三区 | 日本熟妇乱一区二区三区| 亚洲一区二区三区在线观看精品中文| 狠狠色噜噜狠狠狠狠777米奇|