<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Storm常見模式——批處理

      Storm對流數據進行實時處理時,一種常見場景是批量一起處理一定數量的tuple元組,而不是每接收一個tuple就立刻處理一個tuple,這樣可能是性能的考慮,或者是具體業務的需要。

      例如,批量查詢或者更新數據庫,如果每一條tuple生成一條sql執行一次數據庫操作,數據量大的時候,效率會比批量處理的低很多,影響系統吞吐量。

      當然,如果要使用Storm的可靠數據處理機制的話,應該使用容器將這些tuple的引用緩存到內存中,直到批量處理的時候,ack這些tuple。

      下面給出一個簡單的代碼示例:

      現在,假設我們已經有了一個DBManager數據庫操作接口類,它至少有兩個接口:

      (1)getConnection(): 返回一個java.sql.Connection對象;

      (2)getSQL(Tuple tuple): 根據tuple元組生成數據庫操作語句。

      為了在Bolt中緩存一定數量的tuple,構造Bolt時傳遞int n參數賦給Bolt的成員變量int count,指定每個n條tuple批量處理一次。

      同時,為了在內存中緩存緩存Tuple,使用java concurrent中的ConcurrentLinkedQueue來存儲tuple,每當攢夠count條tuple,就觸發批量處理。

      另外,考慮到數據量小(如很長時間內都沒有攢夠count條tuple)或者count條數設置過大時,因此,Bolt中加入了一個定時器,保證最多每個1秒鐘進行一次批量處理tuple。

      下面是Bolt的完整代碼(僅供參考):

      import java.util.Map;
      import java.util.Queue;
      import java.util.concurrent.ConcurrentLinkedQueue;
      import java.sql.Connection;
      import java.sql.SQLException;
      import java.sql.Statement;
      
      import backtype.storm.task.OutputCollector;
      import backtype.storm.task.TopologyContext;
      import backtype.storm.topology.IRichBolt;
      import backtype.storm.topology.OutputFieldsDeclarer;
      import backtype.storm.tuple.Tuple;
      
      public class BatchingBolt implements IRichBolt {
          private static final long serialVersionUID = 1L;
          private OutputCollector collector;
          private Queue<Tuple> tupleQueue = new ConcurrentLinkedQueue<Tuple>();
          private int count;
          private long lastTime;
          private Connection conn;
      
          public BatchingBolt(int n) {
              count = n; //批量處理的Tuple記錄條數
              conn = DBManger.getConnection(); //通過DBManager獲取數據庫連接
              lastTime = System.currentTimeMillis(); //上次批量處理的時間戳
          }
      
          @Override
          public void prepare(Map stormConf, TopologyContext context,
                  OutputCollector collector) {
              this.collector = collector;
          }
      
          @Override
          public void execute(Tuple tuple) {
              tupleQueue.add(tuple);
              long currentTime = System.currentTimeMillis();
              // 每count條tuple批量提交一次,或者每個1秒鐘提交一次
              if (tupleQueue.size() >= count || currentTime >= lastTime + 1000) {
                  Statement stmt = conn.createStatement();
                  conn.setAutoCommit(false);
                  for (int i = 0; i < count; i++) {
                      Tuple tup = (Tuple) tupleQueue.poll();
                      String sql = DBManager.getSQL(tup); //生成sql語句
                      stmt.addBatch(sql); //加入sql
                      collector.ack(tup); //進行ack
                  }
                  stmt.executeBatch(); //批量提交sql
                  conn.commit();
                  conn.setAutoCommit(true);
                  System.out.println("batch insert data into database, total records: " + count);
                  lastTime = currentTime;
              }
          }
      
          @Override
          public void cleanup() {
          }
      
          @Override
          public void declareOutputFields(OutputFieldsDeclarer declarer) {
          }
      
          @Override
          public Map<String, Object> getComponentConfiguration() {
              // TODO Auto-generated method stub
              return null;
          }
      }

      posted on 2012-06-19 18:30  大圓那些事  閱讀(12859)  評論(5)    收藏  舉報

      導航

      主站蜘蛛池模板: 久久碰国产一区二区三区| 欧美综合婷婷欧美综合五月 | 日韩熟女精品一区二区三区| 饥渴的熟妇张开腿呻吟视频| 亚洲国产成人久久一区久久 | 中国亚洲女人69内射少妇| av亚洲一区二区在线| 午夜射精日本三级| av日韩在线一区二区三区| 深夜精品免费在线观看| 国产第一页浮力影院入口| 波多野结衣久久一区二区| 国产高潮视频在线观看| 呦系列视频一区二区三区| 久久久久99精品成人片| 久久国产精品老女人| 亚洲精品第一区二区三区| 在国产线视频A在线视频| 国内揄拍国内精品少妇| 尤物国产精品福利在线网| 国产精品免费看久久久| 肃南| 亚洲人成绝网站色www| 中文无码乱人伦中文视频在线| 成人麻豆日韩在无码视频| 四房播色综合久久婷婷| 国产三级无码内射在线看| 一级片一区二区中文字幕| 94人妻少妇偷人精品| 亚洲国产精品人人做人人爱| 国产在热线精品视频99公交| 看亚洲黄色不在线网占| 中文字幕精品无码一区二区| 宿松县| 国产中文字幕精品在线| 国产一区国产精品自拍| 日本一道一区二区视频| 精品亚洲国产成人av| 产综合无码一区| 亚洲a片无码一区二区蜜桃| 中文字幕乱码一区二区免费|