Storm常見模式——分布式RPC
本文翻譯自:https://github.com/nathanmarz/storm/wiki/Distributed-RPC,作為學習Storm DRPC的資料,轉載必須以超鏈接形式標明文章原始出處及本文翻譯鏈接。
分布式RPC(distributed RPC,DRPC)用于對Storm上大量的函數調用進行并行計算過程。對于每一次函數調用,Storm集群上運行的拓撲接收調用函數的參數信息作為輸入流,并將計算結果作為輸出流發射出去。
DRPC本身算不上Storm的特性,它是通過Storm的基本元素:streams,spouts,bolts,topologies而衍生的一個模式。DRPC可以單獨作為一個獨立于Storm的庫發布,但由于其重要性還是和Storm捆綁在了一起。
總體概述
DRPC通過DRPC Server來實現,DRPC Server的整體工作過程如下:
- 接收到一個RPC調用請求;
- 發送請求到Storm上的拓撲;
- 從Storm上接收計算結果;
- 將計算結果返回給客戶端。
以上過程,在client客戶端看來,一個DRPC調用看起來和一般的RPC調用沒什么區別。下面代碼是client通過DRPC調用“reach”函數,參數為“http://twitter.com”:
DRPCClient client = new DRPCClient("drpc-host", 3772); String result = client.execute("reach", "http://twitter.com");
DRPC內部工作流程如下:

- Client向DRPC Server發送被調用執行的DRPC函數名稱及參數。
- Storm上的topology通過DRPCSpout實現這一函數,從DPRC Server接收到函數調用流;
- DRPC Server會為每次函數調用生成唯一的id;
- Storm上運行的topology開始計算結果,最后通過一個ReturnResults的Bolt連接到DRPC Server,發送指定id的計算結果;
- DRPC Server通過使用之前為每個函數調用生成的id,將結果關聯到對應的發起調用的client,將計算結果返回給client。
LinearDRPCTopologyBuilder
Storm提供了一個topology builder——LinearDRPCTopologyBuilder,它可以自動完成幾乎所有的DRPC步驟。包括:
- 構建spout;
- 向DRPC Server返回結果;
- 為Bolt提供函數用于對tuples進行聚集。
下面是一個簡單的例子,這個DRPC拓撲只是簡單的在輸入參數后追加“!”后返回:
public static class ExclaimBolt extends BaseBasicBolt { public void execute(Tuple tuple, BasicOutputCollector collector) { String input = tuple.getString(1); collector.emit(new Values(tuple.getValue(0), input + "!")); } public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "result")); } } public static void main(String[] args) throws Exception { LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("exclamation"); builder.addBolt(new ExclaimBolt(), 3); // ... }
由上述例子可見,我們只需很少的工作即可完成拓撲。當創建LinearDRPCTopologyBuilder的時候,需要指定拓撲中DRPC函數的名稱“exclamation”。一個DRPC Server可以協調多個函數,每個函數有不同的函數名稱。拓撲中的第一個bolt的輸入是兩個字段:第一個是請求的id號;第二個是請求的參數。
LinearDRPCTopologyBuilder同時需要最后一個bolt發射一個包含兩個字段的輸出流:第一個字段是請求id;第二個字段是計算結果。因此,所有的中間tuples必須包含請求id作為第一個字段。
例子中,ExclaimBolt在輸入tuple的第二個字段后面追加“!”,LinearDRPCTopologyBuilder負責處理其余的協調工作:與DRPC Server建立連接,發送結果給DRPC Server。
本地模式DRPC
DRPC可以以本地模式運行,下面的代碼是如何在本地模式運行上面的例子:
LocalDRPC drpc = new LocalDRPC(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("drpc-demo", conf, builder.createLocalTopology(drpc)); System.out.println("Results for 'hello':" + drpc.execute("exclamation", "hello")); cluster.shutdown(); drpc.shutdown();
首先創建一個LocalDRPC對象,該對象在本地模擬一個DRPC Server,正如LocalCluster在本地模擬一個Storm集群一樣。然后創建一個LocalCluster對象在本地模式下運行拓撲。LinearDRPCTopologyBuilder含有單獨的方法用于創建本地拓撲和遠程拓撲。
本地模式下,LocalDRPC并不綁定任何端口,因此Storm的拓撲需要了解要通訊的對象——這就是為什么createLocalTopology方法需要以LocalDRPC對象作為輸入。
加載完拓撲之后,通過對LocalDRPC調用execute方法,就可以執行DRPC函數調用了。
遠程模式DRPC
在實際的Storm集群上運行DRPC也一樣很簡單。只需完成以下步驟:
- 啟動DRPC Server(s);
- 配置DRPC Server(s)地址;
- 向Storm集群提交DRPC拓撲。
首先,通過storm腳本啟動DRPC Server:
bin/storm drpc
然后,在Storm集群中配置DRPC Server地址,這就是DRPCSpout讀取函數調用請求的地方。這一步的配置可以通過storm.yaml文件或者拓撲的配置來完成。通過storm.yaml文件的配置方式如下:
drpc.servers:
- "drpc1.foo.com"
- "drpc2.foo.com"
最后,通過StormSubmitter啟動DRPC拓撲。為了以遠程模式運行上面的例子,代碼如下:
StormSubmitter.submitTopology("exclamation-drpc", conf, builder.createRemoteTopology());
createRemoteTopology被用于為Storm集群創建合適的拓撲。
一個復雜的例子
上面的exclamation只是一個簡單的DRPC例子。下面通過一個復雜的例子介紹如何在Storm集群內進行DRPC——計算Twitter上每個URL的到達度(reach),也就是每個URL暴露給的不同人的個數。
為了完成這一計算,需要完成以下步驟:
- 獲取所有點選了(tweet)該URL的人;
- 獲取步驟1中所有人的關注者(followers,粉絲);
- 對所有關注者followers進行去重;
- 對步驟3中的關注者人數進行求和。
一個簡單的URL到達度計算可能涉及成千上萬次數據庫調用以及數以百萬的followers記錄,計算量非常大。有了Storm,將很容易實現這一計算過程。單機上可能需要運行幾分鐘才能完成,在Storm集群上,即使是最難計算的URL也只需要幾秒鐘。
這個例子的代碼在storm-starter:點擊這里。這里是如何創建拓撲的代碼:
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("reach"); builder.addBolt(new GetTweeters(), 3); builder.addBolt(new GetFollowers(), 12) .shuffleGrouping(); builder.addBolt(new PartialUniquer(), 6) .fieldsGrouping(new Fields("id", "follower")); builder.addBolt(new CountAggregator(), 2) .fieldsGrouping(new Fields("id"));
拓撲的執行分為以下四步:
- GetTweeters:獲取所有tweet了指定URL的用戶列表,這個Bolt將輸入流[id, url]轉換成輸出流[id, tweeter],每個url元組被映射為多個tweeter元組。
- GetFollowers:獲取步驟1中所有用戶列表的followers,這個Bolt將輸入流[id, twetter]轉換成輸出流[id, follower],當某個人同時是多個人的關注者follower,而且這些人都tweet了指定的URL,那么將產生重復的follower元組。
- PartialUniquer:將所有followers按照follower id分組,使得同一個follower在同一個task中被處理。這個Bolt接收follower并進行去重計數。
- CountAggregator:從各個PartialUniquer中接收各部分的計數結果,累加后完成到達度計算。
下面是PartialUniquer這個Bolt的代碼實現:
public class PartialUniquer extends BaseBatchBolt { BatchOutputCollector _collector; Object _id; Set<String> _followers = new HashSet<String>(); @Override public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) { _collector = collector; _id = id; } @Override public void execute(Tuple tuple) { _followers.add(tuple.getString(1)); } @Override public void finishBatch() { _collector.emit(new Values(_id, _followers.size())); } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("id", "partial-count")); } }
PartialUniquer通過繼承BaseBatchBolt實現了IBatchBolt接口,batch bolt提供了API用于將一批tuples作為整體來處理。每個請求id會創建一個新的batch bolt實例,同時Storm負責這些實例的清理工作。
當PartialUniquer接收到一個follower元組時執行execute方法,將follower添加到請求id對應的HashSet集合中。
Batch bolt同時提供了finishBatch方法用于當這個task已經處理完所有的元組時調用。PartialUniquer發射一個包含當前task所處理的follower ids子集去重后個數的元組。
在內部實現上,CoordinatedBolt用于檢測指定的bolt是否已經收到指定請求id的所有tuples元組。CoordinatedBolt使用direct streams管理實現這一協作過程。
拓撲的其他部分易于理解。到達度的每一步的計算過程都是并行進行的,通過DRPC實現也是非常容易的。
Non-linear DRPC拓撲
LinearDRPCTopologyBuilder只能處理“線性的”DRPC拓撲——正如到達度這樣可以通過一系列步驟序列來完成的計算。不難想象,DRPC調用中包含有更復雜的帶有分支和合并Bolt的拓撲。目前,必須自己直接使用CoordinatedBolt來完成這種非線性拓撲的計算。
LinearDRPCTopologyBuilder工作過程
- DRPCSpout發射[args, return-info],其中return-info包含DRPC Server的主機和端口號,以及DRPC Server為該次請求生成的唯一id號;
- 構造一個Storm拓撲包含以下部分:
- DRPCSpout
- PrepareRequest(生成一個請求id,為return info創建一個流,為args創建一個流)
- CoordinatedBolt wrappers以及direct groupings
- JoinResult(將結果與return info拼接起來)
- ReturnResult(連接到DRPC Server,返回結果)
- LinearDRPCTopologyBuilder是建立在Storm基本元素之上的高層抽象。
高級進階
- KeyedFairBolt用于組織同一時刻多請求的處理過程;
- 如何直接使用
CoordinatedBolt。
浙公網安備 33010602011771號