<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12
      轉載和引用,請注明原文出處! Fork me on GitHub
      結局很美妙的事,開頭并非如此!

      多線程系列五:并發工具類和并發容器

      一、并發容器

      1.ConcurrentHashMap

      為什么要使用ConcurrentHashMap

      1. HashMap是線程不安全的,在多線程環境下,使用HashMap進行put操作會引起死循環,導致CPU利用率接近100%。

      HashMap在并發執行put操作時會引起死循環,是因為多線程會導致HashMap的Entry鏈表形成環形數據結構,一旦形成環形數據結構,Entry的next節點永遠不為空,就會產生死循環獲取Entry。

      2. HashTable效率低,HashTable容器使用synchronized來保證線程安全,但在線程競爭激烈的情況下HashTable的效率非常低下。因為當一個線程訪問HashTable的同步方法,其他線程也訪問HashTable的同步方法時,會進入阻塞或輪詢狀態。如線程1使用put進行元素添加,線程2不但不能使用put方法添加元素,也不能使用get方法來獲取元素,所以競爭越激烈效率越低。

      ConcurrentHashMap的一些有用的方法

      很多時候我們希望在元素不存在時插入元素,我們一般會像下面那樣寫代碼

      synchronized(map){

        if (map.get(key) == null){

            return map.put(key, value);

        } else{

            return map.get(key);

        }

      }

      putIfAbsent(key,value)方法原子性的實現了同樣的功能

      putIfAbsent(K key, V value)  

       如果key對應的value不存在,則put進去,返回null。否則不put,返回已存在的value  

      boolean remove(Object key, Object value)  

        如果key對應的值是value,則移除K-V,返回true。否則不移除,返回false  

      boolean replace(K key, V oldValue, V newValue)  

       如果key對應的當前值是oldValue,則替換為newValue,返回true。否則不替換,返回false

      Hash的解釋

      散列任意長度的輸入通過一種算法變換成固定長度的輸出。屬于壓縮的映射。

      hash算法示例圖演示:

       

      類似于HaspMap的實現就是使用散列,比如把1000個元素放到長度為10的hashmap里面去,放入之前會把這1000個數經過hash算法映射到10個數組里面去,這時候就會存在相同的映射值在一個數組的相同位置,就會產生hash碰撞,此時hashmap就會在產生碰撞的數組的后面使用Entry鏈表來存儲相同映射的值,然后使用equals方法來判斷同一個鏈表存儲的值是否一樣來獲取值,鏈表就是hashmap用來解決碰撞的方法,所以我們一般在寫一個類的時候要寫自己的hashcode方法和equals方法,如果鍵的hashcode相同,再使用鍵的equals方法判斷鍵內容是不是一樣的,一樣的就獲取值

      Md5Sha,取余都是散列算法,ConcurrentHashMap中是wang/jenkins算法

       ConcurrentHashMap在1.7下的實現

      分段鎖的設計思想

      分段鎖的思想示例圖:

      說明:

      a) 傳統的hashtable是將很小空間的數組整段鎖住,這樣性能比較低

      b) 分段鎖的思想:ConcurrentHashMap是在很小空間數組的前面再加一個數組,映射的時候先映射到前面的數組,然后再映射到后面的很小空間的數組;讀取的時候只需要把前面的數組鎖住就可以了。

      ConcurrentHashMap是由Segment數組結構和HashEntry數組結構組成:

      Segment實際是一種可重入鎖(ReentrantLock),也就是用于分段的鎖。HashEntry則用于存儲鍵值對數據。

      一個ConcurrentHashMap里包含一個Segment數組。Segment的結構和HashMap類似,是一種數組和鏈表結構。一個Segment里包含一個HashEntry數組,每個HashEntry是一個鏈表結構的元素,每個Segment守護著一個HashEntry數組里的元素,當對HashEntry數組的數據進行修改時,必須首先獲得與它對應的Segment鎖。

       

      說明:上圖存在兩次散列的過程:比如插入一個1000的數,首先是把1000的位數(最多是高16位)做一次散列找到在segments數組中的位置,然后再把1000本身做一次散列找到在table中的位置。獲取值時一樣

       

      ConcurrentHashMap初始化方法是通過initialCapacity、loadFactor和concurrencyLevel(參數concurrencyLevel是用戶估計的并發級別,就是說你覺得最多有多少線程共同修改這個map,根據這個來確定Segment數組的大小concurrencyLevel默認是DEFAULT_CONCURRENCY_LEVEL = 16;)。

      ConcurrentHashMap完全允許多個讀操作并發進行,讀操作并不需要加鎖。ConcurrentHashMap實現技術是保證HashEntry幾乎是不可變的。HashEntry代表每個hash鏈中的一個節點,可以看到其中的對象屬性要么是final的,要么是volatile的。

      總結:ConcurrentHashMap在1.7及以下的實現使用數組+鏈表的方式,采用了分段鎖的思想

      ConcurrentHashMap在1.8下的實現

      改進一:取消segments字段,直接采用transient volatile HashEntry<K,V>[] table保存數據,采用table數組元素作為鎖,從而實現了對每一行數據進行加鎖,進一步減少并發沖突的概率。

      改進二:將原先table數組+單向鏈表的數據結構,變更為table數組+單向鏈表+紅黑樹的結構。對于并發個數超過8(默認值)的鏈表,jdk1.8中采用了紅黑樹的結構,那么查詢的時間復雜度可以降低到O(logN),可以改進性能。

      總結:ConcurrentHashMap在1.8下的實現使用數組+鏈表+紅黑樹的方式,當鏈表個數超過8的時候就把原來的鏈表轉成紅黑樹,使用紅黑樹來存取,采用了元素鎖的思想

       

      多用isEmpty()盡量少用size():因為用size時要對容器數進行計算比較慢

       

      2. ConcurrentSkipListMap  ConcurrentSkipListSet

      ConcurrentSkipListMap 對應TreeMap的并發實現

      ConcurrentSkipListSet 對應TreeSet的并發實現

      了解什么是跳表(SkipList)

      二分查找和AVL樹查找

      二分查找要求元素可以隨機訪問,所以決定了需要把元素存儲在連續內存。這樣查找確實很快,但是插入和刪除元素的時候,為了保證元素的有序性,就需要大量的移動元素了,這樣插入就慢了。

      如果需要的是一個能夠進行二分查找,又能快速添加和刪除元素的數據結構,首先就是二叉查找樹,二叉查找樹在最壞情況下可能變成一個鏈表。

      于是,就出現了平衡二叉樹,根據平衡算法的不同有AVL樹,B-Tree,B+Tree,紅黑樹等,但是AVL樹實現起來比較復雜,平衡操作較難理解,這時候就可以用SkipList跳表結構

      傳統意義的單鏈表是一個線性結構,向有序的鏈表中插入一個節點需要O(n)的時間,查找操作需要O(n)的時間。

       

      如果我們使用上圖所示的跳躍表,就可以減少查找所需時間為O(n/2),因為我們可以先通過每個節點的最上面的指針先進行查找,這樣子就能跳過一半的節點。

      比如我們想查找19,首先和6比較,大于6之后,在和9進行比較,然后在和12進行比較......最后比較到21的時候,發現21大于19,說明查找的點在17和21之間,從這個過程中,我們可以看出,查找的時候跳過了3、7、12等點,因此查找的復雜度為O(n/2)。

      跳躍表其實也是一種通過“空間來換取時間”的一個算法,通過在每個節點中增加了向前的指針,從而提升查找的效率。

      跳躍表又被稱為概率,或者說是隨機化的數據結構,目前開源軟件 Redis 和 lucence都有用到它

      如何提高鏈表的訪問速度使用跳表(SkipList),把鏈表的部分元素再弄成一個鏈表,依次查找這個鏈表,比如比如我們想查找19,首先和6比較,大于6之后,在和9進行比較,然后在和12進行比較......最后比較到21的時候,發現21大于19,說明查找的點在17和21之間

      3. ConcurrentLinkedQueue  無界非阻塞隊列

      ConcurrentLinkedQueue 對應LinkedList 并發版本

      Add,offer:添加元素

      Peek():get頭元素并不把元素拿走

      poll():get頭元素把元素拿走

      4. CopyOnWriteArrayListCopyOnWriteArraySet

      寫的時候進行復制,可以進行并發的讀。

      適用讀多寫少的場景:比如白名單,黑名單,商品類目的訪問和更新場景,假如我們有一個搜索網站,用戶在這個網站的搜索框中,輸入關鍵字搜索內容,但是某些關鍵字不允許被搜索。這些不能被搜索的關鍵字會被放在一個黑名單當中,黑名單每天晚上更新一次。當用戶搜索時,會檢查當前關鍵字在不在黑名單當中,如果在,則提示不能搜索。

      弱點:內存占用高,數據一致性弱

      總結:寫的時候重新復制一份數據,然后在復制的數據里面寫入數據,寫完以后再把原來的數據的引用指向復制的數據,所以存在數據的弱一致性(最終會一致),適用于讀多寫少的場景

      5.什么是阻塞隊列

      取數據和存數據不滿足要求時,會對線程進行阻塞。例如取數據時發現隊列里面沒有數據就在那里阻塞等著有數據了再取;存數據時發現隊列已經滿了就在那里阻塞等著有數據被取走時再存

      方法

      拋出異常

      返回值

      一直阻塞

      超時退出

      插入

      Add

      offer

      put

      offer

      移除

      remove

      poll

      take

      poll

      檢查

      element

      peek

      沒有

      沒有

      常用阻塞隊列

      ArrayBlockingQueue 數組結構組成有界阻塞隊列。

      先進先出原則,初始化必須傳大小,takeput時候用的同一把鎖

      LinkedBlockingQueue:鏈表結構組成的有界阻塞隊列

      先進先出原則,初始化可以不傳大小,puttake鎖分離

      PriorityBlockingQueue:支持優先級排序的無界阻塞隊列,

      排序,自然順序升序排列,更改順序:類自己實現compareTo()方法,初始化PriorityBlockingQueue指定一個比較器Comparator

      DelayQueue: 使用了優先級隊列的無界阻塞隊列

      支持延時獲取,隊列里的元素要實現Delay接口。DelayQueue非常有用,可以將DelayQueue運用在以下應用場景:

      緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程,設置查詢時間循環查詢DelayQueue,在查詢時間內一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。

      還有訂單到期,限時支付等等。

      SynchronousQueue:不存儲元素的阻塞隊列

      每個put操作必須要等take操作

      LinkedTransferQueue:鏈表結構組成的界阻塞隊列,有Transfer,tryTransfer兩個方法,生產者put時,當前有消費者take,生產者直接把元素傳給消費者。Transfer會等拿到值才返回,tryTransfer嘗試拿一下,拿不到就立即返回

      LinkedBlockingDeque:鏈表結構組成的雙向阻塞隊列

      可以在隊列的兩端同時插入和移除,xxxFirst頭部操作,xxxLast尾部操作。工作竊取模式

      使用阻塞隊列DelayQueue實現定時緩存:

      1. 定義一個用戶實體

      package com.lgs.delayqueue;
      
      /**
       * 
       * @Description: 定義一個用戶實體
       * @author lgs
       * @date 2020年11月1日
       *
       */
      public class User {
          private final String name;
      
          public User(String name) {
              this.name = name;
          }
      
          public String getName() {
              return name;
          }
      }

      2. 定義一個實現Delayed接口的定時緩存bean

      package com.lgs.delayqueue;
      
      import java.util.concurrent.Delayed;
      import java.util.concurrent.TimeUnit;
      
      /**
       * 
       * @Description: 定義一個實現Delayed接口的定時緩存bean
       * @author liguangsheng
       * @date 2020年11月1日
       *
       * @param <T> 要緩存的數據
       */
      public class CacheBean<T> implements Delayed {
      
          private String id;
          private String name;
          
          // 要緩存的數據
          private T data;
          
          //到期時間
          private long activeTime;
      
          public CacheBean(String id, String name, T data, long activeTime) {
              this.id = id;
              this.name = name;
              this.data = data;
              // 到期時間為傳入時間加當前時間 單位納秒
              this.activeTime = TimeUnit.NANOSECONDS.
                      convert(activeTime,TimeUnit.MILLISECONDS)+System.nanoTime();
          }
      
          public String getId() {
              return id;
          }
      
          public void setId(String id) {
              this.id = id;
          }
      
          public String getName() {
              return name;
          }
      
          public void setName(String name) {
              this.name = name;
          }
      
          public T getData() {
              return data;
          }
      
          public void setData(T data) {
              this.data = data;
          }
      
          public long getActiveTime() {
              return activeTime;
          }
      
          public void setActiveTime(long activeTime) {
              this.activeTime = activeTime;
          }
      
          // 獲取數據還能存儲的時間=到期時間減去當前時間 單位納秒
          @Override
          public long getDelay(TimeUnit unit) {
              return unit.convert(this.activeTime-System.nanoTime(),
                      TimeUnit.NANOSECONDS);
          }
      
          /**
           * 是不是已經到期了
           */
          @Override
          public int compareTo(Delayed o) {
              long d = getDelay(TimeUnit.NANOSECONDS)-o.getDelay(TimeUnit.NANOSECONDS);
              return (d==0)?0:(d<0)?-1:1;
          }
      }

      3.  往定時緩存bean里面放入數據

      package com.lgs.delayqueue;
      
      import java.util.concurrent.DelayQueue;
      
      /**
       * 
       * @Description: 往定時緩存bean里面放入數據
       * @author lgs
       * @date 2020年11月1日
       *
       */
      public class PutInCache implements Runnable {
      
          private DelayQueue<CacheBean<User>> queue;
      
          public PutInCache(DelayQueue<CacheBean<User>> queue) {
              this.queue = queue;
          }
      
          @Override
          public void run() {
              CacheBean<User> cacheBean = new CacheBean<User>("1","5秒",
                      new User("lgs"),5000);
              CacheBean<User> cacheBean2 = new CacheBean<User>("2","3秒",
                      new User("ll"),3000);
              queue.offer(cacheBean);
              System.out.println("put in cache:"+cacheBean.getId()+":"+cacheBean.getName());
              queue.offer(cacheBean2);
              System.out.println("put in cache:"+cacheBean2.getId()+":"+cacheBean2.getName());
      
          }
      }

      4. 從定時緩存bean里面獲取數據

      package com.lgs.delayqueue;
      
      import java.util.concurrent.DelayQueue;
      
      /**
       * 
       * @Description: 從定時緩存bean里面獲取數據
       * @author lgs
       * @date 2020年11月1日
       *
       */
      public class GetFromCache implements Runnable {
      
          private DelayQueue<CacheBean<User>> queue;
      
          public GetFromCache(DelayQueue<CacheBean<User>> queue) {
              this.queue = queue;
          }
      
          @Override
          public void run() {
              while(true){
                  try {
                      CacheBean<User> item = queue.take();
                      System.out.println("GetFromCache "+item.getId()+":"+item.getName()+
                              "data:"+((User)item.getData()).getName());
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
      }

      5.  測試阻塞隊列DelayQueue實現定時緩存

      package com.lgs.delayqueue;
      
      import java.util.concurrent.DelayQueue;
      
      /**
       * 
       * @Description: 測試阻塞隊列DelayQueue實現定時緩存
       * @author lgs
       * @date 2020年11月1日
       *
       */
      public class Test {
          public static void main(String[] args) throws InterruptedException {
              DelayQueue<CacheBean<User>> queue = new DelayQueue<CacheBean<User>>();
              new Thread(new PutInCache(queue)).start();
              new Thread(new GetFromCache(queue)).start();
      
              for(int i=1;i<20;i++){
                  Thread.sleep(500);
                  System.out.println(i*500);
              }
          }
      }

      輸出結果:

      1500
      2000
      2500
      GetFromCache 2:3秒data:ll
      3000
      3500
      4000
      4500
      GetFromCache 1:5秒data:lgs
      5000
      5500

      了解阻塞隊列的實現原理

      使用了Condition實現。

      生產者消費者模式

      在并發編程中使用生產者和消費者模式能夠解決絕大多數并發問題。該模式通過平衡生產線程和消費線程的工作能力來提高程序整體處理數據的速度。

      在線程世界里,生產者就是生產數據的線程,消費者就是消費數據的線程。在多線程開發中,如果生產者處理速度很快,而消費者處理速度很慢,那么生產者就必須等待消費者處理完,才能繼續生產數據。同樣的道理,如果消費者的處理能力大于生產者,那么消費者就必須等待生產者。為了解決這種生產消費能力不均衡的問題,便有了生產者和消費者模式

      生產者和消費者模式是通過一個容器來解決生產者和消費者的強耦合問題。生產者和消費者彼此之間不直接通信,而是通過阻塞隊列來進行通信,所以生產者生產完數據之后不用等待消費者處理,直接扔給阻塞隊列,消費者不找生產者要數據,而是直接從阻塞隊列里取,阻塞隊列就相當于一個緩沖區,平衡了生產者和消費者的處理能力

      6. 什么是Fork/Join框架

      并行執行任務的框架,把大任務拆分成很多的小任務,匯總每個小任務的結果得到大任務的結果

       

      工作竊取算法

      工作竊取(work-stealing)算法是指某個線程從其他隊列里竊取任務來執行,執行完以后把結果放回去

      那么,為什么需要使用工作竊取算法呢?假如我們需要做一個比較大的任務,可以把這個任務分割為若干互不依賴的子任務,為了減少線程間的競爭,把這些子任務分別放到不同的隊列里,并為每個隊列創建一個單獨的線程來執行隊列里的任務,線程和隊列一一對應。

      比如A線程負責處理A隊列里的任務。但是,有的線程會先把自己隊列里的任務干完,而其他線程對應的隊列里還有任務等待處理。干完活的線程與其等著,不如去幫其他線程干活,于是它就去其他線程的隊列里竊取一個任務來執行。而在這時它們會訪問同一個隊列,所以為了減少竊取任務線程和被竊取任務線程之間的競爭,通常會使用雙端隊列,被竊取任務線程永遠從雙端隊列的頭部拿任務執行,而竊取任務的線程永遠從雙端隊列的尾部拿任務執行。

      Fork/Join框架的使用

      Fork/Join使用兩個類來完成以上兩件事情。

      ①ForkJoinTask:我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務

      中執行fork()和join()操作的機制。通常情況下,我們不需要直接繼承ForkJoinTask類,只需要繼承它的子類,Fork/Join框架提供了以下兩個子類。

      ·RecursiveAction:用于沒有返回結果的任務。

      ·RecursiveTask:用于有返回結果的任務。

      ②ForkJoinPool:ForkJoinTask需要通過ForkJoinPool來執行。

      Fork/Join有同步和異步兩種方式。

      案例1:孫悟空摘桃子fork/join的案例

       1 /**
       2  * 孫悟空摘桃子fork/join的案例,孫悟空去摘桃子時發現桃子太多就讓猴子猴孫去幫忙在桃子,
       3  * 摘完以后再統一匯總求和
       4  */
       5 public class ForkJoinWuKong {
       6 
       7   private static class XiaoWuKong extends RecursiveTask<Integer>{
       8 
       9       private final static int THRESHOLD = 100;//閾值,數組多小的時候,不再進行任務拆分操作
      10       private PanTao[] src;
      11       private int fromIndex;
      12       private int toIndex;
      13       private IPickTaoZi pickTaoZi;
      14 
      15       public XiaoWuKong(PanTao[] src, int fromIndex, int toIndex, IPickTaoZi pickTaoZi) {
      16           this.src = src;
      17           this.fromIndex = fromIndex;
      18           this.toIndex = toIndex;
      19           this.pickTaoZi = pickTaoZi;
      20       }
      21 
      22       @Override
      23       protected Integer compute() {
      24           //計算完以后結果匯總
      25           if (toIndex-fromIndex<THRESHOLD){
      26               int count =0 ;
      27               for(int i=fromIndex;i<toIndex;i++){
      28                   if (pickTaoZi.pick(src,i)) count++;
      29               }
      30               return count;
      31           }
      32           //大任務拆分成小任務
      33           else{
      34               //fromIndex....mid......toIndex
      35               int mid = (fromIndex+toIndex)/2;
      36               XiaoWuKong left = new XiaoWuKong(src,fromIndex,mid,pickTaoZi);
      37               XiaoWuKong right = new XiaoWuKong(src,mid,toIndex,pickTaoZi);
      38               invokeAll(left,right);
      39               return left.join()+right.join();
      40 
      41           }
      42       }
      43   }
      44 
      45     public static void main(String[] args) {
      46 
      47         ForkJoinPool pool = new ForkJoinPool();
      48         PanTao[] src = MakePanTaoArray.makeArray();
      49         IProcessTaoZi processTaoZi = new WuKongProcessImpl();
      50         IPickTaoZi pickTaoZi = new WuKongPickImpl(processTaoZi);
      51 
      52         long start = System.currentTimeMillis();
      53 
      54         //構造一個ForkJoinTask
      55         XiaoWuKong xiaoWuKong = new XiaoWuKong(src,0,
      56                 src.length-1,pickTaoZi);
      57 
      58         //ForkJoinTask交給ForkJoinPool來執行。
      59         pool.invoke(xiaoWuKong);
      60 
      61         System.out.println("The count is "+ xiaoWuKong.join()
      62                 +" spend time:"+(System.currentTimeMillis()-start)+"ms");
      63 
      64     }
      65 
      66 }

      案例2:使用Fork/Join框架實現計算1+2+3+....+100的結果

      package com.study.demo.forkjoin;
      
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.ForkJoinPool;
      import java.util.concurrent.Future;
      import java.util.concurrent.RecursiveTask;
      
      /**
       * Fork/Join框架設計思路:
       * 第一步:分割任務。首先我們需要有一個fork類來把大任務分割成子任務,有可能子任務還是很大,所以還需要
       *         不停的分割,直到分割出的子任務足夠小。
       * 第二步:執行任務并合并結果。分割的子任務分別放在雙端隊列里,然后啟動幾個線程分別從雙端隊列里獲取任務執行。
       *         子任務執行完的結果都統一放在一個隊列里,啟動一個線程從隊列里拿數據,然后合并這些數據。 
       * 
       * Fork/Join框架的具體實現:
       * Fork/Join使用兩個類來完成以上兩件事情:
       * ForkJoinTask:我們要使用ForkJoin框架,必須首先創建一個ForkJoin任務。它提供在任務中執行fork()和join()
       *               操作的機制,通常情況下我們不需要直接繼承ForkJoinTask類,而只需要繼承它的子類,Fork/Join框架提供了以下兩個子類:
       *               RecursiveAction:用于沒有返回結果的任務。
       *               RecursiveTask :用于有返回結果的任務。
       * ForkJoinPool :ForkJoinTask需要通過ForkJoinPool來執行,任務分割出的子任務會添加到當前工作線程所維護的雙端隊列中,
       *                進入隊列的頭部。當一個工作線程的隊列里暫時沒有任務時,它會隨機從其他工作線程的隊列的尾部獲取一個任務。
       *                
       * 實戰:使用Fork/Join框架實現計算1+2+3+....+100的結果-100個數拆分成10個(閾值)子任務來執行最后匯總結果
       *
       */
      public class CountTask extends RecursiveTask<Integer> {
      
          /**
           * 序列化
           */
          private static final long serialVersionUID = 1L;
          private static final int THRESHOLD = 10;// 閾值
          private int start;
          private int end;
          public CountTask(int start, int end) {
              this.start = start;
              this.end = end;
          }
      
          @Override
          protected Integer compute() {
      
              int sum = 0;
      
              // 如果任務足夠小就計算任務
              boolean canCompute = (end - start) <= THRESHOLD;
              if (canCompute) {
                  for (int i = start; i <= end; i++) {
                      sum += i;
                  }
      
              } else {
      
                  // 如果任務大于閥值,就分裂成兩個子任務計算
                  int middle = (start + end) / 2;
                  CountTask leftTask = new CountTask(start, middle);
                  CountTask rightTask = new CountTask(middle + 1, end);
      
                  // 執行子任務
                  leftTask.fork();
                  rightTask.fork();
      
                  // 等待子任務執行完,并得到其結果
                  int leftResult = leftTask.join();
                  int rightResult = rightTask.join();
      
                  // 合并子任務
                  sum = leftResult + rightResult;
      
              }
      
              return sum;
      
          }
      
          public static void main(String[] args) {
      
              ForkJoinPool forkJoinPool = new ForkJoinPool();
      
              // 生成一個計算任務,負責計算1+2+3+4
              CountTask task = new CountTask(1, 100);
      
              // 執行一個任務
              Future result = forkJoinPool.submit(task);
      
              try {
      
                  System.out.println(result.get());
      
              } catch (InterruptedException e) {
      
              } catch (ExecutionException e) {
      
              }
      
          }
      
      }

       

      二、并發工具類

      1. CountDownLatch

      允許一個或多個線程等待其他線程完成操作。CountDownLatch的構造函數接收一個int類型的參數作為計數器,如果你想等待N個點完成,這里就傳入N。當我們調用CountDownLatch的countDown方法時,N就會減1,CountDownLatch的await方法會阻塞當前線程,直到N變成零。

      由于countDown方法可以用在任何地方,所以這里說的N個點,可以是N個線程,也可以是1個線程里的N個執行步驟。用在多個線程時,只需要把這個CountDownLatch的引用傳遞到線程里即可。

       1 public class CountDownLatchCase {
       2 
       3     static CountDownLatch c = new CountDownLatch(7);
       4 
       5     private static class SubThread implements Runnable{
       6 
       7         @Override
       8         public void run() {
       9             System.out.println(Thread.currentThread().getId());
      10             c.countDown();
      11             System.out.println(Thread.currentThread().getId()+" is done");
      12         }
      13     }
      14 
      15     public static void main(String[] args) throws InterruptedException {
      16 
      17         new Thread(new Runnable() {
      18             @Override
      19             public void run() {
      20                 System.out.println(Thread.currentThread().getId());
      21                 c.countDown();
      22                 System.out.println("sleeping...");
      23                 try {
      24                     Thread.sleep(1500);
      25                 } catch (InterruptedException e) {
      26                     e.printStackTrace();
      27                 }
      28                 System.out.println("sleep is completer");
      29                 c.countDown();
      30             }
      31         }).start();
      32 
      33         for(int i=0;i<=4;i++){
      34             Thread thread = new Thread(new SubThread());
      35             thread.start();
      36         }
      37 
      38         c.await();
      39         System.out.println("Main will gone.....");
      40     }
      41 }

       

      2. CyclicBarrier

      CyclicBarrier的字面意思是可循環使用(Cyclic)的屏障(Barrier)。它要做的事情是,讓一組線程到達一個屏障(也可以叫同步點)時被阻塞,直到最后一個線程到達屏障時,屏障才會開門,所有被屏障攔截的線程才會繼續運行。CyclicBarrier默認的構造方法是CyclicBarrier(int parties),其參數表示屏障攔截的線程數量,每個線程調用await方法告訴CyclicBarrier我已經到達了屏障,然后當前線程被阻塞。

       1 public class CyclicBarrriesBase {
       2 
       3     static CyclicBarrier c = new CyclicBarrier(2);
       4 
       5     public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
       6         new Thread(new Runnable() {
       7             @Override
       8             public void run() {
       9                 System.out.println(Thread.currentThread().getId());
      10                 try {
      11                     c.await();//等待主線程完成
      12                     System.out.println(Thread.currentThread().getId()+"is going");
      13                 } catch (InterruptedException e) {
      14                     e.printStackTrace();
      15                 } catch (BrokenBarrierException e) {
      16                     e.printStackTrace();
      17                 }
      18                 System.out.println("sleeping...");
      19 
      20             }
      21         }).start();
      22 
      23         System.out.println("main will sleep.....");
      24         Thread.sleep(2000);
      25         c.await();////等待子線程完成
      26 
      27         System.out.println("All are complete.");
      28     }
      29 
      30 
      31 
      32 }

       

      CyclicBarrier還提供一個更高級的構造函數CyclicBarrier(int parties,Runnable barrierAction),用于在線程到達屏障時,優先執行barrierAction,方便處理更復雜的業務場景。

      CyclicBarrier可以用于多線程計算數據,最后合并計算結果的場景。

       1 public class CyclicBarrierSum {
       2 
       3     static CyclicBarrier c = new CyclicBarrier(5,new SumThread());
       4     //子線程結果存放的緩存
       5     private static ConcurrentHashMap<String,Integer> resultMap =
       6             new ConcurrentHashMap<>();
       7 
       8     //所有子線程達到屏障后,會執行這個Runnable的任務
       9     private static class SumThread implements Runnable{
      10 
      11         @Override
      12         public void run() {
      13             int result =0;
      14             for(Map.Entry<String,Integer> workResult:resultMap.entrySet()){
      15                 result = result+workResult.getValue();
      16             }
      17             System.out.println("result = "+result);
      18             System.out.println("完全可以做與子線程,統計無關的事情.....");
      19         }
      20     }
      21 
      22     //工作線程,也就是子線程
      23     private static class WorkThread implements Runnable{
      24 
      25         private Random t = new Random();
      26 
      27         @Override
      28         public void run() {
      29             int r = t.nextInt(1000)+1000;
      30             System.out.println(Thread.currentThread().getId()+":r="+r);
      31             resultMap.put(Thread.currentThread().getId()+"",r);
      32             try {
      33                 Thread.sleep(1000+r);
      34                 c.await();
      35             } catch (InterruptedException e) {
      36                 e.printStackTrace();
      37             } catch (BrokenBarrierException e) {
      38                 e.printStackTrace();
      39             }
      40 
      41         }
      42     }
      43 
      44     public static void main(String[] args) {
      45         for(int i=0;i<=4;i++){
      46             Thread thread = new Thread(new WorkThread());
      47             thread.start();
      48         }
      49     }
      50 }

       

      CyclicBarrierCountDownLatch的區別

      CountDownLatch的計數器只能使用一次,而CyclicBarrier的計數器可以使用reset()方法重置,CountDownLatch.await一般阻塞主線程,所有的工作線程執行countDown,CyclicBarrierton通過工作線程調用await從而阻塞工作線程直到所有工作線程達到屏障

      4. 控制并發線程數的Semaphore

      Semaphore(信號量)是用來控制同時訪問特定資源的線程數量,它通過協調各個線程,以保證合理的使用公共資源。應用場景Semaphore可以用于做流量控制,特別是公用資源有限的應用場景,比如數據庫連接。假如有一個需求,要讀取幾萬個文件的數據,因為都是IO密集型任務,我們可以啟動幾十個線程并發地讀取,但是如果讀到內存后,還需要存儲到數據庫中,而數據庫的連接數只有10個,這時我們必須控制只有10個線程同時獲取數據庫連接保存數據,否則會報錯無法獲取數據庫連接。這個時候,就可以使用Semaphore來做流量控制。。Semaphore的構造方法Semaphore(int permits)接受一個整型的數字,表示可用的許可證數量。Semaphore的用法也很簡單,首先線程使用Semaphore的acquire()方法獲取一個許可證,使用完之后調用release()方法歸還許可證。還可以用tryAcquire()方法嘗試獲取許可證。

       1 public class SemaphporeCase<T> {
       2 
       3     private final Semaphore items;//有多少元素可拿
       4     private final Semaphore space;//有多少空位可放元素
       5     private List queue = new LinkedList<>();
       6 
       7     public SemaphporeCase(int itemCounts){
       8         this.items = new Semaphore(0);
       9         this.space = new Semaphore(itemCounts);
      10     }
      11 
      12     //放入數據
      13     public void put(T x) throws InterruptedException {
      14         space.acquire();//拿空位的許可,沒有空位線程會在這個方法上阻塞
      15         synchronized (queue){
      16             queue.add(x);
      17         }
      18         items.release();//有元素了,可以釋放一個拿元素的許可
      19     }
      20 
      21     //取數據
      22     public T take() throws InterruptedException {
      23         items.acquire();//拿元素的許可,沒有元素線程會在這個方法上阻塞
      24         T t;
      25         synchronized (queue){
      26             t = (T)queue.remove(0);
      27         }
      28         space.release();//有空位了,可以釋放一個存在空位的許可
      29         return t;
      30     }
      31 }

       

      Semaphore還提供一些其他方法,具體如下。

      ·intavailablePermits():返回此信號量中當前可用的許可證數。

      ·intgetQueueLength():返回正在等待獲取許可證的線程數。

      ·booleanhasQueuedThreads():是否有線程正在等待獲取許可證。

      ·void reducePermits(int reduction):減少reduction個許可證,是個protected方法。

      ·Collection getQueuedThreads():返

      5. Exchanger

      Exchanger(交換者)是一個用于線程間協作的工具類。Exchanger用于進行線程間的數據交換。它提供一個同步點,在這個同步點,兩個線程可以交換彼此的數據。這兩個線程通過exchange方法交換數據,如果第一個線程先執行exchange()方法,它會一直等待第二個線程也執行exchange方法,當兩個線程都到達同步點時,這兩個線程就可以交換數據,將本線程生產出來的數據傳遞給對方。

       1 public class ExchangeCase {
       2 
       3     static final Exchanger<List<String>> exgr = new Exchanger<>();
       4 
       5     public static void main(String[] args) {
       6 
       7         new Thread(new Runnable() {
       8 
       9             @Override
      10             public void run() {
      11                 try {
      12                     List<String> list = new ArrayList<>();
      13                     list.add(Thread.currentThread().getId()+" insert A1");
      14                     list.add(Thread.currentThread().getId()+" insert A2");
      15                     list = exgr.exchange(list);//交換數據
      16                     for(String item:list){
      17                         System.out.println(Thread.currentThread().getId()+":"+item);
      18                     }
      19                 } catch (InterruptedException e) {
      20                     e.printStackTrace();
      21                 }
      22             }
      23         }).start();
      24 
      25         new Thread(new Runnable() {
      26 
      27             @Override
      28             public void run() {
      29                 try {
      30                     List<String> list = new ArrayList<>();
      31                     list.add(Thread.currentThread().getId()+" insert B1");
      32                     list.add(Thread.currentThread().getId()+" insert B2");
      33                     list.add(Thread.currentThread().getId()+" insert B3");
      34                     System.out.println(Thread.currentThread().getId()+" will sleep");
      35                     Thread.sleep(1500);
      36                     list = exgr.exchange(list);//交換數據
      37                     for(String item:list){
      38                         System.out.println(Thread.currentThread().getId()+":"+item);
      39                     }
      40                 } catch (InterruptedException e) {
      41                     e.printStackTrace();
      42                 }
      43             }
      44         }).start();
      45 
      46     }
      47 
      48 }

       

      posted @ 2018-02-10 17:34  小不點啊  閱讀(5424)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲成人av在线高清| 最新亚洲av日韩av二区| 亚洲国产成人AⅤ片在线观看| 亚洲一区二区三区激情视频| 国产不卡一区二区在线| 日本不卡的一区二区三区| 福利视频在线一区二区| 亚洲乱理伦片在线观看中字| 综合图区亚洲另类偷窥| 无码欧美毛片一区二区三| 精品无码久久久久久尤物| 亚欧洲乱码视频在线专区| 五月婷婷久久中文字幕| 亚洲人妻av伦理| 国产自产一区二区三区视频| 91福利视频一区二区| 日本高清在线观看WWW色| 在线观看免费人成视频色| 午夜福利精品国产二区| 无码专区视频精品老司机| 7m精品福利视频导航| 国产蜜臀av在线一区二区| 亚洲国产中文字幕精品| 人人人澡人人肉久久精品| 亚洲一区二区三区小蜜桃| 久久精品亚洲精品国产色婷| 五月天天天综合精品无码| 欧美性xxxxx极品少妇| 最新的精品亚洲一区二区| 中文字幕国产精品资源| 麻豆果冻传媒2021精品传媒一区| 东京热高清无码精品| 国产偷窥熟女精品视频大全| 亚洲国产成人久久77| 99国产欧美另类久久久精品| a在线观看视频在线播放| 青青国产揄拍视频| 亚洲视频欧美不卡| 久久精品国产大片免费观看| 国产不卡精品视频男人的天堂| 国产精品论一区二区三区|