<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      從零開始實現簡易版Netty(九) MyNetty 實現池化內存的線程本地緩存

      從零開始實現簡易版Netty(九) MyNetty 實現池化內存的線程本地緩存

      1. Netty 池化內存線程本地緩存介紹

      在上一篇博客中,截止lab8版本MyNetty已經實現了Normal和Small規格的池化內存分配。按照計劃,在lab9中MyNetty將實現池化內存的線程本地緩存功能,以完成池化內存功能的最后一塊拼圖。
      由于本文屬于系列博客,讀者需要對之前的博客內容有所了解才能更好地理解本文內容。

      在lab7、lab8的實現中可以發現,出于空間利用率的考慮,一個PoolArena會同時被多個線程并發訪問。因此無論是Normal還是Small規格的池化內存分配,Netty在進行實際的池化內存分配時都或多或少的需要使用互斥鎖來確保用于追蹤池化內存狀態的元數據PoolArena、PoolSubPage等不會被并發的更新而出現問題。
      jemalloc的論文中提到,內存分配作為一個高頻的操作需要盡可能的減少線程的同步競爭以提高效率,大量線程都阻塞在同步鎖上會大大降低內存分配的整體吞吐率,通過引入線程本地緩存可以顯著減少同步競爭的頻率。
      "The main goal of thread caches is to reduce the volume of synchronization events."
      "Minimize lock contention. jemalloc's independent arenas were inspired by lkmalloc, but as time went on, tcmalloc made it abundantly clear that it's even better to avoid synchronization altogether, so jemalloc also implements thread-specific caching."

      引入線程本地緩存后,當前線程在釋放池化內存時,不會直接將空閑的池化內存對象還給公共的PoolArena中,而是優先嘗試放入獨屬于本線程的本地緩存中。同時,在嘗試申請池化內存分配時,也會優先查詢線程本地緩存中是否存在對應規格的可用池化內存段,如果有則直接使用,而無需通過公共的PoolArena獲取。
      有了線程本地緩存,線程在絕大多數情況下都只和獨屬于自己的本地緩存進行交互,因此能夠大幅減少與其它線程爭搶公共PoolArena元數據互斥鎖的場景并提高所訪問內存空間的緩存局部性,從而大幅提升內存分配的吞吐量。
      當然,線程本地緩存也不是沒有缺點的,線程本地緩存毫無疑問增加了內存的開銷,規格繁多的本地池化內存段對象多數時候都只會靜靜地在緩存中等待被使用(視為內部碎片),因此線程本地所能緩存的池化內存段數量是被嚴格限制的,使用者需要在池化內存分配效率與空間利用率的取舍上達成平衡。具體的實現細節,我們在下文中結合源碼再展開介紹。

      2. MyNetty 池化內存線程本地緩存源碼實現

      在jemalloc的論文中提到,為了減少線程之間對Arena的爭搶,jemalloc設置了多個Arena區域,并使用特別的算法使得每個Arena盡可能的被線程均勻的使用。Arena與線程是一對多的關系,而一個線程在進行池化內存分配前選擇并永久綁定一個Arena。

      2.1 PoolThreadLocalCache實現解析

      在Netty中,參考jemalloc也同樣是設置多個PoolArena,并令一個線程在進行最初的池化內存分配之前綁定一個PoolArena。
      具體的邏輯在PooledByteBufAllocator中,PooledByteBufAllocator中為基于堆內存的HeapByteBuffer和基于堆外直接內存的DirectByteBuffer以數組的形式分別維護了N個PoolArena(heapArenas、directArenas)。
      具體N為多少可以在allocator分配器的構造方法中通過參數設置,默認情況下其值取決與處理器的數量和內存大小。而具體的當前線程與其中某一個PoolArena進行綁定的邏輯則位于PoolThreadLocalCache這一核心數據結構之中。

      MyNetty PooledByteBufAllocator實現源碼
      public class MyPooledByteBufAllocator extends MyAbstractByteBufAllocator{
      
          private final MyPoolArena<byte[]>[] heapArenas;
      
          private final MyPoolThreadLocalCache threadLocalCache;
      
          public MyPooledByteBufAllocator() {
              this(false);
          }
      
          public MyPooledByteBufAllocator(boolean useCacheForAllThreads) {
              // 簡單起見,arena的數量與處理器核數掛鉤(netty中有更復雜的方式去配置,既可以構造參數傳參設置,也可以配置系統參數來控制默認值)
              int arenasNum = Runtime.getRuntime().availableProcessors() * 2;
      
              // 初始化好heapArena數組
              heapArenas = new MyPoolArena.HeapArena[arenasNum];
              for (int i = 0; i < heapArenas.length; i ++) {
                  MyPoolArena.HeapArena arena = new MyPoolArena.HeapArena(this);
                  heapArenas[i] = arena;
              }
      
              // 創建threadLocalCache,讓線程綁定到唯一的PoolArena中,并且在small/normal分配時,啟用相關的內存塊緩存
              this.threadLocalCache = new MyPoolThreadLocalCache(useCacheForAllThreads,this);
          }
      
          @Override
          protected MyByteBuf newHeapBuffer(int initialCapacity, int maxCapacity) {
              // 從ThreadLocal中獲得當前線程所綁定的PoolArena(如果是線程第一次分配,則ThreadLocal初始值獲取時會進行綁定)
              MyPoolThreadCache cache = threadLocalCache.get();
              MyPoolArena<byte[]> targetArena = cache.heapArena;
              return targetArena.allocate(cache, initialCapacity, maxCapacity);
          }
      }
      
      MyNetty PoolThreadLocalCache實現源碼
      public class MyPoolThreadLocalCache extends MyFastThreadLocal<MyPoolThreadCache> {
      
          private final boolean useCacheForAllThreads;
      
          private static final int DEFAULT_SMALL_CACHE_SIZE = 256;
          private static final int DEFAULT_NORMAL_CACHE_SIZE = 32;
      
          private static final int DEFAULT_MAX_CACHED_BUFFER_CAPACITY = 32 * 1024;
          private static final int DEFAULT_CACHE_TRIM_INTERVAL = 8192;
      
          private MyPooledByteBufAllocator myPooledByteBufAllocator;
      
          private ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(1);
      
          private final Runnable trimTask = new Runnable() {
              @Override
              public void run() {
                  myPooledByteBufAllocator.trimCurrentThreadCache();
              }
          };
      
          MyPoolThreadLocalCache(boolean useCacheForAllThreads,
                                 MyPooledByteBufAllocator myPooledByteBufAllocator) {
              this.useCacheForAllThreads = useCacheForAllThreads;
              this.myPooledByteBufAllocator = myPooledByteBufAllocator;
          }
      
          @Override
          protected synchronized MyPoolThreadCache initialValue() {
              // 從allocator所包含的HeapArena中挑選出一個最合適的HeapArena與當前線程綁定
              // 什么是最合適的?就是被其它線程綁定次數最少的(最少被使用 leastUsed),也就是相對最空閑的PoolArena
              final MyPoolArena<byte[]> heapArena = leastUsedArena(myPooledByteBufAllocator.getHeapArenas());
      
              final Thread current = Thread.currentThread();
      
              // 如果沒有配置useCacheForAllThreads=true,則只有FastThreadLocalThread等特殊場景才啟用PoolThreadCache緩存功能
              if (useCacheForAllThreads ||
                  // If the current thread is a FastThreadLocalThread we will always use the cache
                  current instanceof MyFastThreadLocalThread) {
                  final MyPoolThreadCache cache = new MyPoolThreadCache(
                      heapArena, DEFAULT_SMALL_CACHE_SIZE, DEFAULT_NORMAL_CACHE_SIZE,
                      DEFAULT_MAX_CACHED_BUFFER_CAPACITY, DEFAULT_CACHE_TRIM_INTERVAL);
      
      
                  return cache;
              }else{
                  // No caching so just use 0 as sizes.
                  // 不啟用緩存,則返回一個smallCacheSize/normalCacheSize都為0的特殊PoolThreadCache
                  // 但是挑選一個heapArena和當前線程綁定的邏輯依然是存在的,只是沒有small/normal具體分配時的線程本地緩存
                  return new MyPoolThreadCache(heapArena, 0, 0, 0, 0);
              }
          }
      
          @Override
          protected void onRemoval(MyPoolThreadCache threadCache) {
              threadCache.free(false);
          }
      
          private <T> MyPoolArena<T> leastUsedArena(MyPoolArena<T>[] arenas) {
              if (arenas == null || arenas.length == 0) {
                  return null;
              }
      
              MyPoolArena<T> minArena = arenas[0];
      
              // optimized
              // If it is the first execution, directly return minarena and reduce the number of for loop comparisons below
              if (minArena.numThreadCaches.get() == 0) {
                  // 當前Allocator第一次分配PoolArena,快速返回第一個即可
                  return minArena;
              }
      
              // 否則從所有的PoolArena中找到相對來說被最少得線程綁定的那個PoolArena
              for (int i = 1; i < arenas.length; i++) {
                  MyPoolArena<T> arena = arenas[i];
                  if (arena.numThreadCaches.get() < minArena.numThreadCaches.get()) {
                      minArena = arena;
                  }
              }
      
              return minArena;
          }
      }
      
      • MyNetty由于只支持了堆內存的池化內存分配,因此只有heapArenas數組。同時也簡化了設置Arenas個數的邏輯。
      • PoolThreadLocalCache是一個FastThreadLocal類型的數據結構。每一個線程都有一個自己所獨有的PoolThreadCache,這個結構就是用于存放PooledByteBuf池化內存對象的線程本地緩存。
        initialValue方法中可以看到,Netty并不會無腦的為所有線程都開啟線程本地緩存,默認情況下useCacheForAllThreads為false,則只會為FastThreadLocalThread類型的線程設置線程本地緩存。
        對于普通線程,則返回了一個參數均為0的,本質上是無緩存作用的PoolThreadCache對象(PoolThreadCache內部的工作原理我們在下一小節展開)。
      • 在初始化PoolThreadCache時,通過leastUsedArena方法找到當前綁定線程最少的PoolArena與線程專屬的PoolThreadCache緩存進行關聯。
        通過這一方式,實現了上述jemalloc中提到的將線程與PoolArena進行綁定的功能,并盡可能的使每個PoolArena負載平衡。

      2.2 PoolThreadCache實現解析

      • 在PoolThreadCache中,類似Small規格的池化內存分配,為每一種特定的規格都維護了一個對象池。
        對象池是MyMemoryRegionCache結構,small規格的對象池是smallSubPageHeapCaches數組,normal規格的對象池是normalHeapCaches數組。
      • 默認情況下,所有的small規格都會進行緩存;而normal規格中只有32kb的這一最小規格才會被緩存,更大的規格將不會進行線程本地緩存。
        這樣的設計出于兩方面的考慮,首先是每個線程都要維護線程本地緩存,緩存的池化內存段會占用大量的內存空間,所要緩存的規格越多,則內存碎片越多,空間利用率越低。
        其次,絕大多數情況下越大規格的內存申請的頻率越低,進行線程本地緩存所帶來的吞吐量的提升越小?;谶@兩點,netty將最大的本地緩存規格設置為了32kb。
        當然,如果應用的開發者的實際場景中就是有大量的大規格池化內存的分配需求,netty也允許使用對應的參數來控制實際需要進行線程本地緩存的最大規格。
      • MyMemoryRegionCache中都維護了一個隊列存放所緩存的池化內存段對象(掛載在Entry節點上,handle連續內存段);與PooledByteBuf對象池的設計一樣,該隊列也是專門針對多寫單讀的并發場景優化的MpscQueue。
        因為從線程本地緩存中獲取池化內存段的只會是持有者線程,而歸還時則可能在經過多次傳遞后,由其它線程進行歸還而寫回隊列。
      PoolThreadCache結構示意圖

      img

      MyNetty PoolThreadCache實現源碼
      /**
       * 池化內存分配線程緩存,完全參考Netty的PoolThreadCache
       * */
      public class MyPoolThreadCache {
      
          final MyPoolArena<byte[]> heapArena;
      
          private final int freeSweepAllocationThreshold;
      
          // Hold the caches for the different size classes, which are small and normal.
          private final MyMemoryRegionCache<byte[]>[] smallSubPageHeapCaches;
          private final MyMemoryRegionCache<byte[]>[] normalHeapCaches;
      
          private int allocations;
      
          private final AtomicBoolean freed = new AtomicBoolean();
          
          MyPoolThreadCache(MyPoolArena<byte[]> heapArena,
                          int smallCacheSize, int normalCacheSize, int maxCachedBufferCapacity,
                          int freeSweepAllocationThreshold) {
              this.freeSweepAllocationThreshold = freeSweepAllocationThreshold;
              this.heapArena = heapArena;
      
              if (heapArena != null) {
                  // 為每一種Small類型的size,都創建一個SubPageMemoryRegionCache來做緩存
                  smallSubPageHeapCaches = createSubPageCaches(
                      smallCacheSize, heapArena.numSmallSubpagePools);
      
                  // 為每一種Normal類型的size,都創建一個NormalMemoryRegionCache來做緩存
                  normalHeapCaches = createNormalCaches(
                      normalCacheSize, maxCachedBufferCapacity, heapArena);
      
                  // 當前Arena所綁定的ThreadCache數量加1
                  heapArena.numThreadCaches.getAndIncrement();
              } else {
                  // No heapArea is configured so just null out all caches
                  smallSubPageHeapCaches = null;
                  normalHeapCaches = null;
              }
      
              // Only check if there are caches in use.
              if ((smallSubPageHeapCaches != null || normalHeapCaches != null)
                  && freeSweepAllocationThreshold < 1) {
                  throw new IllegalArgumentException("freeSweepAllocationThreshold: " + freeSweepAllocationThreshold + " (expected: > 0)");
              }
          }
      
          private static <T> MyMemoryRegionCache<T>[] createSubPageCaches(int cacheSize, int numCaches) {
              if (cacheSize > 0 && numCaches > 0) {
                  // 為每一種Small類型的size,都創建一個SubPageMemoryRegionCache來做緩存
                  MyMemoryRegionCache<T>[] cache = new MyMemoryRegionCache[numCaches];
                  for (int i = 0; i < cache.length; i++) {
                      cache[i] = new SubPageMemoryRegionCache<T>(cacheSize);
                  }
                  return cache;
              } else {
                  return null;
              }
          }
      
          @SuppressWarnings("unchecked")
          private static <T> MyMemoryRegionCache<T>[] createNormalCaches(int cacheSize, int maxCachedBufferCapacity, MyPoolArena<T> area) {
              if (cacheSize > 0 && maxCachedBufferCapacity > 0) {
                  // 所能緩存的buf規格的最大值,由chunk和用戶參數指定的最小值決定的
                  int max = Math.min(area.mySizeClasses.getChunkSize(), maxCachedBufferCapacity);
      
                  List<MyMemoryRegionCache<T>> cache = new ArrayList<>() ;
      
                  // 為每一種Normal類型的size,都創建一個NormalMemoryRegionCache來做緩存
                  int nSizes = area.getMySizeClasses().getSmallAndNormalTotalSize();
                  for (int idx = area.numSmallSubpagePools;
                       idx < nSizes && area.getMySizeClasses().sizeIdx2size(idx).getSize() <= max ; idx++) {
                      cache.add(new NormalMemoryRegionCache<>(cacheSize));
                  }
                  return cache.toArray(new MyMemoryRegionCache[0]);
              } else {
                  return null;
              }
          }
      
          /**
           * Try to allocate a small buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
           */
          boolean allocateSmall(MyPoolArena<?> area, MyPooledByteBuf<?> buf, int reqCapacity, MySizeClassesMetadataItem sizeClassesMetadataItem) {
              return allocate(cacheForSmall(area, sizeClassesMetadataItem), buf, reqCapacity);
          }
      
          /**
           * Try to allocate a normal buffer out of the cache. Returns {@code true} if successful {@code false} otherwise
           */
          boolean allocateNormal(MyPoolArena<?> area, MyPooledByteBuf<?> buf, int reqCapacity, MySizeClassesMetadataItem sizeClassesMetadataItem) {
              return allocate(cacheForNormal(area, sizeClassesMetadataItem), buf, reqCapacity);
          }
      
          @SuppressWarnings({ "unchecked", "rawtypes" })
          private boolean allocate(MyMemoryRegionCache<?> cache, MyPooledByteBuf buf, int reqCapacity) {
              if (cache == null) {
                  // no cache found so just return false here
                  return false;
              }
              boolean allocated = cache.allocate(buf, reqCapacity, this);
              if (++ allocations >= freeSweepAllocationThreshold) {
                  allocations = 0;
                  trim();
              }
              return allocated;
          }
      
          boolean add(MyPoolArena<?> area, MyPoolChunk chunk,  ByteBuffer nioBuffer, long handle, int normCapacity) {
              MySizeClassesMetadataItem mySizeClassesMetadataItem = area.getMySizeClasses().size2SizeIdx(normCapacity);
              MyMemoryRegionCache<?> cache = cache(area, mySizeClassesMetadataItem);
              if (cache == null) {
                  // 當前無法緩存
                  return false;
              }
              if (freed.get()) {
                  return false;
              }
              return cache.add(chunk, nioBuffer, handle, normCapacity);
          }
      
          /// TODO: In the future when we move to Java9+ we should use java.lang.ref.Cleaner.
          @Override
          protected void finalize() throws Throwable {
              try {
                  super.finalize();
              } finally {
                  free(true);
              }
          }
      
          /**
           *  Should be called if the Thread that uses this cache is about to exist to release resources out of the cache
           */
          void free(boolean finalizer) {
              // As free() may be called either by the finalizer or by FastThreadLocal.onRemoval(...) we need to ensure
              // we only call this one time.
              if (freed.compareAndSet(false, true)) {
                  // 當前ThreadLocal被銷毀時,會調用free方法,在free方法中需要將當前已緩存的、未實際釋放掉的內存都放回到PoolArena中
                  // 只有這樣,才能避免內存泄露
                  free(smallSubPageHeapCaches, finalizer);
                  free(normalHeapCaches, finalizer);
      
                  if (heapArena != null) {
                      heapArena.numThreadCaches.getAndDecrement();
                  }
              }
          }
      
          private static int free(MyMemoryRegionCache<?>[] caches, boolean finalizer) {
              if (caches == null) {
                  return 0;
              }
      
              int numFreed = 0;
              for (MyMemoryRegionCache<?> c: caches) {
                  numFreed += free(c, finalizer);
              }
              return numFreed;
          }
      
          private static int free(MyMemoryRegionCache<?> cache, boolean finalizer) {
              if (cache == null) {
                  return 0;
              }
              return cache.free(finalizer);
          }
      
          private MyMemoryRegionCache<?> cacheForSmall(MyPoolArena<?> area, MySizeClassesMetadataItem sizeClassesMetadataItem) {
      
              return cache(smallSubPageHeapCaches, sizeClassesMetadataItem.getSize());
          }
      
          private MyMemoryRegionCache<?> cacheForNormal(MyPoolArena<?> area, MySizeClassesMetadataItem sizeClassesMetadataItem) {
              int sizeIdx = sizeClassesMetadataItem.getTableIndex();
              // We need to substract area.numSmallSubpagePools as sizeIdx is the overall index for all sizes.
              int idx = sizeIdx - area.numSmallSubpagePools;
      
              return cache(normalHeapCaches, idx);
          }
      
          private MyMemoryRegionCache<?> cache(MyPoolArena<?> area, MySizeClassesMetadataItem sizeClassesMetadataItem) {
              switch (sizeClassesMetadataItem.getSizeClassEnum()) {
                  case NORMAL:
                      return cacheForNormal(area, sizeClassesMetadataItem);
                  case SMALL:
                      return cacheForSmall(area, sizeClassesMetadataItem);
                  default:
                      throw new Error();
              }
          }
      
          private static <T> MyMemoryRegionCache<T> cache(MyMemoryRegionCache<T>[] cache, int sizeIdx) {
              if (cache == null || sizeIdx > cache.length - 1) {
                  // 當前規格無法緩存
                  return null;
              }
              return cache[sizeIdx];
          }
      
          void trim() {
              trim(smallSubPageHeapCaches);
              trim(normalHeapCaches);
          }
      
          private static void trim(MyMemoryRegionCache<?>[] caches) {
              if (caches == null) {
                  return;
              }
              for (MyMemoryRegionCache<?> c: caches) {
                  trim(c);
              }
          }
      
          private static void trim(MyMemoryRegionCache<?> cache) {
              if (cache == null) {
                  return;
              }
              cache.trim();
          }
      
      
          /**
           * Cache used for buffers which are backed by TINY or SMALL size.
           */
          public static final class SubPageMemoryRegionCache<T> extends MyMemoryRegionCache<T> {
              SubPageMemoryRegionCache(int size) {
                  super(size, SizeClassEnum.SMALL);
              }
      
              @Override
              protected void initBuf(
                  MyPoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, MyPooledByteBuf<T> buf, int reqCapacity,
                  MyPoolThreadCache threadCache) {
                  chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity, threadCache);
              }
          }
      
          /**
           * Cache used for buffers which are backed by NORMAL size.
           */
          public static final class NormalMemoryRegionCache<T> extends MyMemoryRegionCache<T> {
              NormalMemoryRegionCache(int size) {
                  super(size, SizeClassEnum.NORMAL);
              }
      
              @Override
              protected void initBuf(
                  MyPoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, MyPooledByteBuf<T> buf, int reqCapacity,
                  MyPoolThreadCache threadCache) {
                  chunk.initBuf(buf, nioBuffer, handle, reqCapacity, threadCache);
              }
          }
      }
      
      public abstract class MyMemoryRegionCache<T> {
      
          private final int size;
          private final Queue<Entry<T>> queue;
          private final SizeClassEnum sizeClassEnum;
          private int allocations;
      
          @SuppressWarnings("rawtypes")
          private static final MyObjectPool<Entry> RECYCLER = MyObjectPool.newPool(handle -> new Entry(handle));
      
          MyMemoryRegionCache(int size, SizeClassEnum sizeClassEnum) {
              this.size = MathUtil.safeFindNextPositivePowerOfTwo(size);
              queue = new MpscUnpaddedArrayQueue<>(this.size);
              this.sizeClassEnum = sizeClassEnum;
          }
      
          /**
           * Add to cache if not already full.
           * @return true 當前線程釋放內存時,成功加入到本地線程緩存,不需要實際的回收
           *         false 當前線程釋放內存時,加入本地線程緩存失敗,需要進行實際的回收
           */
          @SuppressWarnings("unchecked")
          public final boolean add(MyPoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
              Entry<T> entry = newEntry(chunk, nioBuffer, handle, normCapacity);
              // 嘗試加入到當前Cache的隊列里
              boolean queued = queue.offer(entry);
              if (!queued) {
                  // If it was not possible to cache the chunk, immediately recycle the entry
                  entry.recycle();
              }
      
              return queued;
          }
          
          public final boolean allocate(MyPooledByteBuf<T> buf, int reqCapacity, MyPoolThreadCache threadCache) {
              Entry<T> entry = queue.poll();
              if (entry == null) {
                  return false;
              }
      
              // 之前已經緩存過了,直接把對應的內存段拿出來復用,給本次同樣規格的內存分配
              initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity, threadCache);
              entry.recycle();
      
              // allocations is not thread-safe which is fine as this is only called from the same thread all time.
              ++ allocations;
              return true;
          }
          
          public final int free(boolean finalizer) {
              return free(Integer.MAX_VALUE, finalizer);
          }
      
          private int free(int max, boolean finalizer) {
              int numFreed = 0;
              for (; numFreed < max; numFreed++) {
                  // 遍歷所有已緩存的entry,一個接著一個進行實際的內存釋放
                  Entry<T> entry = queue.poll();
                  if (entry != null) {
                      freeEntry(entry, finalizer);
                  } else {
                      // all cleared
                      return numFreed;
                  }
              }
              return numFreed;
          }
          
          public final void trim() {
              int free = size - allocations;
              allocations = 0;
      
              // We not even allocated all the number that are
              if (free > 0) {
                  free(free, false);
              }
          }
      
          @SuppressWarnings({ "unchecked", "rawtypes" })
          private  void freeEntry(Entry entry, boolean finalizer) {
              // Capture entry state before we recycle the entry object.
              MyPoolChunk chunk = entry.chunk;
              long handle = entry.handle;
              int normCapacity = entry.normCapacity;
      
              if (!finalizer) {
                  // recycle now so PoolChunk can be GC'ed. This will only be done if this is not freed because of
                  // a finalizer.
                  entry.recycle();
              }
      
              // 將當前entry中緩存的handle內存段進行實際的回收,放回到所屬的PoolChunk中
              chunk.arena.freeChunk(chunk, handle, normCapacity);
          }
          
          protected abstract void initBuf(MyPoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
                                          MyPooledByteBuf<T> buf, int reqCapacity, MyPoolThreadCache threadCache);
      
          @SuppressWarnings("rawtypes")
          private static Entry newEntry(MyPoolChunk<?> chunk, ByteBuffer nioBuffer, long handle, int normCapacity) {
              Entry entry = RECYCLER.get();
              entry.chunk = chunk;
              entry.nioBuffer = nioBuffer;
              entry.handle = handle;
              entry.normCapacity = normCapacity;
              return entry;
          }
      
          static final class Entry<T> {
              final MyObjectPool.Handle<Entry<?>> recyclerHandle;
              MyPoolChunk<T> chunk;
              ByteBuffer nioBuffer;
              long handle = -1;
              int normCapacity;
      
              Entry(MyObjectPool.Handle<Entry<?>> recyclerHandle) {
                  this.recyclerHandle = recyclerHandle;
              }
      
              void recycle() {
                  chunk = null;
                  nioBuffer = null;
                  handle = -1;
                  recyclerHandle.recycle(this);
              }
          }
      }
      

      2.3 引入線程本地緩存后的池化內存分配與釋放

      在實現了線程本地緩存的功能后,我們再完整的梳理一下Netty中池化內存的分配與釋放的流程。

      池化內存分配
      1. 入口為PooledByteBufAllocator,當前線程獲得獨屬于自己的線程本地緩存PoolThreadCache。
        初始化時,通過最少使用算法(leastUsedArena方法)找到目前負載最低的PoolArena與當前線程建立綁定關系。
        獲取到當前線程所綁定的PoolArena后,通過其allocate方法進行池化內存分配。
      2. PoolArena的allocate方法中,先從Recycle對象池中獲取一個未綁定底層內存的裸PooledByteBuf對象(newByteBuf方法),然后再嘗試為其分配底層內存。
        通過計算所要申請的內存規格(SizeClasses類計算規范化后的規格),判斷其是Small、Normal還是Huge規格級別,分別走不同的分配邏輯。
        分配時,優先從線程本地緩存中獲取可用的內存段。如果本地緩存中已緩存對應規格的handle內存段,則直接將其與當前PooledByteBuf進行綁定后返回,完成內存分配。
      3. 如果本地緩存無法滿足當前分配,則需要從所綁定的PoolArena中獲取可供使用的空閑內存段。
        Huge規格內存申請由于較為少見,且緩存空間代價過大,因此Netty中不進行池化,通過單獨的額外分配內存空間以滿足其需求。
        Normal規格的池化內存分配基于伙伴算法,以PoolChunk為基礎單位管理連續的內存段(PoolChunkList管理不同使用率的PoolChunk),在通過分割和合并內存段的方式來追蹤內存段的使用情況。
        Small規格的池化內存分配使用slab算法,通過一系列的PoolSubpage集合管理相同規格的內存段插槽,使用bitmap來追蹤各個內存段插槽的分配情況(已分配/未分配)。
      4. 被分配出去的底層內存,以handle的形式進行表征。handle是一個64位的long類型結構,共劃分為5個屬性,分別是runOffset、size、isUsed、isSubpage、bitmapIdx of subpage。
        第一個屬性是runOffset,即當前內存段的起始位置在第幾個Page頁。
        第二個屬性是size,即handle所標識的連續內存段有多長,一共幾個Page頁大小(類似一家人買了個連號,都挨著坐一起)。
        第三個屬性是isUsed,是否使用。一個handle被分配出去了,isUsed=1;未被分配出去則isUsed=0。
        第四個屬性是isSubPage,是否是SubPage類型的內存段,0代表是Normal級別,1代表是Small級別。
        第五個屬性是bitmapIdx of subpage,僅用于small級別的分配(Normal分配所有位全為0),標識當前內存段位于所屬PoolSubPage的第幾個插槽。
      5. 分配完成后,對應的底層內存段被標識為已分配,其所對應的handle值與PooledByteBuf進行綁定,在后續釋放內存時基于該handle對象定位到對應的連續內存段將其釋放。
        PooledByteBuf在初始化時,計算好其在對應PoolChunk底層內存中的offset偏移量,在實際使用時通過該偏移量才能正確的讀寫到實際分配出去的對應內存地址。
      池化內存釋放
      1. Netty中通過引用計數法追蹤PooledByteBuf對象的使用情況,在PooledByteBuf被初始化時其被引用數refCnt為1。后續每次被額外依賴,被引用數自增1;不再被使用時,通過release方法減少其被引用數。當refCnt被減為0時,說明當前PooledByteBuf已不再被使用,需要進行釋放。
      2. 釋放時,通過PooledByteBuf的deallocate方法進行釋放。通過當前Buf對象所屬的PoolArena的free方法,釋放歸還此前分配出的底層內存段。
        和分配一樣,釋放時也優先通過PoolThreadCache.add方法嘗試將對應的handle內存段放入線程本地緩存中。
        如果線程本地緩存能夠存放的了,則將當前內存段放入對應規格的MemoryRegionCache所對應的隊列中,完成底層內存的釋放。
      3. 如果線程本地緩存因為一些原因無法緩存,則找到當前內存段所屬的PoolChunk,判斷其大小規格,進行對應的釋放邏輯。
        如果釋放后滿足一些條件,比如對應的PoolSubPage或PoolChunk完全空閑,則可能會把對應的PoolSubPage、PoolChunk等結構也一并回收掉。
      4. reallocate方法釋放底層池化內存后,再嘗試將PooledByteBuf歸還給其所屬的對象池中,以供后續新的池化內存分配申請使用。

      總結

      至此,在實現了線程本地緩存的功能后,MyNetty終于完成了目標中池化內存分配的全部功能。
      為了簡化理解的難度,相比Netty做了大量的簡化工作,比如沒有實現堆外內存的池化,內存泄露的自動檢測,統計追蹤,寫死了本應靈活配置的各項參數等等。

      在簡化邏輯的基礎上,再結合jemalloc論文有機的拆分原本耦合很緊密、非常復雜的Netty整體實現,大幅降低了讀者在理解Netty關于池化內存管理的難度。
      可以看到,在內存分配這一對性能非常敏感的底層功能上,大佬們絞盡腦汁進行了各方面的優化,以提升時間和空間效率;而在無法同時兼顧空間與時間的情況下,基于實際的使用場景,Netty站在巨人的肩膀上,參考jemalloc做到了非常好的平衡。
      比如為高頻使用Small規格內存池化,使用更占空間的slab算法;而相對低頻使用的Normal規格內存池化,則使用更節約空間的伙伴算法;至于較為罕見的huge規格,則干脆放棄池化。再比如,為了提升吞吐量避免大量的互斥鎖爭搶,默認允許FastLocalThread線程啟用線程本地緩存。但本地緩存的內存段數量被嚴格控制,同時能被緩存的規格大小也被嚴格控制,避免空間效率過低。

      博客中展示的完整代碼在我的github上:https://github.com/1399852153/MyNetty (release/lab9_thread_local_cache 分支)。
      希望MyNetty系列博客能夠幫助到對Netty感興趣的讀者,內容如有錯誤,還請多多指教。

      posted on 2025-10-17 21:20  小熊餐館  閱讀(143)  評論(0)    收藏  舉報

      主站蜘蛛池模板: av在线播放国产一区| 玖玖在线精品免费视频| 国产精品久久久久7777| 中文字幕日韩精品人妻| 99久久婷婷国产综合精品青草漫画| 免费中文熟妇在线影片| 国产玩具酱一区二区三区| 成人亚洲精品一区二区三区 | 久久综合五月丁香六月丁香| 国产精品伊人久久综合网| 国产偷窥熟女精品视频大全| 又大又粗欧美成人网站| 一本无码人妻在中文字幕免费| 欧美黑人巨大xxxxx| 麻豆国产va免费精品高清在线| 亚洲区综合中文字幕日日| 欧美交a欧美精品喷水| 国产成人高清精品亚洲| 欧美videos粗暴| 少妇无码AV无码专区| 亚洲国产精品成人综合色在| 色偷偷亚洲女人天堂观看| 国产精品69人妻我爱绿帽子 | 久热色精品在线观看视频| 99在线精品国自产拍中文字幕| 欧美人成在线播放网站免费| 777米奇影视第四色| 国产精品国产三级国产专业| 清河县| 国产精品一二区在线观看| 亚洲一区二区精品另类| 唐人社视频呦一区二区| 国产精品麻豆欧美日韩ww| 2020国产欧洲精品网站| 久久久久四虎精品免费入口| 国产亚洲精品在av| 亚洲综合一区二区三区视频| 日本妇人成熟免费| 91密桃精品国产91久久| 中文字幕日韩精品国产| 亚洲一二区制服无码中字|