<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Java并發工具類之LongAdder原理總結

      出處: Java并發工具類之LongAdder原理總結

      LongAdder實現原理圖

                                     

                                     

        高并發下N多線程同時去操作一個變量會造成大量線程CAS失敗,然后處于自旋狀態,導致嚴重浪費CPU資源,降低了并發性。既然AtomicLong性能問題是由于過多線程同時去競爭同一個變量的更新而降低的,那么如果把一個變量分解為多個變量,讓同樣多的線程去競爭多個資源。

         LongAdder則是內部維護一個Cells數組,每個Cell里面有一個初始值為0的long型變量,在同等并發量的情況下,爭奪單個變量的線程會減少,這是變相的減少了爭奪共享資源的并發量,另外多個線程在爭奪同一個原子變量時候,如果失敗并不是自旋CAS重試,而是嘗試獲取其他原子變量的鎖,最后當獲取當前值時候是把所有變量的值累加后再加上base的值返回的。

        LongAdder維護了要給延遲初始化的原子性更新數組和一個基值變量base數組的大小保持是2的N次方大小,數組表的下標使用每個線程的hashcode值的掩碼表示,數組里面的變量實體是Cell類型。

        Cell 類型是Atomic的一個改進,用來減少緩存的爭用,對于大多數原子操作字節填充是浪費的,因為原子操作都是無規律的分散在內存中進行的,多個原子性操作彼此之間是沒有接觸的,但是原子性數組元素彼此相鄰存放將能經常共享緩存行,也就是偽共享。所以這在性能上是一個提升。

        另外由于Cells占用內存是相對比較大的,所以一開始并不創建,而是在需要時候再創建,也就是惰性加載,當一開始沒有空間時候,所有的更新都是操作base變量。

       

       

        java.util.concurrency.atomic.LongAdder是Java8新增的一個類,提供了原子累計值的方法。根據文檔的描述其性能要優于AtomicLong,下圖是一個簡單的測試對比(平臺:MBP):                 

                                 image 

        這里測試時基于JDK1.8進行的,AtomicLong 從Java8開始針對x86平臺進行了優化,使用XADD替換了CAS操作,我們知道JUC下面提供的原子類都是基于Unsafe類實現的,并由Unsafe來提供CAS的能力。CAS (compare-and-swap)本質上是由現代CPU在硬件級實現的原子指令,允許進行無阻塞,多線程的數據操作同時兼顧了安全性以及效率。大部分情況下,CAS都能夠提供不錯的性能,但是在高競爭的情況下開銷可能會成倍增長,具體的研究可以參考這篇文章, 我們直接看下代碼:

      復制代碼
      public class AtomicLong {
      public final long incrementAndGet() {
              return unsafe.getAndAddLong(this, valueOffset, 1L) + 1L;
          }
      }
      
      public final class Unsafe {
      public final long getAndAddLong(Object var1, long var2, long var4) {
              long var6;
              do {
                  var6 = this.getLongVolatile(var1, var2);
              } while(!this.compareAndSwapLong(var1, var2, var6, var6 + var4));
              return var6;
          }
      }
      復制代碼

        getAndAddLong方法會以volatile的語義去讀需要自增的域的最新值,然后通過CAS去嘗試更新,正常情況下會直接成功后返回,但是在高并發下可能會同時有很多線程同時嘗試這個過程,也就是說線程A讀到的最新值可能實際已經過期了,因此需要在while循環中不斷的重試,造成很多不必要的開銷,而xadd的相對來說會更高效一點,偽碼如下,最重要的是下面這段代碼是原子的,也就是說其他線程不能打斷它的執行或者看到中間值,這條指令是在硬件級直接支持的:

      function FetchAndAdd(address location, int inc) {
          int value := *location
          *location := value + inc
          return value
      }

        而LongAdder的性能比上面那種還要好很多,于是就研究了一下。首先它有一個基礎的值base,在發生競爭的情況下,會有一個Cell數組用于將不同線程的操作離散到不同的節點上去(會根據需要擴容,最大為CPU核數),sum()會將所有Cell數組中的value和base累加作為返回值。核心的思想就是將AtomicLong一個value的更新壓力分散到多個value中去,從而降低更新熱點。

                       

      public class LongAdder extends Striped64 implements Serializable {
      //...
      }

        LongAdder繼承自Striped64Striped64內部維護了一個懶加載的數組以及一個額外的base實例域,數組的大小是2的N次方,使用每個線程Thread內部的哈希值訪問。

      復制代碼
      abstract class Striped64 extends Number {
      /** Number of CPUS, to place bound on table size */
          static final int NCPU = Runtime.getRuntime().availableProcessors();
      
          /**
           * Table of cells. When non-null, size is a power of 2.
           */
          transient volatile Cell[] cells;
           
      @sun.misc.Contended static final class Cell {
              volatile long value;
              Cell(long x) { value = x; }
              final boolean cas(long cmp, long val) {
                  return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
              }
      
              // Unsafe mechanics
              private static final sun.misc.Unsafe UNSAFE;
              private static final long valueOffset;
              static {
                  try {
                      UNSAFE = sun.misc.Unsafe.getUnsafe();
                      Class<?> ak = Cell.class;
                      valueOffset = UNSAFE.objectFieldOffset
                          (ak.getDeclaredField("value"));
                  } catch (Exception e) {
                      throw new Error(e);
                  }
              }
          }
      
      }
      復制代碼

        數組的元素是Cell類,可以看到Cell類用Contended注解修飾,這里主要是解決false sharing(偽共享的問題),不過個人認為偽共享翻譯的不是很好????,或者應該是錯誤的共享,比如兩個volatile變量被分配到了同一個緩存行,但是這兩個的更新在高并發下會競爭,比如線程A去更新變量a,線程B去更新變量b,但是這兩個變量被分配到了同一個緩存行,因此會造成每個線程都去爭搶緩存行的所有權,例如A獲取了所有權然后執行更新這時由于volatile的語義會造成其刷新到主存,但是由于變量b也被緩存到同一個緩存行,因此就會造成cache miss,這樣就會造成極大的性能損失,因此有一些類庫的作者,例如JUC下面的、Disruptor等都利用了插入dummy 變量的方式,使得緩存行被其獨占,比如下面這種代碼:

      復制代碼
      static final class Cell {
              volatile long p0, p1, p2, p3, p4, p5, p6;
              volatile long value;
              volatile long q0, q1, q2, q3, q4, q5, q6;
              Cell(long x) { value = x; }
      
              final boolean cas(long cmp, long val) {
                  return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
              }
      
              // Unsafe mechanics
              private static final sun.misc.Unsafe UNSAFE;
              private static final long valueOffset;
              static {
                  try {
                      UNSAFE = getUnsafe();
                      Class<?> ak = Cell.class;
                      valueOffset = UNSAFE.objectFieldOffset
                          (ak.getDeclaredField("value"));
                  } catch (Exception e) {
                      throw new Error(e);
                  }
              }
       }
      復制代碼

        但是這種方式畢竟不通用,例如32、64位操作系統的緩存行大小不一樣,因此JAVA8中就增加了一個注@sun.misc.Contended解用于解決這個問題,由JVM去插入這些變量,具體可以參考openjdk.java.net/jeps/142 ,但是通常來說對象是不規則的分配到內存中的,但是數組由于是連續的內存,因此可能會共享緩存行,因此這里加一個Contended注解以防cells數組發生偽共享的情況。

      復制代碼
      /**
       * 底競爭下直接更新base,類似AtomicLong
       * 高并發下,會將每個線程的操作hash到不同的
       * cells數組中,從而將AtomicLong中更新
       * 一個value的行為優化之后,分散到多個value中
       * 從而降低更新熱點,而需要得到當前值的時候,直接
       * 將所有cell中的value與base相加即可,但是跟
       * AtomicLong(compare and change -> xadd)的CAS不同,
       * incrementAndGet操作及其變種
       * 可以返回更新后的值,而LongAdder返回的是void
       */
      public class LongAdder {
          public void add(long x) {
              Cell[] as; long b, v; int m; Cell a;
              /**
               *  如果是第一次執行,則直接case操作base
               */
              if ((as = cells) != null || !casBase(b = base, b + x)) {
                  boolean uncontended = true;
                  /**
                   * as數組為空(null或者size為0)
                   * 或者當前線程取模as數組大小為空
                   * 或者cas更新Cell失敗
                   */
                  if (as == null || (m = as.length - 1) < 0 ||
                      (a = as[getProbe() & m]) == null ||
                      !(uncontended = a.cas(v = a.value, v + x)))
                      longAccumulate(x, null, uncontended);
              }
          }
      
          public long sum() {
             //通過累加base與cells數組中的value從而獲得sum
              Cell[] as = cells; Cell a;
              long sum = base;
              if (as != null) {
                  for (int i = 0; i < as.length; ++i) {
                      if ((a = as[i]) != null)
                          sum += a.value;
                  }
              }
              return sum;
          }
      }
      
      /**
       * openjdk.java.net/jeps/142
       */
      @sun.misc.Contended static final class Cell {
          volatile long value;
          Cell(long x) { value = x; }
          final boolean cas(long cmp, long val) {
              return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
          }
      
          // Unsafe mechanics
          private static final sun.misc.Unsafe UNSAFE;
          private static final long valueOffset;
          static {
              try {
                  UNSAFE = sun.misc.Unsafe.getUnsafe();
                  Class<?> ak = Cell.class;
                  valueOffset = UNSAFE.objectFieldOffset
                      (ak.getDeclaredField("value"));
              } catch (Exception e) {
                  throw new Error(e);
              }
          }
      }
      
      abstract class Striped64 extends Number {
      
          final void longAccumulate(long x, LongBinaryOperator fn,
                                    boolean wasUncontended) {
              int h;
              if ((h = getProbe()) == 0) {
                  /**
                   * 若getProbe為0,說明需要初始化
                   */
                  ThreadLocalRandom.current(); // force initialization
                  h = getProbe();
                  wasUncontended = true;
              }
              boolean collide = false;                // True if last slot nonempty
              /**
               * 失敗重試
               */
              for (;;) {
                  Cell[] as; Cell a; int n; long v;
                  if ((as = cells) != null && (n = as.length) > 0) {
                      /**
                       *  若as數組已經初始化,(n-1) & h 即為取模操作,相對 % 效率要更高
                       */
                      if ((a = as[(n - 1) & h]) == null) {
                          if (cellsBusy == 0) {       // Try to attach new Cell
                              Cell r = new Cell(x);   // Optimistically create
                              if (cellsBusy == 0 && casCellsBusy()) {//這里casCellsBusy的作用其實就是一個spin lock
                                  //可能會有多個線程執行了`Cell r = new Cell(x);`,
                                  //因此這里進行cas操作,避免線程安全的問題,同時前面在判斷一次
                                  //避免正在初始化的時其他線程再進行額外的cas操作
                                  boolean created = false;
                                  try {               // Recheck under lock
                                      Cell[] rs; int m, j;
                                      //重新檢查一下是否已經創建成功了
                                      if ((rs = cells) != null &&
                                          (m = rs.length) > 0 &&
                                          rs[j = (m - 1) & h] == null) {
                                          rs[j] = r;
                                          created = true;
                                      }
                                  } finally {
                                      cellsBusy = 0;
                                  }
                                  if (created)
                                      break;
                                  continue;           // Slot 現在是非空了,continue到下次循環重試
                              }
                          }
                          collide = false;
                      }
                      else if (!wasUncontended)       // CAS already known to fail
                          wasUncontended = true;      // Continue after rehash
                      else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                                   fn.applyAsLong(v, x))))
                          break;//若cas更新成功則跳出循環,否則繼續重試
                      else if (n >= NCPU || cells != as) // 最大只能擴容到CPU數目, 或者是已經擴容成功,這里只有的本地引用as已經過期了
                          collide = false;            // At max size or stale
                      else if (!collide)
                          collide = true;
                      else if (cellsBusy == 0 && casCellsBusy()) {
                          try {
                              if (cells == as) {      // 擴容
                                  Cell[] rs = new Cell[n << 1];
                                  for (int i = 0; i < n; ++i)
                                      rs[i] = as[i];
                                  cells = rs;
                              }
                          } finally {
                              cellsBusy = 0;
                          }
                          collide = false;
                          continue;                   // Retry with expanded table
                      }
                      //重新計算hash(異或)從而嘗試找到下一個空的slot
                      h = advanceProbe(h);
                  }
                  else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
                      boolean init = false;
                      try {                           // Initialize table
                          if (cells == as) {
                              /**
                               * 默認size為2
                               */
                              Cell[] rs = new Cell[2];
                              rs[h & 1] = new Cell(x);
                              cells = rs;
                              init = true;
                          }
                      } finally {
                          cellsBusy = 0;
                      }
                      if (init)
                          break;
                  }
                  else if (casBase(v = base, ((fn == null) ? v + x : // 若已經有另一個線程在初始化,那么嘗試直接更新base
                                              fn.applyAsLong(v, x))))
                      break;                          // Fall back on using base
              }
          }
      
          final boolean casCellsBusy() {
              return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
          }
      
          static final int getProbe() {
              /**
               * 通過Unsafe獲取Thread中threadLocalRandomProbe的值
               */
              return UNSAFE.getInt(Thread.currentThread(), PROBE);
          }
      
              // Unsafe mechanics
              private static final sun.misc.Unsafe UNSAFE;
              private static final long BASE;
              private static final long CELLSBUSY;
              private static final long PROBE;
              static {
                  try {
                      UNSAFE = sun.misc.Unsafe.getUnsafe();
                      Class<?> sk = Striped64.class;
                      BASE = UNSAFE.objectFieldOffset
                          (sk.getDeclaredField("base"));
                      CELLSBUSY = UNSAFE.objectFieldOffset
                          (sk.getDeclaredField("cellsBusy"));
                      Class<?> tk = Thread.class;
                      //返回Field在內存中相對于對象內存地址的偏移量
                      PROBE = UNSAFE.objectFieldOffset
                          (tk.getDeclaredField("threadLocalRandomProbe"));
                  } catch (Exception e) {
                      throw new Error(e);
                  }
              }
      }
      復制代碼

        由于Cell相對來說比較占內存,因此這里采用懶加載的方式,在無競爭的情況下直接更新base域,在第一次發生競爭的時候(CAS失敗)就會創建一個大小為2的cells數組,每次擴容都是加倍,只到達到CPU核數。同時我們知道擴容數組等行為需要只能有一個線程同時執行,因此需要一個鎖,這里通過CAS更新cellsBusy來實現一個簡單的spin lock。 數組訪問索引是通過Thread里的threadLocalRandomProbe域取模實現的,這個域是ThreadLocalRandom更新的,cells的數組大小被限制為CPU的核數,因為即使有超過核數個線程去更新,但是每個線程也只會和一個CPU綁定,更新的時候頂多會有cpu核數個線程,因此我們只需要通過hash將不同線程的更新行為離散到不同的slot即可。 我們知道線程、線程池會被關閉或銷毀,這個時候可能這個線程之前占用的slot就會變成沒人用的,但我們也不能清除掉,因為一般web應用都是長時間運行的,線程通常也會動態創建、銷毀,很可能一段時間后又會被其他線程占用,而對于短時間運行的,例如單元測試,清除掉有啥意義呢?

       

      總結

        總的來說,LongAdder從性能上來說要遠遠好于AtomicLong,一般情況下是可以直接替代AtomicLong使用的,Netty也通過一個接口封裝了這兩個類,在Java8下直接采用LongAdder,但是AtomicLong的一系列方法不僅僅可以自增,還可以獲取更新后的值,如果是例如獲取一個全局唯一的ID還是采用AtomicLong會方便一點。

       

      參考鏈接

      1. https://blogs.oracle.com/dave/atomic-fetch-and-add-vs-compare-and-swap
      2. https://en.wikipedia.org/wiki/Compare-and-swap
      3. http://ashkrit.blogspot.com/2014/02/atomicinteger-java-7-vs-java-8.html
      4. https://dzone.com/articles/adventures-atomiclong
      5. LongAdder原理分析
      6. LongAdder解析
      posted @ 2024-04-23 18:08  一人一見  閱讀(1304)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 一区二区三区国产偷拍| 嫩草院一区二区乱码| 习水县| 国产成人免费永久在线平台 | 国产学生裸体无遮挡免费| 国产婷婷综合在线视频中文| 高清破外女出血AV毛片| 天天综合色一区二区三区| 国产日韩欧美亚洲精品95| 泾川县| 国产精品亚洲mnbav网站| 熟妇人妻av中文字幕老熟妇 | 国产一区二区日韩在线| 亚洲一区久久蜜臀av| 国产精品无码一区二区桃花视频| 九九热视频免费在线播放| 久久人人爽人人爽人人av| 国产亚洲精品成人aa片新蒲金| 亚洲成亚洲成网| 日本一区二区三区在线看| 国精产品一区一区三区有限公司杨 | 中文字幕亚洲制服在线看| 99久久久无码国产精品免费| 久久大香线蕉国产精品免费| 婷婷色香五月综合缴缴情香蕉| 麻豆精品一区二区综合av| 一区二区三区在线 | 欧洲| 激情在线网| 丝袜a∨在线一区二区三区不卡 | 欧美黑吊大战白妞| 中文字幕日韩精品有码| 熟女少妇精品一区二区| 无码一区二区三区中文字幕| 中文字幕日韩人妻一区| 亚洲欧美日韩综合一区二区| 手机看片日本在线观看视频| 贵溪市| 国产精品三级一区二区三区| 午夜福利片1000无码免费| 日韩V欧美V中文在线| 精品国产中文字幕在线|