<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      HBase的Block Cache實現機制分析

      本文結合HBase 0.94.1版本源碼,對HBase的Block Cache實現機制進行分析,總結學習其Cache設計的核心思想。

      1. 概述

      HBase上Regionserver的內存分為兩個部分,一部分作為Memstore,主要用來寫;另外一部分作為BlockCache,主要用于讀。

      • 寫請求會先寫入Memstore,Regionserver會給每個region提供一個Memstore,當Memstore滿64MB以后,會啟動 flush刷新到磁盤。當Memstore的總大小超過限制時(heapsize * hbase.regionserver.global.memstore.upperLimit * 0.9),會強行啟動flush進程,從最大的Memstore開始flush直到低于限制。
      • 讀請求先到Memstore中查數據,查不到就到BlockCache中查,再查不到就會到磁盤上讀,并把讀的結果放入BlockCache。由于BlockCache采用的是LRU策略,因此BlockCache達到上限(heapsize * hfile.block.cache.size * 0.85)后,會啟動淘汰機制,淘汰掉最老的一批數據。

      一個Regionserver上有一個BlockCache和N個Memstore,它們的大小之和不能大于等于heapsize * 0.8,否則HBase不能正常啟動。

      默認配置下,BlockCache為0.2,而Memstore為0.4。在注重讀響應時間的應用場景下,可以將 BlockCache設置大些,Memstore設置小些,以加大緩存的命中率。

      HBase RegionServer包含三個級別的Block優先級隊列:

      • Single:如果一個Block第一次被訪問,則放在這一優先級隊列中;
      • Multi:如果一個Block被多次訪問,則從Single隊列移到Multi隊列中;
      • InMemory:如果一個Block是inMemory的,則放到這個隊列中。

      以上將Cache分級思想的好處在于:

      • 首先,通過inMemory類型Cache,可以有選擇地將in-memory的column families放到RegionServer內存中,例如Meta元數據信息;
      • 通過區分Single和Multi類型Cache,可以防止由于Scan操作帶來的Cache頻繁顛簸,將最少使用的Block加入到淘汰算法中。

      默認配置下,對于整個BlockCache的內存,又按照以下百分比分配給Single、Multi、InMemory使用:0.25、0.50和0.25。

      注意,其中InMemory隊列用于保存HBase Meta表元數據信息,因此如果將數據量很大的用戶表設置為InMemory的話,可能會導致Meta表緩存失效,進而對整個集群的性能產生影響。

      2. 源碼分析

      下面是對HBase 0.94.1中相關源碼(org.apache.hadoop.hbase.io.hfile.LruBlockCache)的分析過程。

      2.1加入Block Cache

        /** Concurrent map (the cache) */
        private final ConcurrentHashMap<BlockCacheKey,CachedBlock> map;
      
        /**
         * Cache the block with the specified name and buffer.
         * <p>
         * It is assumed this will NEVER be called on an already cached block.  If
         * that is done, an exception will be thrown.
         * @param cacheKey block's cache key
         * @param buf block buffer
         * @param inMemory if block is in-memory
         */
        public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf, boolean inMemory) {
          CachedBlock cb = map.get(cacheKey);
          if(cb != null) {
            throw new RuntimeException("Cached an already cached block");
          }
          cb = new CachedBlock(cacheKey, buf, count.incrementAndGet(), inMemory);
          long newSize = updateSizeMetrics(cb, false);
          map.put(cacheKey, cb);
          elements.incrementAndGet();
          if(newSize > acceptableSize() && !evictionInProgress) {
            runEviction();
          }
        }
      
        /**
         * Cache the block with the specified name and buffer.
         * <p>
         * It is assumed this will NEVER be called on an already cached block.  If
         * that is done, it is assumed that you are reinserting the same exact
         * block due to a race condition and will update the buffer but not modify
         * the size of the cache.
         * @param cacheKey block's cache key
         * @param buf block buffer
         */
        public void cacheBlock(BlockCacheKey cacheKey, Cacheable buf) {
          cacheBlock(cacheKey, buf, false);
        }

      1)  這里假設不會對同一個已經被緩存的BlockCacheKey重復放入cache操作;

      2)  根據inMemory標志創建不同類別的CachedBlock對象:若inMemory為true則創建BlockPriority.MEMORY類型,否則創建BlockPriority.SINGLE;注意,這里只有這兩種類型的Cache,因為BlockPriority.MULTI在Cache Block被重復訪問時才進行創建,見CachedBlock的access方法代碼:

        /**
         * Block has been accessed.  Update its local access time.
         */
        public void access(long accessTime) {
          this.accessTime = accessTime;
          if(this.priority == BlockPriority.SINGLE) {
            this.priority = BlockPriority.MULTI;
          }
        }

      3)  將BlockCacheKey和創建的CachedBlock對象加入到全局的ConcurrentHashMap map中,同時做一些更新計數操作;

      4)  最后判斷如果加入后的Block Size大于設定的臨界值且當前沒有淘汰線程運行,則調用runEviction()方法啟動LRU淘汰過程:

        /** Eviction thread */
        private final EvictionThread evictionThread;
        
        /**
         * Multi-threaded call to run the eviction process.
         */
        private void runEviction() {
          if(evictionThread == null) {
            evict();
          } else {
            evictionThread.evict();
          }
        }

      其中,EvictionThread線程即是LRU淘汰的具體實現線程。下面將給出詳細分析。

      2.2淘汰Block Cache

      EvictionThread線程主要用于與主線程的同步,從而完成Block Cache的LRU淘汰過程。

        /*
         * Eviction thread.  Sits in waiting state until an eviction is triggered
         * when the cache size grows above the acceptable level.<p>
         *
         * Thread is triggered into action by {@link LruBlockCache#runEviction()}
         */
        private static class EvictionThread extends HasThread {
          private WeakReference<LruBlockCache> cache;
          private boolean go = true;
      
          public EvictionThread(LruBlockCache cache) {
            super(Thread.currentThread().getName() + ".LruBlockCache.EvictionThread");
            setDaemon(true);
            this.cache = new WeakReference<LruBlockCache>(cache);
          }
      
          @Override
          public void run() {
            while (this.go) {
              synchronized(this) {
                try {
                  this.wait();
                } catch(InterruptedException e) {}
              }
              LruBlockCache cache = this.cache.get();
              if(cache == null) break;
              cache.evict();
            }
          }
      
          public void evict() {
            synchronized(this) {
              this.notify(); // FindBugs NN_NAKED_NOTIFY
            }
          }
      
          void shutdown() {
            this.go = false;
            interrupt();
          }
        }

      EvictionThread線程啟動后,調用wait被阻塞住,直到EvictionThread線程的evict方法被主線程調用時執行notify(見上面的代碼分析過程,通過主線程的runEviction方法觸發調用),開始執行LruBlockCache的evict方法進行真正的淘汰過程,代碼如下:

        /**
         * Eviction method.
         */
        void evict() {
      
          // Ensure only one eviction at a time
          if(!evictionLock.tryLock()) return;
      
          try {
            evictionInProgress = true;
            long currentSize = this.size.get();
            long bytesToFree = currentSize - minSize();
      
            if (LOG.isDebugEnabled()) {
              LOG.debug("Block cache LRU eviction started; Attempting to free " +
                StringUtils.byteDesc(bytesToFree) + " of total=" +
                StringUtils.byteDesc(currentSize));
            }
      
            if(bytesToFree <= 0) return;
      
            // Instantiate priority buckets
            BlockBucket bucketSingle = new BlockBucket(bytesToFree, blockSize,
                singleSize());
            BlockBucket bucketMulti = new BlockBucket(bytesToFree, blockSize,
                multiSize());
            BlockBucket bucketMemory = new BlockBucket(bytesToFree, blockSize,
                memorySize());
      
            // Scan entire map putting into appropriate buckets
            for(CachedBlock cachedBlock : map.values()) {
              switch(cachedBlock.getPriority()) {
                case SINGLE: {
                  bucketSingle.add(cachedBlock);
                  break;
                }
                case MULTI: {
                  bucketMulti.add(cachedBlock);
                  break;
                }
                case MEMORY: {
                  bucketMemory.add(cachedBlock);
                  break;
                }
              }
            }
      
            PriorityQueue<BlockBucket> bucketQueue =
              new PriorityQueue<BlockBucket>(3);
      
            bucketQueue.add(bucketSingle);
            bucketQueue.add(bucketMulti);
            bucketQueue.add(bucketMemory);
      
            int remainingBuckets = 3;
            long bytesFreed = 0;
      
            BlockBucket bucket;
            while((bucket = bucketQueue.poll()) != null) {
              long overflow = bucket.overflow();
              if(overflow > 0) {
                long bucketBytesToFree = Math.min(overflow,
                  (bytesToFree - bytesFreed) / remainingBuckets);
                bytesFreed += bucket.free(bucketBytesToFree);
              }
              remainingBuckets--;
            }
      
            if (LOG.isDebugEnabled()) {
              long single = bucketSingle.totalSize();
              long multi = bucketMulti.totalSize();
              long memory = bucketMemory.totalSize();
              LOG.debug("Block cache LRU eviction completed; " +
                "freed=" + StringUtils.byteDesc(bytesFreed) + ", " +
                "total=" + StringUtils.byteDesc(this.size.get()) + ", " +
                "single=" + StringUtils.byteDesc(single) + ", " +
                "multi=" + StringUtils.byteDesc(multi) + ", " +
                "memory=" + StringUtils.byteDesc(memory));
            }
          } finally {
            stats.evict();
            evictionInProgress = false;
            evictionLock.unlock();
          }
        }

      1)首先獲取鎖,保證同一時刻只有一個淘汰線程運行;

      2)計算得到當前Block Cache總大小currentSize及需要被淘汰釋放掉的大小bytesToFree,如果bytesToFree小于等于0則不進行后續操作;

      3) 初始化創建三個BlockBucket隊列,分別用于存放Single、Multi和InMemory類Block Cache,其中每個BlockBucket維護了一個CachedBlockQueue,按LRU淘汰算法維護該BlockBucket中的所有CachedBlock對象;

      4) 遍歷記錄所有Block Cache的全局ConcurrentHashMap,加入到相應的BlockBucket隊列中;

      5) 將以上三個BlockBucket隊列加入到一個優先級隊列中,按照各個BlockBucket超出bucketSize的大小順序排序(見BlockBucket的compareTo方法);

      6) 遍歷優先級隊列,對于每個BlockBucket,通過Math.min(overflow, (bytesToFree - bytesFreed) / remainingBuckets)計算出需要釋放的空間大小,這樣做可以保證盡可能平均地從三個BlockBucket中釋放指定的空間;具體實現過程詳見BlockBucket的free方法,從其CachedBlockQueue中取出即將被淘汰掉的CachedBlock對象:

          public long free(long toFree) {
            CachedBlock cb;
            long freedBytes = 0;
            while ((cb = queue.pollLast()) != null) {
              freedBytes += evictBlock(cb);
              if (freedBytes >= toFree) {
                return freedBytes;
              }
            }
            return freedBytes;
          }

      7) 進一步調用了LruBlockCache的evictBlock方法,從全局ConcurrentHashMap中移除該CachedBlock對象,同時更新相關計數:

        protected long evictBlock(CachedBlock block) {
          map.remove(block.getCacheKey());
          updateSizeMetrics(block, true);
          elements.decrementAndGet();
          stats.evicted();
          return block.heapSize();
        }

      8) 釋放鎖,完成善后工作。

      3. 總結

      以上關于Block Cache的實現機制,核心思想是將Cache分級,這樣的好處是避免Cache之間相互影響,尤其是對HBase來說像Meta表這樣的Cache應該保證高優先級。

      posted on 2012-09-24 18:55  大圓那些事  閱讀(14717)  評論(4)    收藏  舉報

      導航

      主站蜘蛛池模板: 色偷偷成人综合亚洲精品| 四虎成人精品在永久免费| 波多野结衣网站| 绯色蜜臀av一区二区不卡| 极品少妇无套内射视频| 日韩在线观看 一区二区| √天堂中文www官网在线| 国产尤物精品自在拍视频首页| 久久人人97超碰人人澡爱香蕉| 久久精品夜夜夜夜夜久久| 91偷自国产一区二区三区| 风韵丰满妇啪啪区老老熟女杏吧| 熟女精品视频一区二区三区| 中文字幕熟妇人妻在线视频| 日本中文字幕有码在线视频| 夜夜添无码试看一区二区三区| 激情97综合亚洲色婷婷五| 色综合色综合色综合频道| 蜜桃AV抽搐高潮一区二区| 国产伦一区二区三区精品| 人妻精品动漫H无码中字| 国产精品多p对白交换绿帽| 精品在免费线中文字幕久久| 国产黄色三级三级看三级| 精品无码久久久久久尤物| 亚洲熟妇丰满多毛xxxx| 99久久久无码国产麻豆| a∨变态另类天堂无码专区| 亚洲国产精品第一区二区| 亚洲AVAV天堂AV在线网阿V| 亚洲国产精品午夜福利| 日韩毛片在线视频x| 亚洲国产成人无码AV在线影院L | 久久夜色精品国产亚av| 亚洲综合一区二区三区| 激情在线网| 国产偷国产偷亚洲清高动态图| 国产麻豆放荡av激情演绎| 伊人精品久久久大香线蕉| 午夜激情小视频一区二区| 亚洲国产av无码综合原创国产 |