<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      HBase性能優化方法總結(三):讀表操作

      本文主要是從HBase應用程序設計與開發的角度,總結幾種常用的性能優化方法。有關HBase系統配置級別的優化,可參考:淘寶Ken Wu同學的博客

      下面是本文總結的第三部分內容:讀表操作相關的優化方法。

      3. 讀表操作

      3.1 多HTable并發讀

      創建多個HTable客戶端用于讀操作,提高讀數據的吞吐量,一個例子:

      static final Configuration conf = HBaseConfiguration.create();
      static final String table_log_name = “user_log”;
      rTableLog = new HTable[tableN];
      for (int i = 0; i < tableN; i++) {
      rTableLog[i] = new HTable(conf, table_log_name);
      rTableLog[i].setScannerCaching(50);
      }

      3.2 HTable參數設置

      3.2.1 Scanner Caching

      hbase.client.scanner.caching配置項可以設置HBase scanner一次從服務端抓取的數據條數,默認情況下一次一條。通過將其設置成一個合理的值,可以減少scan過程中next()的時間開銷,代價是scanner需要通過客戶端的內存來維持這些被cache的行記錄。

      有三個地方可以進行配置:1)在HBase的conf配置文件中進行配置;2)通過調用HTable.setScannerCaching(int scannerCaching)進行配置;3)通過調用Scan.setCaching(int caching)進行配置。三者的優先級越來越高。

      3.2.2 Scan Attribute Selection

      scan時指定需要的Column Family,可以減少網絡傳輸數據量,否則默認scan操作會返回整行所有Column Family的數據。

      3.2.3 Close ResultScanner

      通過scan取完數據后,記得要關閉ResultScanner,否則RegionServer可能會出現問題(對應的Server資源無法釋放)。

      3.3 批量讀

      通過調用HTable.get(Get)方法可以根據一個指定的row key獲取一行記錄,同樣HBase提供了另一個方法:通過調用HTable.get(List<Get>)方法可以根據一個指定的row key列表,批量獲取多行記錄,這樣做的好處是批量執行,只需要一次網絡I/O開銷,這對于對數據實時性要求高而且網絡傳輸RTT高的情景下可能帶來明顯的性能提升。

      3.4 多線程并發讀

      在客戶端開啟多個HTable讀線程,每個讀線程負責通過HTable對象進行get操作。下面是一個多線程并發讀取HBase,獲取店鋪一天內各分鐘PV值的例子:

      public class DataReaderServer {
      //獲取店鋪一天內各分鐘PV值的入口函數
      public static ConcurrentHashMap<String, String> getUnitMinutePV(long uid, long startStamp, long endStamp){
      long min = startStamp;
      int count = (int)((endStamp - startStamp) / (60*1000));
      List<String> lst = new ArrayList<String>();
      for (int i = 0; i <= count; i++) {
      min = startStamp + i * 60 * 1000;
      lst.add(uid + "_" + min);
      }
      return parallelBatchMinutePV(lst);
      }
      //多線程并發查詢,獲取分鐘PV值
      private static ConcurrentHashMap<String, String> parallelBatchMinutePV(List<String> lstKeys){
      ConcurrentHashMap<String, String> hashRet = new ConcurrentHashMap<String, String>();
      int parallel = 3;
      List<List<String>> lstBatchKeys = null;
      if (lstKeys.size() < parallel ){
      lstBatchKeys = new ArrayList<List<String>>(1);
      lstBatchKeys.add(lstKeys);
      }
      else{
      lstBatchKeys = new ArrayList<List<String>>(parallel);
      for(int i = 0; i < parallel; i++ ){
      List<String> lst = new ArrayList<String>();
      lstBatchKeys.add(lst);
      }

      for(int i = 0 ; i < lstKeys.size() ; i ++ ){
      lstBatchKeys.get(i%parallel).add(lstKeys.get(i));
      }
      }

      List<Future< ConcurrentHashMap<String, String> >> futures = new ArrayList<Future< ConcurrentHashMap<String, String> >>(5);

      ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
      builder.setNameFormat("ParallelBatchQuery");
      ThreadFactory factory = builder.build();
      ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(lstBatchKeys.size(), factory);

      for(List<String> keys : lstBatchKeys){
      Callable< ConcurrentHashMap<String, String> > callable = new BatchMinutePVCallable(keys);
      FutureTask< ConcurrentHashMap<String, String> > future = (FutureTask< ConcurrentHashMap<String, String> >) executor.submit(callable);
      futures.add(future);
      }
      executor.shutdown();

      // Wait for all the tasks to finish
      try {
      boolean stillRunning = !executor.awaitTermination(
      5000000, TimeUnit.MILLISECONDS);
      if (stillRunning) {
      try {
      executor.shutdownNow();
      } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
      }
      }
      } catch (InterruptedException e) {
      try {
      Thread.currentThread().interrupt();
      } catch (Exception e1) {
      // TODO Auto-generated catch block
      e1.printStackTrace();
      }
      }

      // Look for any exception
      for (Future f : futures) {
      try {
      if(f.get() != null)
      {
      hashRet.putAll((ConcurrentHashMap<String, String>)f.get());
      }
      } catch (InterruptedException e) {
      try {
      Thread.currentThread().interrupt();
      } catch (Exception e1) {
      // TODO Auto-generated catch block
      e1.printStackTrace();
      }
      } catch (ExecutionException e) {
      e.printStackTrace();
      }
      }

      return hashRet;
      }
      //一個線程批量查詢,獲取分鐘PV值
      protected static ConcurrentHashMap<String, String> getBatchMinutePV(List<String> lstKeys){
      ConcurrentHashMap<String, String> hashRet = null;
      List<Get> lstGet = new ArrayList<Get>();
      String[] splitValue = null;
      for (String s : lstKeys) {
      splitValue = s.split("_");
      long uid = Long.parseLong(splitValue[0]);
      long min = Long.parseLong(splitValue[1]);
      byte[] key = new byte[16];
      Bytes.putLong(key, 0, uid);
      Bytes.putLong(key, 8, min);
      Get g = new Get(key);
      g.addFamily(fp);
      lstGet.add(g);
      }
      Result[] res = null;
      try {
      res = tableMinutePV[rand.nextInt(tableN)].get(lstGet);
      } catch (IOException e1) {
      logger.error("tableMinutePV exception, e=" + e1.getStackTrace());
      }

      if (res != null && res.length > 0) {
      hashRet = new ConcurrentHashMap<String, String>(res.length);
      for (Result re : res) {
      if (re != null && !re.isEmpty()) {
      try {
      byte[] key = re.getRow();
      byte[] value = re.getValue(fp, cp);
      if (key != null && value != null) {
      hashRet.put(String.valueOf(Bytes.toLong(key,
      Bytes.SIZEOF_LONG)), String.valueOf(Bytes
      .toLong(value)));
      }
      } catch (Exception e2) {
      logger.error(e2.getStackTrace());
      }
      }
      }
      }

      return hashRet;
      }
      }
      //調用接口類,實現Callable接口
      class BatchMinutePVCallable implements Callable<ConcurrentHashMap<String, String>>{
      private List<String> keys;

      public BatchMinutePVCallable(List<String> lstKeys ) {
      this.keys = lstKeys;
      }

      public ConcurrentHashMap<String, String> call() throws Exception {
      return DataReadServer.getBatchMinutePV(keys);
      }
      }

      3.5 緩存查詢結果

      對于頻繁查詢HBase的應用場景,可以考慮在應用程序中做緩存,當有新的查詢請求時,首先在緩存中查找,如果存在則直接返回,不再查詢HBase;否則對HBase發起讀請求查詢,然后在應用程序中將查詢結果緩存起來。至于緩存的替換策略,可以考慮LRU等常用的策略。

      3.6 Blockcache

      HBase上Regionserver的內存分為兩個部分,一部分作為Memstore,主要用來寫;另外一部分作為BlockCache,主要用于讀。

      寫請求會先寫入Memstore,Regionserver會給每個region提供一個Memstore,當Memstore滿64MB以后,會啟動 flush刷新到磁盤。當Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強行啟動flush進程,從最大的Memstore開始flush直到低于限制。

      讀請求先到Memstore中查數據,查不到就到BlockCache中查,再查不到就會到磁盤上讀,并把讀的結果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache達到上限(heapsize * hfile.block.cache.size * 0.85)后,會啟動淘汰機制,淘汰掉最老的一批數據。

      一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大于等于heapsize * 0.8,否則HBase不能啟動。默認BlockCache為0.2,而Memstore為0.4。對于注重讀響應時間的系統,可以將 BlockCache設大些,比如設置BlockCache=0.4Memstore=0.39,以加大緩存的命中率。

      有關BlockCache機制,請參考這里:HBase的Block cacheHBase的blockcache機制hbase中的緩存的計算與使用

      posted on 2012-03-08 15:22  大圓那些事  閱讀(31431)  評論(1)    收藏  舉報

      導航

      主站蜘蛛池模板: 日韩精品毛片无码一区到三区| 欧美成人aaa片一区国产精品| 国产极品美女网站在线观看| 国产伦精品一区二区亚洲| 成 人 色 网 站免费观看| 午夜国产理论大片高清| 亚洲精品日韩中文字幕| 亚洲人成色99999在线观看| av中文无码韩国亚洲色偷偷| 一区二区三区国产偷拍| 日本一区二区三区四区黄色| 亚洲天堂av日韩精品| 亚洲一本大道在线| 亚洲色欲在线播放一区| 黑人巨大亚洲一区二区久| 国产精品久久精品| 精品无人区一区二区三区在线| 久久综合久色欧美综合狠狠| 国产一区二区不卡在线| 亚洲精品午夜精品| 亚洲精品天堂一区二区| 在线视频精品中文无码| 粉嫩国产av一区二区三区| 国产精品一区免费在线看| 精品国产中文字幕av| 亚洲欧美偷国产日韩| 91精品国产一二三产区| 欧美黑人XXXX性高清版| 欧美交a欧美精品喷水| 最新精品国偷自产在线美女足| 激,情四虎欧美视频图片| 中文字幕日本一区二区在线观看| 高潮精品熟妇一区二区三区| 亚洲深深色噜噜狠狠网站| 91亚洲国产成人精品性色| 日韩国产亚洲一区二区三区| 精品久久久bbbb人妻| 丰满人妻被黑人猛烈进入| 国内偷自第一区二区三区| 国产亚洲国产精品二区| 欧洲免费一区二区三区视频|