Storm常見(jiàn)模式——BasicBolt
Storm中的很多Bolt都有一個(gè)最常見(jiàn)的處理步驟:
- 讀入一個(gè)tuple;
- 根據(jù)這個(gè)輸入tuple,提取后發(fā)射0個(gè),1個(gè)或多個(gè)tuple;
- 最后,通過(guò)ack操作確認(rèn)這個(gè)tuple被成功處理。
按照上述處理步驟,依次處理發(fā)向這個(gè)Bolt的各個(gè)tuple元組。
這種模式可以實(shí)現(xiàn)像ETL這類的簡(jiǎn)單函數(shù)或過(guò)濾器功能,Storm中專門為這種模式封裝了相應(yīng)接口:IBasicBolt。BaseBasicBolt等類實(shí)現(xiàn)了這一接口。
下面是以BaseBasicBolt為基礎(chǔ),按照上述模式實(shí)現(xiàn)詞頻統(tǒng)計(jì)的Bolt(代碼參考鏈接:storm-starter):
public static class WordCount extends BaseBasicBolt { //記錄每個(gè)單詞及單詞出現(xiàn)的次數(shù) Map<String, Integer> counts = new HashMap<String, Integer>(); @Override public void execute(Tuple tuple, BasicOutputCollector collector) { String word = tuple.getString(0); Integer count = counts.get(word); //提取單詞出現(xiàn)次數(shù) if(count==null) count = 0; count++; counts.put(word, count); //更新單詞出現(xiàn)次數(shù) collector.emit(new Values(word, count)); //發(fā)射統(tǒng)計(jì)結(jié)果 } @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { declarer.declare(new Fields("word", "count")); } }
posted on 2012-06-19 19:56 大圓那些事 閱讀(6522) 評(píng)論(2) 收藏 舉報(bào)
浙公網(wǎng)安備 33010602011771號(hào)