<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      lenmom

      博客園 首頁 新隨筆 聯系 訂閱 管理

      1. 什么是阻塞隊列?

      阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。

      阻塞隊列提供了四種處理方法:

      方法\處理方式拋出異常返回特殊值一直阻塞超時退出
      插入方法 add(e) offer(e) put(e) offer(e,time,unit)
      移除方法 remove() poll() take() poll(time,unit)
      檢查方法 element() peek() 不可用 不可用

      異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出IllegalStateException("Queue full")異常。當隊列為空時,從隊列里獲取元素時會拋出NoSuchElementException異常 。


      • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回null
      • 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里take元素,隊列也會阻塞消費者線程,直到隊列可用。
      • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。

          詳細介紹BlockingQueue,以下是涉及的主要內容:

      • BlockingQueue的核心方法
      • 阻塞隊列的成員的概要介紹
      • 詳細介紹DelayQueue、ArrayBlockingQueue、LinkedBlockingQueue的原理
      • 線程池與BlockingQueue
      1、初識阻塞隊列

      在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全“傳輸”數據的問題。通過這些高效并且線程安全的隊列類,為我們快速搭建高質量的多線程程序帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的所有成員,包括他們各自的功能以及常見使用場景。

      BlockingQueue的核心方法:

      public interface BlockingQueue<E> extends Queue<E> {
      
          //將給定元素設置到隊列中,如果設置成功返回true, 否則拋出異常。如果是往限定了長度的隊列中設置值,推薦使用offer()方法。
          boolean add(E e);
      
          //將給定的元素設置到隊列中,如果設置成功返回true, 否則返回false. e的值不能為空,否則拋出空指針異常。
          boolean offer(E e);
      
          //將元素設置到隊列中,如果隊列中沒有多余的空間,該方法會一直阻塞,直到隊列中有多余的空間。
          void put(E e) throws InterruptedException;
      
          //將給定元素在給定的時間內設置到隊列中,如果設置成功返回true, 否則返回false.
          boolean offer(E e, long timeout, TimeUnit unit)
              throws InterruptedException;
      
          //從隊列中獲取值,如果隊列中沒有值,線程會一直阻塞,直到隊列中有值,并且該方法取得了該值。
          E take() throws InterruptedException;
      
          //在給定的時間里,從隊列中獲取值,如果沒有取到會拋出異常。
          E poll(long timeout, TimeUnit unit)
              throws InterruptedException;
      
          //獲取隊列中剩余的空間。
          int remainingCapacity();
      
          //從隊列中移除指定的值。
          boolean remove(Object o);
      
          //判斷隊列中是否擁有該值。
          public boolean contains(Object o);
      
          //將隊列中值,全部移除,并發設置到給定的集合中。
          int drainTo(Collection<? super E> c);
      
          //指定最多數量限制將隊列中值,全部移除,并發設置到給定的集合中。
          int drainTo(Collection<? super E> c, int maxElements);
      }
      

      在深入之前先了解下下ReentrantLock 和 Condition:
      重入鎖ReentrantLock:
      ReentrantLock鎖在同一個時間點只能被一個線程鎖持有;而可重入的意思是,ReentrantLock鎖,可以被單個線程多次獲取。
      ReentrantLock分為“公平鎖”和“非公平鎖”。它們的區別體現在獲取鎖的機制上是否公平。“鎖”是為了保護競爭資源,防止多個線程同時操作線程而出錯,ReentrantLock在同一個時間點只能被一個線程獲取(當某線程獲取到“鎖”時,其它線程就必須等待);ReentraantLock是通過一個FIFO的等待隊列來管理獲取該鎖所有線程的。在“公平鎖”的機制下,線程依次排隊獲取鎖;而“非公平鎖”在鎖是可獲取狀態時,不管自己是不是在隊列的開頭都會獲取鎖。
      主要方法:

      • lock()獲得鎖
      • lockInterruptibly()獲得鎖,但優先響應中斷
      • tryLock()嘗試獲得鎖,成功返回true,否則false,該方法不等待,立即返回
      • tryLock(long time,TimeUnit unit)在給定時間內嘗試獲得鎖
      • unlock()釋放鎖

      Condition:await()、signal()方法分別對應之前的Object的wait()和notify()

      • 和重入鎖一起使用
      • await()是當前線程等待同時釋放鎖
      • awaitUninterruptibly()不會在等待過程中響應中斷
      • signal()用于喚醒一個在等待的線程,還有對應的singalAll()方法
      2、阻塞隊列的成員
      隊列有界性數據結構
      ArrayBlockingQueue bounded(有界) 加鎖 arrayList
      LinkedBlockingQueue optionally-bounded 加鎖 linkedList
      PriorityBlockingQueue unbounded 加鎖 heap
      DelayQueue unbounded 加鎖 heap
      SynchronousQueue bounded 加鎖
      LinkedTransferQueue unbounded 加鎖 heap
      LinkedBlockingDeque unbounded 無鎖 heap

      下面分別簡單介紹一下:

      • ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每一個線程在獲取鎖的時候可能都會排隊等待,如果在等待時間上,先獲取鎖的線程的請求一定先被滿足,那么這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】

      • LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,此隊列的長度為Integer.MAX_VALUE。此隊列按照先進先出的順序進行排序。
      • PriorityBlockingQueue: 一個支持線程優先級排序的無界隊列,默認自然序進行排序,也可以自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。
      • DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在創建元素時,可以指定多久才能從隊列中獲取當前元素。只有延時期滿后才能從隊列中獲取元素。(DelayQueue可以運用在以下應用場景:1.緩存系統的設計:可以用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從比如TimerQueue就是使用DelayQueue實現的。)
      • SynchronousQueue: 一個不存儲元素的阻塞隊列,每一個put操作必須等待take操作,否則不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據需要(新任務到來時)創建新的線程,如果有空閑線程則會重復使用,線程空閑了60秒后會被回收。
      • LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,相當于其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。
      • LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部都可以添加和移除元素,多線程并發時,可以將鎖的競爭最多降到一半。

      接下來重點介紹下:ArrayBlockingQueue、LinkedBlockingQueue以及DelayQueue

      3、阻塞隊列原理以及使用

      (1)DelayQueue

      DelayQueue的泛型參數需要實現Delayed接口,Delayed接口繼承了Comparable接口,DelayQueue內部使用非線程安全的優先隊列(PriorityQueue),并使用Leader/Followers模式,最小化不必要的等待時間。DelayQueue不允許包含null元素。

      Leader/Followers模式:

      1. 有若干個線程(一般組成線程池)用來處理大量的事件
      2. 有一個線程作為領導者,等待事件的發生;其他的線程作為追隨者,僅僅是睡眠。
      3. 假如有事件需要處理,領導者會從追隨者中指定一個新的領導者,自己去處理事件。
      4. 喚醒的追隨者作為新的領導者等待事件的發生。
      5. 處理事件的線程處理完畢以后,就會成為追隨者的一員,直到被喚醒成為領導者。
      6. 假如需要處理的事件太多,而線程數量不夠(能夠動態創建線程處理另當別論),則有的事件可能會得不到處理。

      所有線程會有三種身份中的一種:leader和follower,以及一個干活中的狀態:proccesser。它的基本原則就是,永遠最多只有一個leader。而所有follower都在等待成為leader。線程池啟動時會自動產生一個Leader負責等待網絡IO事件,當有一個事件產生時,Leader線程首先通知一個Follower線程將其提拔為新的Leader,然后自己就去干活了,去處理這個網絡事件,處理完畢后加入Follower線程等待隊列,等待下次成為Leader。這種方法可以增強CPU高速緩存相似性,及消除動態內存分配和線程間的數據交換。
      參數以及構造函數:

          // 可重入鎖
          private final transient ReentrantLock lock = new ReentrantLock();
          
          // 存儲隊列元素的隊列——優先隊列
          private final PriorityQueue<E> q = new PriorityQueue<E>();
      
          //用于優化阻塞通知的線程元素leader,Leader/Followers模式
          private Thread leader = null;
      
          //用于實現阻塞和通知的Condition對象
          private final Condition available = lock.newCondition();
          
          public DelayQueue() {}
          
          public DelayQueue(Collection<? extends E> c) {
              this.addAll(c);
          }
      

      先看offer()方法:

          public boolean offer(E e) {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  q.offer(e);
                  // 如果原來隊列為空,重置leader線程,通知available條件
                  if (q.peek() == e) {
                      leader = null;
                      available.signal();
                  }
                  return true;
              } finally {
                  lock.unlock();
              }
          }
      
          //因為DelayQueue不限制長度,因此添加元素的時候不會因為隊列已滿產生阻塞,因此帶有超時的offer方法的超時設置是不起作用的
          public boolean offer(E e, long timeout, TimeUnit unit) {
              // 和不帶timeout的offer方法一樣
              return offer(e);
          }

      普通的poll()方法:如果延遲時間沒有耗盡的話,直接返回null

          public E poll() {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  E first = q.peek();
                  if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0)
                      return null;
                  else
                      return q.poll();
              } finally {
                  lock.unlock();
              }
          }

      再看看take()方法:

          public E take() throws InterruptedException {
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              try {
                  for (;;) {
                      // 如果隊列為空,需要等待available條件被通知
                      E first = q.peek();
                      if (first == null)
                          available.await();
                      else {
                          long delay = first.getDelay(TimeUnit.NANOSECONDS);
                          // 如果延遲時間已到,直接返回第一個元素
                          if (delay <= 0)
                              return q.poll();
                          // leader線程存在表示有其他線程在等待,那么當前線程肯定需要等待
                          else if (leader != null)
                              available.await();
                          else {
                              Thread thisThread = Thread.currentThread();
                              leader = thisThread;
                              // 如果沒有leader線程,設置當前線程為leader線程
                              // 嘗試等待直到延遲時間耗盡(可能提前返回,那么下次
                              // 循環會繼續處理)
                              try {
                                  available.awaitNanos(delay);
                              } finally {
                                  // 如果leader線程還是當前線程,重置它用于下一次循環。
                                  // 等待available條件時,鎖可能被其他線程占用從而導致
                                  // leader線程被改變,所以要檢查
                                  if (leader == thisThread)
                                      leader = null;
                              }
                          }
                      }
                  }
              } finally {
                  // 如果沒有其他線程在等待,并且隊列不為空,通知available條件
                  if (leader == null && q.peek() != null)
                      available.signal();
                  lock.unlock();
              }
          }

      最后看看帶有timeout的poll方法:

          public E poll(long timeout, TimeUnit unit) throws InterruptedException {
              long nanos = unit.toNanos(timeout);
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              try {
                  for (;;) {
                      E first = q.peek();
                      if (first == null) {
                          if (nanos <= 0)
                              return null;
                          else
                              // 嘗試等待available條件,記錄剩余的時間
                              nanos = available.awaitNanos(nanos);
                      } else {
                          long delay = first.getDelay(TimeUnit.NANOSECONDS);
                          if (delay <= 0)
                              return q.poll();
                          if (nanos <= 0)
                              return null;
                          // 當leader線程不為空時(此時delay>=nanos),等待的時間
                          // 似乎delay更合理,但是nanos也可以,因為排在當前線程前面的
                          // 其他線程返回時會喚醒available條件從而返回,
                          if (nanos < delay || leader != null)
                              nanos = available.awaitNanos(nanos);
                          else {
                              Thread thisThread = Thread.currentThread();
                              leader = thisThread;
                              try {
                                  long timeLeft = available.awaitNanos(delay);
                                  // nanos需要更新
                                  nanos -= delay - timeLeft;
                              } finally {
                                  if (leader == thisThread)
                                      leader = null;
                              }
                          }
                      }
                  }
              } finally {
                  if (leader == null && q.peek() != null)
                      available.signal();
                  lock.unlock();
              }
          }

      (2)ArrayBlockingQueue

      參數以及構造函數:

          // 存儲隊列元素的數組
          final Object[] items;
      
          // 拿數據的索引,用于take,poll,peek,remove方法
          int takeIndex;
      
          // 放數據的索引,用于put,offer,add方法
          int putIndex;
      
          // 元素個數
          int count;
      
          // 可重入鎖
          final ReentrantLock lock;
          // notEmpty條件對象,由lock創建
          private final Condition notEmpty;
          // notFull條件對象,由lock創建
          private final Condition notFull;
      
          public ArrayBlockingQueue(int capacity) {
              this(capacity, false);//默認構造非公平鎖的阻塞隊列 
          }
          public ArrayBlockingQueue(int capacity, boolean fair) {
              if (capacity <= 0)
                  throw new IllegalArgumentException();
              this.items = new Object[capacity];
              //初始化ReentrantLock重入鎖,出隊入隊擁有這同一個鎖 
              lock = new ReentrantLock(fair);
              //初始化非空等待隊列
              notEmpty = lock.newCondition();
              //初始化非滿等待隊列 
              notFull =  lock.newCondition();
          }
          public ArrayBlockingQueue(int capacity, boolean fair,
                                    Collection<? extends E> c) {
              this(capacity, fair);
      
              final ReentrantLock lock = this.lock;
              lock.lock(); // Lock only for visibility, not mutual exclusion
              try {
                  int i = 0;
                  //將集合添加進數組構成的隊列中 
                  try {
                      for (E e : c) {
                          checkNotNull(e);
                          items[i++] = e;
                      }
                  } catch (ArrayIndexOutOfBoundsException ex) {
                      throw new IllegalArgumentException();
                  }
                  count = i;
                  putIndex = (i == capacity) ? 0 : i;
              } finally {
                  lock.unlock();
              }
          }

      添加的實現原理:

      這里的add方法和offer方法最終調用的是enqueue(E x)方法,其方法內部通過putIndex索引直接將元素添加到數組items中,這里可能會疑惑的是當putIndex索引大小等于數組長度時,需要將putIndex重新設置為0,這是因為當前隊列執行元素獲取時總是從隊列頭部獲取,而添加元素從中從隊列尾部獲取所以當隊列索引(從0開始)與數組長度相等時,下次我們就需要從數組頭部開始添加了,如下圖演示

      //入隊操作
          private void enqueue(E x) {
              final Object[] items = this.items;
              //通過putIndex索引對數組進行賦值
              items[putIndex] = x;
              //索引自增,如果已是最后一個位置,重新設置 putIndex = 0;
              if (++putIndex == items.length)
                  putIndex = 0;
              count++;
              notEmpty.signal();
          }

      接著看put方法:
      put方法是一個阻塞的方法,如果隊列元素已滿,那么當前線程將會被notFull條件對象掛起加到等待隊列中,直到隊列有空檔才會喚醒執行添加操作。但如果隊列沒有滿,那么就直接調用enqueue(e)方法將元素加入到數組隊列中。到此我們對三個添加方法即put,offer,add都分析完畢,其中offer,add在正常情況下都是無阻塞的添加,而put方法是阻塞添加。這就是阻塞隊列的添加過程。說白了就是當隊列滿時通過條件對象Condtion來阻塞當前調用put方法的線程,直到線程又再次被喚醒執行。總得來說添加線程的執行存在以下兩種情況,一是,隊列已滿,那么新到來的put線程將添加到notFull的條件隊列中等待,二是,有移除線程執行移除操作,移除成功同時喚醒put線程,如下圖所示

          public void put(E e) throws InterruptedException {
              checkNotNull(e);
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              try {
                  //當隊列元素個數與數組長度相等時,無法添加元素
                  while (count == items.length)
                      //將當前調用線程掛起,添加到notFull條件隊列中等待喚醒
                      notFull.await();
                  enqueue(e);
              } finally {
                  lock.unlock();
              }
          }
      移除實現原理:

      poll方法,該方法獲取并移除此隊列的頭元素,若隊列為空,則返回 null

          public E poll() {
            final ReentrantLock lock = this.lock;
             lock.lock();
             try {
                 //判斷隊列是否為null,不為null執行dequeue()方法,否則返回null
                 return (count == 0) ? null : dequeue();
             } finally {
                 lock.unlock();
             }
          }
          //刪除隊列頭元素并返回
          private E dequeue() {
           //拿到當前數組的數據
           final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            //獲取要刪除的對象
            E x = (E) items[takeIndex];
            將數組中takeIndex索引位置設置為null
            items[takeIndex] = null;
            //takeIndex索引加1并判斷是否與數組長度相等,
            //如果相等說明已到盡頭,恢復為0
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;//隊列個數減1
            if (itrs != null)
                itrs.elementDequeued();//同時更新迭代器中的元素數據
            //刪除了元素說明隊列有空位,喚醒notFull條件對象添加線程,執行添加操作
            notFull.signal();
            return x;
          }

      接著看remove(Object o)方法

          public boolean remove(Object o) {
              if (o == null) return false;
              //獲取數組數據
              final Object[] items = this.items;
              final ReentrantLock lock = this.lock;
              lock.lock();//加鎖
              try {
                  //如果此時隊列不為null,這里是為了防止并發情況
                  if (count > 0) {
                      //獲取下一個要添加元素時的索引
                      final int putIndex = this.putIndex;
                      //獲取當前要被刪除元素的索引
                      int i = takeIndex;
                      //執行循環查找要刪除的元素
                      do {
                          //找到要刪除的元素
                          if (o.equals(items[i])) {
                              removeAt(i);//執行刪除
                              return true;//刪除成功返回true
                          }
                          //當前刪除索引執行加1后判斷是否與數組長度相等
                          //若為true,說明索引已到數組盡頭,將i設置為0
                          if (++i == items.length)
                              i = 0; 
                      } while (i != putIndex);//繼承查找
                  }
                  return false;
              } finally {
                  lock.unlock();
              }
          }
      
          //根據索引刪除元素,實際上是把刪除索引之后的元素往前移動一個位置
          void removeAt(final int removeIndex) {
      
           final Object[] items = this.items;
            //先判斷要刪除的元素是否為當前隊列頭元素
            if (removeIndex == takeIndex) {
                //如果是直接刪除
                items[takeIndex] = null;
                //當前隊列頭元素加1并判斷是否與數組長度相等,若為true設置為0
                if (++takeIndex == items.length)
                    takeIndex = 0;
                count--;//隊列元素減1
                if (itrs != null)
                    itrs.elementDequeued();//更新迭代器中的數據
            } else {
            //如果要刪除的元素不在隊列頭部,
            //那么只需循環迭代把刪除元素后面的所有元素往前移動一個位置
                //獲取下一個要被添加的元素的索引,作為循環判斷結束條件
                final int putIndex = this.putIndex;
                //執行循環
                for (int i = removeIndex;;) {
                    //獲取要刪除節點索引的下一個索引
                    int next = i + 1;
                    //判斷是否已為數組長度,如果是從數組頭部(索引為0)開始找
                    if (next == items.length)
                        next = 0;
                     //如果查找的索引不等于要添加元素的索引,說明元素可以再移動
                    if (next != putIndex) {
                        items[i] = items[next];//把后一個元素前移覆蓋要刪除的元
                        i = next;
                    } else {
                    //在removeIndex索引之后的元素都往前移動完畢后清空最后一個元素
                        items[i] = null;
                        this.putIndex = i;
                        break;//結束循環
                    }
                }
                count--;//隊列元素減1
                if (itrs != null)
                    itrs.removedAt(removeIndex);//更新迭代器數據
            }
            notFull.signal();//喚醒添加線程
          }

      remove(Object o)方法的刪除過程相對復雜些,因為該方法并不是直接從隊列頭部刪除元素。首先線程先獲取鎖,再一步判斷隊列count>0,這點是保證并發情況下刪除操作安全執行。接著獲取下一個要添加源的索引putIndex以及takeIndex索引 ,作為后續循環的結束判斷,因為只要putIndex與takeIndex不相等就說明隊列沒有結束。然后通過while循環找到要刪除的元素索引,執行removeAt(i)方法刪除,在removeAt(i)方法中實際上做了兩件事,一是首先判斷隊列頭部元素是否為刪除元素,如果是直接刪除,并喚醒添加線程,二是如果要刪除的元素并不是隊列頭元素,那么執行循環操作,從要刪除元素的索引removeIndex之后的元素都往前移動一個位置,那么要刪除的元素就被removeIndex之后的元素替換,從而也就完成了刪除操作。

      接著看take()方法
      take方法其實很簡單,有就刪除沒有就阻塞,注意這個阻塞是可以中斷的,如果隊列沒有數據那么就加入notEmpty條件隊列等待(有數據就直接取走,方法結束),如果有新的put線程添加了數據,那么put操作將會喚醒take線程,執行take操作。圖示如下

          //從隊列頭部刪除,隊列沒有元素就阻塞,可中斷
           public E take() throws InterruptedException {
              final ReentrantLock lock = this.lock;
                lock.lockInterruptibly();//中斷
                try {
                    //如果隊列沒有元素
                    while (count == 0)
                        //執行阻塞操作
                        notEmpty.await();
                    return dequeue();//如果隊列有元素執行刪除操作
                } finally {
                    lock.unlock();
                }
              }

      最后看看peek()方法,比較簡單,直接返回當前隊列的頭元素但不刪除任何元素。

          public E peek() {
                final ReentrantLock lock = this.lock;
                lock.lock();
                try {
                 //直接返回當前隊列的頭元素,但不刪除
                    return itemAt(takeIndex); // null when queue is empty
                } finally {
                    lock.unlock();
                }
            }
      
          final E itemAt(int i) {
                return (E) items[i];
            }

      (3)LinkedBlockingQueue

      參數以及構造函數:

          //節點類,用于存儲數據
          static class Node<E> {
              E item;
              Node<E> next;
      
              Node(E x) { item = x; }
          }
          // 容量大小
          private final int capacity;
      
          // 元素個數,因為有2個鎖,存在競態條件,使用AtomicInteger
          private final AtomicInteger count = new AtomicInteger(0);
      
          // 頭結點
          private transient Node<E> head;
      
          // 尾節點
          private transient Node<E> last;
      
          // 獲取并移除元素時使用的鎖,如take, poll, etc
          private final ReentrantLock takeLock = new ReentrantLock();
      
          // notEmpty條件對象,當隊列沒有數據時用于掛起執行刪除的線程
          private final Condition notEmpty = takeLock.newCondition();
      
          // 添加元素時使用的鎖如 put, offer, etc 
          private final ReentrantLock putLock = new ReentrantLock();
      
          // notFull條件對象,當隊列數據已滿時用于掛起執行添加的線程 
          private final Condition notFull = putLock.newCondition();
      
      
          public LinkedBlockingQueue() {
              this(Integer.MAX_VALUE);
          }
      
          public LinkedBlockingQueue(int capacity) {
              if (capacity <= 0) throw new IllegalArgumentException();
              this.capacity = capacity;
              last = head = new Node<E>(null);
          }
      
          public LinkedBlockingQueue(Collection<? extends E> c) {
              this(Integer.MAX_VALUE);
              final ReentrantLock putLock = this.putLock;
              putLock.lock(); // Never contended, but necessary for visibility
              try {
                  int n = 0;
                  for (E e : c) {
                      if (e == null)
                          throw new NullPointerException();
                      if (n == capacity)
                          throw new IllegalStateException("Queue full");
                      enqueue(new Node<E>(e));
                      ++n;
                  }
                  count.set(n);
              } finally {
                  putLock.unlock();
              }
          }
      
      4、線程池中的BlockingQueue

      首先看下構造函數

      public ThreadPoolExecutor(int corePoolSize,
                               int maximumPoolSize,
                               long keepAliveTime,
                               TimeUnit unit,
                               BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory,
                               RejectedExecutionHandler handler){...}

      TimeUnit:時間單位;BlockingQueue:等待的線程存放隊列;keepAliveTime:非核心線程的閑置超時時間,超過這個時間就會被回收;RejectedExecutionHandler:線程池對拒絕任務的處理策略。
      自定義線程池:這個構造方法對于隊列是什么類型比較關鍵。

      • 在使用有界隊列時,若有新的任務需要執行,如果線程池實際線程數小于corePoolSize,則優先創建線程,
      • 若大于corePoolSize,則會將任務加入隊列,
      • 若隊列已滿,則在總線程數不大于maximumPoolSize的前提下,創建新的線程,
      • 若隊列已經滿了且線程數大于maximumPoolSize,則執行拒絕策略。或其他自定義方式。

      接下來看下源碼:

        public void execute(Runnable command) {  
                if (command == null) //不能是空任務  
                    throw new NullPointerException();  
            //如果還沒有達到corePoolSize,則添加新線程來執行任務  
                if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
                 //如果已經達到corePoolSize,則不斷的向工作隊列中添加任務  
                    if (runState == RUNNING && workQueue.offer(command)) {  
                    //線程池已經沒有任務  
                        if (runState != RUNNING || poolSize == 0)   
                            ensureQueuedTaskHandled(command);  
                    }  
                 //如果線程池不處于運行中或者工作隊列已經滿了,但是當前的線程數量還小于允許最大的maximumPoolSize線程數量,則繼續創建線程來執行任務  
                    else if (!addIfUnderMaximumPoolSize(command))  
                    //已達到最大線程數量,任務隊列也已經滿了,則調用飽和策略執行處理器  
                        reject(command); // is shutdown or saturated  
                }  
        }  
        
        private boolean addIfUnderCorePoolSize(Runnable firstTask) {  
                Thread t = null;  
                final ReentrantLock mainLock = this.mainLock;  
                mainLock.lock();  
                //更改幾個重要的控制字段需要加鎖  
                try {  
                    //池里線程數量小于核心線程數量,并且還需要是運行時  
                    if (poolSize < corePoolSize && runState == RUNNING)  
                        t = addThread(firstTask);  
                } finally {  
                    mainLock.unlock();  
                }  
                if (t == null)  
                    return false;  
                t.start(); //創建后,立即執行該任務  
                return true;  
            }  
        
        private Thread addThread(Runnable firstTask) {  
                Worker w = new Worker(firstTask);  
                Thread t = threadFactory.newThread(w); //委托線程工廠來創建,具有相同的組、優先級、都是非后臺線程  
                if (t != null) {  
                    w.thread = t;  
                    workers.add(w); //加入到工作者線程集合里  
                    int nt = ++poolSize;  
                    if (nt > largestPoolSize)  
                        largestPoolSize = nt;  
                }  
                return t;  
            }  
      posted on 2019-12-10 18:25  老董  閱讀(968)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 99久re热视频这里只有精品6| 国产69久久精品成人看| 国产成人理论在线视频观看| 国产偷倩视频| 成人午夜无人区一区二区| 国产性色av高清在线观看| 伊人色综合一区二区三区| 国产成人一区二区不卡| 赤水市| 成熟妇女性成熟满足视频| 五月天国产成人AV免费观看| 镇宁| 蜜臀91精品高清国产福利 | 成人午夜在线观看刺激| 久久精品av国产一区二区| 人人澡超碰碰97碰碰碰| 亚洲人成网线在线播放VA| 大陆一级毛片免费播放| 久久亚洲精品情侣| 亚洲天堂一区二区成人在线| 国产人妇三级视频在线观看| 亚洲成av人片天堂网无码| 国产成人精品一区二区三区| 黑人精品一区二区三区不| 久久精品国产亚洲av麻豆软件| 在线视频中文字幕二区| 国产乱国产乱老熟300部视频 | 97欧美精品系列一区二区| 少妇愉情理伦片丰满丰满午夜| 国产日韩入口一区二区| 伊人成伊人成综合网222| 高邑县| 久久香蕉欧美精品| 亚洲国产美女精品久久久| 亚洲永久一区二区三区在线| 99久久精品看国产一区| 丰满熟女人妻一区二区三| 午夜精品久久久久久久爽| 国产中文字幕精品视频| 92成人午夜福利一区二区| 成熟女人特级毛片www免费|