<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      【轉】-Java 中的阻塞隊列

      Java 中的阻塞隊列

      該博客轉載自?方騰飛??聊聊并發(七)——Java 中的阻塞隊列

      1. 什么是阻塞隊列?

      阻塞隊列(BlockingQueue)是一個支持兩個附加操作的隊列。這兩個附加的操作是:在隊列為空時,獲取元素的線程會等待隊列變為非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列常用于生產者和消費者的場景,生產者是往隊列里添加元素的線程,消費者是從隊列里拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器里拿元素。

      阻塞隊列提供了四種處理方法:

      方法\處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
      插入方法 add(e) offer(e) put(e) offer(e,time,unit)
      移除方法 remove() poll() take() poll(time,unit)
      檢查方法 element() peek() 不可用 不可用
      • 拋出異常:是指當阻塞隊列滿時候,再往隊列里插入元素,會拋出 IllegalStateException("Queue full") 異常。當隊列為空時,從隊列里獲取元素時會拋出 NoSuchElementException 異常 。
      • 返回特殊值:插入方法會返回是否成功,成功則返回 true。移除方法,則是從隊列里拿出一個元素,如果沒有則返回 null
      • 一直阻塞:當阻塞隊列滿時,如果生產者線程往隊列里 put 元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列里 take 元素,隊列也會阻塞消費者線程,直到隊列可用。
      • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,如果超過一定的時間,生產者線程就會退出。

      2. Java 里的阻塞隊列

      JDK7 提供了 7 個阻塞隊列。分別是

      • ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
      • LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
      • PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
      • DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
      • SynchronousQueue:一個不存儲元素的阻塞隊列。
      • LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
      • LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。

      ArrayBlockingQueue 是一個用數組實現的有界阻塞隊列。此隊列按照先進先出(FIFO)的原則對元素進行排序。默認情況下不保證訪問者公平的訪問隊列,所謂公平訪問隊列是指阻塞的所有生產者線程或消費者線程,當隊列可用時,可以按照阻塞的先后順序訪問隊列,即先阻塞的生產者線程,可以先往隊列里插入元素,先阻塞的消費者線程,可以先從隊列里獲取元素。通常情況下為了保證公平性會降低吞吐量。我們可以使用以下代碼創建一個公平的阻塞隊列:

      ArrayBlockingQueue fairQueue = new  ArrayBlockingQueue(1000,true);
      

      訪問者的公平性是使用可重入鎖實現的,代碼如下:

      public ArrayBlockingQueue(int capacity, boolean fair) {
              if (capacity <= 0)
                  throw new IllegalArgumentException();
              this.items = new Object[capacity];
              lock = new ReentrantLock(fair);
              notEmpty = lock.newCondition();
              notFull =  lock.newCondition();
      }
      

      LinkedBlockingQueue 是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度為 Integer.MAX_VALUE。此隊列按照先進先出的原則對元素進行排序。

      PriorityBlockingQueue 是一個支持優先級的無界隊列。默認情況下元素采取自然順序排列,也可以通過比較器 comparator 來指定元素的排序規則。元素按照升序排列。

      DelayQueue 是一個支持延時獲取元素的無界阻塞隊列。隊列使用 PriorityQueue 來實現。隊列中的元素必須實現 Delayed 接口,在創建元素時可以指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。我們可以將 DelayQueue 運用在以下應用場景:

      • 緩存系統的設計:可以用 DelayQueue 保存緩存元素的有效期,使用一個線程循環查詢 DelayQueue,一旦能從 DelayQueue 中獲取元素時,表示緩存有效期到了。
      • 定時任務調度。使用 DelayQueue 保存當天將會執行的任務和執行時間,一旦從 DelayQueue 中獲取到任務就開始執行,從比如 TimerQueue 就是使用 DelayQueue 實現的。

      隊列中的 Delayed 必須實現 compareTo 來指定元素的順序。比如讓延時時間最長的放在隊列的末尾。實現代碼如下:

      public int compareTo(Delayed other) {
                 if (other == this) // compare zero ONLY if same object
                      return 0;
                  if (other instanceof ScheduledFutureTask) {
                      ScheduledFutureTask x = (ScheduledFutureTask)other;
                      long diff = time - x.time;
                      if (diff < 0)
                          return -1;
                      else if (diff > 0)
                          return 1;
      	   else if (sequenceNumber < x.sequenceNumber)
                          return -1;
                      else
                          return 1;
                  }
                  long d = (getDelay(TimeUnit.NANOSECONDS) -
                            other.getDelay(TimeUnit.NANOSECONDS));
                  return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
              }
      

      如何實現 Delayed 接口

      我們可以參考 ScheduledThreadPoolExecutor 里 ScheduledFutureTask 類。這個類實現了 Delayed 接口。首先:在對象創建的時候,使用 time 記錄前對象什么時候可以使用,代碼如下:

      ScheduledFutureTask(Runnable r, V result, long ns, long period) {
                  super(r, result);
                  this.time = ns;
                  this.period = period;
                  this.sequenceNumber = sequencer.getAndIncrement();
      }
      

      然后使用 getDelay 可以查詢當前元素還需要延時多久,代碼如下:

      public long getDelay(TimeUnit unit) {
                  return unit.convert(time - now(), TimeUnit.NANOSECONDS);
              }
      

      通過構造函數可以看出延遲時間參數 ns 的單位是納秒,自己設計的時候最好使用納秒,因為 getDelay 時可以指定任意單位,一旦以納秒作為單位,而延時的時間又精確不到納秒就麻煩了。使用時請注意當 time 小于當前時間時,getDelay 會返回負數。

      如何實現延時隊列

      延時隊列的實現很簡單,當消費者從隊列里獲取元素時,如果元素沒有達到延時時間,就阻塞當前線程。

      long delay = first.getDelay(TimeUnit.NANOSECONDS);
                          if (delay <= 0)
                              return q.poll();
                          else if (leader != null)
                              available.await();
      

      SynchronousQueue 是一個不存儲元素的阻塞隊列。每一個 put 操作必須等待一個 take 操作,否則不能繼續添加元素。SynchronousQueue 可以看成是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列本身并不存儲任何元素,非常適合于傳遞性場景, 比如在一個線程中使用的數據,傳遞給另外一個線程使用,SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。

      LinkedTransferQueue 是一個由鏈表結構組成的無界阻塞 TransferQueue 隊列。相對于其他阻塞隊列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。

      transfer 方法。如果當前有消費者正在等待接收元素(消費者使用 take() 方法或帶時間限制的 poll() 方法時),transfer 方法可以把生產者傳入的元素立刻 transfer(傳輸)給消費者。如果沒有消費者在等待接收元素,transfer 方法會將元素存放在隊列的 tail 節點,并等到該元素被消費者消費了才返回。transfer 方法的關鍵代碼如下:

      Node pred = tryAppend(s, haveData);
      return awaitMatch(s, pred, e, (how == TIMED), nanos);
      

      第一行代碼是試圖把存放當前元素的 s 節點作為 tail 節點。第二行代碼是讓 CPU 自旋等待消費者消費元素。因為自旋會消耗 CPU,所以自旋一定的次數后使用 Thread.yield() 方法來暫停當前正在執行的線程,并執行其他線程。

      tryTransfer 方法。則是用來試探下生產者傳入的元素是否能直接傳給消費者。如果沒有消費者等待接收元素,則返回 false。和 transfer 方法的區別是 tryTransfer 方法無論消費者是否接收,方法立即返回。而 transfer 方法是必須等到消費者消費了才返回。

      對于帶有時間限制的 tryTransfer(E e, long timeout, TimeUnit unit) 方法,則是試圖把生產者傳入的元素直接傳給消費者,但是如果沒有消費者消費該元素則等待指定的時間再返回,如果超時還沒消費元素,則返回 false,如果在超時時間內消費了元素,則返回 true。

      LinkedBlockingDeque 是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的你可以從隊列的兩端插入和移出元素。雙端隊列因為多了一個操作隊列的入口,在多線程同時入隊時,也就減少了一半的競爭。相比其他的阻塞隊列,LinkedBlockingDeque 多了 addFirst,addLast,offerFirst,offerLast,peekFirst,peekLast 等方法,以 First 單詞結尾的方法,表示插入,獲取(peek)或移除雙端隊列的第一個元素。以 Last 單詞結尾的方法,表示插入,獲取或移除雙端隊列的最后一個元素。另外插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。但是 take 方法卻等同于 takeFirst,不知道是不是 Jdk 的 bug,使用時還是用帶有 First 和 Last 后綴的方法更清楚。

      在初始化 LinkedBlockingDeque 時可以設置容量防止其過渡膨脹。另外雙向阻塞隊列可以運用在“工作竊取”模式中。

      3. 阻塞隊列的實現原理

      如果隊列是空的,消費者會一直等待,當生產者添加元素時候,消費者是如何知道當前隊列有元素的呢?如果讓你來設計阻塞隊列你會如何設計,讓生產者和消費者能夠高效率的進行通訊呢?讓我們先來看看 JDK 是如何實現的。

      使用通知模式實現。所謂通知模式,就是當生產者往滿的隊列里添加元素時會阻塞住生產者,當消費者消費了一個隊列中的元素后,會通知生產者當前隊列可用。通過查看 JDK 源碼發現 ArrayBlockingQueue 使用了 Condition 來實現,代碼如下:

      private final Condition notFull;
      private final Condition notEmpty;
      
      public ArrayBlockingQueue(int capacity, boolean fair) {
              // 省略其他代碼 
              notEmpty = lock.newCondition();
              notFull =  lock.newCondition();
          }
      
      public void put(E e) throws InterruptedException {
              checkNotNull(e);
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              try {
                  while (count == items.length)
                      notFull.await();
                  insert(e);
              } finally {
                  lock.unlock();
              }
      }
      
      public E take() throws InterruptedException {
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              try {
                  while (count == 0)
                      notEmpty.await();
                  return extract();
        } finally {
                  lock.unlock();
              }
      }
      
      private void insert(E x) {
              items[putIndex] = x;
              putIndex = inc(putIndex);
              ++count;
              notEmpty.signal();
          }
      

      當我們往隊列里插入一個元素時,如果隊列不可用,阻塞生產者主要通過 LockSupport.park(this); 來實現

      public final void await() throws InterruptedException {
                  if (Thread.interrupted())
                      throw new InterruptedException();
                  Node node = addConditionWaiter();
                  int savedState = fullyRelease(node);
                  int interruptMode = 0;
                  while (!isOnSyncQueue(node)) {
                      LockSupport.park(this);
                      if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                          break;
                  }
                  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                      interruptMode = REINTERRUPT;
                  if (node.nextWaiter != null) // clean up if cancelled
                      unlinkCancelledWaiters();
                  if (interruptMode != 0)
      
      reportInterruptAfterWait(interruptMode);
              }
      

      繼續進入源碼,發現調用 setBlocker 先保存下將要阻塞的線程,然后調用 unsafe.park 阻塞當前線程。

      public static void park(Object blocker) {
              Thread t = Thread.currentThread();
              setBlocker(t, blocker);
              unsafe.park(false, 0L);
              setBlocker(t, null);
          }
      

      unsafe.park 是個 native 方法,代碼如下:

      public native void park(boolean isAbsolute, long time);
      

      park 這個方法會阻塞當前線程,只有以下四種情況中的一種發生時,該方法才會返回。

      • 與 park 對應的 unpark 執行或已經執行時。注意:已經執行是指 unpark 先執行,然后再執行的 park。
      • 線程被中斷時。
      • 如果參數中的 time 不是零,等待了指定的毫秒數時。
      • 發生異常現象時。這些異常事先無法確定。

      我們繼續看一下 JVM 是如何實現 park 方法的,park 在不同的操作系統使用不同的方式實現,在 linux 下是使用的是系統方法 pthread_cond_wait 實現。實現代碼在 JVM 源碼路徑 src/os/linux/vm/os_linux.cpp 里的 os::PlatformEvent::park 方法,代碼如下:

      void os::PlatformEvent::park() {      
           	     int v ;
      	     for (;;) {
      		v = _Event ;
      	     if (Atomic::cmpxchg (v-1, &_Event, v) == v) break ;
      	     }
      	     guarantee (v >= 0, "invariant") ;
      	     if (v == 0) {
      	     // Do this the hard way by blocking ...
      	     int status = pthread_mutex_lock(_mutex);
      	     assert_status(status == 0, status, "mutex_lock");
      	     guarantee (_nParked == 0, "invariant") ;
      	     ++ _nParked ;
      	     while (_Event < 0) {
      	     status = pthread_cond_wait(_cond, _mutex);
      	     // for some reason, under 2.7 lwp_cond_wait() may return ETIME ...
      	     // Treat this the same as if the wait was interrupted
      	     if (status == ETIME) { status = EINTR; }
      	     assert_status(status == 0 || status == EINTR, status, "cond_wait");
      	     }
      	     -- _nParked ;
      	     
      	     // In theory we could move the ST of 0 into _Event past the unlock(),
      	     // but then we'd need a MEMBAR after the ST.
      	     _Event = 0 ;
      	     status = pthread_mutex_unlock(_mutex);
      	     assert_status(status == 0, status, "mutex_unlock");
      	     }
      	     guarantee (_Event >= 0, "invariant") ;
      	     }
      
           }
      

      pthread_cond_wait 是一個多線程的條件變量函數,cond 是 condition 的縮寫,字面意思可以理解為線程在等待一個條件發生,這個條件是一個全局變量。這個方法接收兩個參數,一個共享變量 _cond,一個互斥量 _mutex。而 unpark 方法在 linux 下是使用 pthread_cond_signal 實現的。park 在 windows 下則是使用 WaitForSingleObject 實現的。

      當隊列滿時,生產者往阻塞隊列里插入一個元素,生產者線程會進入 WAITING (parking) 狀態。我們可以使用 jstack dump 阻塞的生產者線程看到這點:

      "main" prio=5 tid=0x00007fc83c000000 nid=0x10164e000 waiting on condition [0x000000010164d000]
         java.lang.Thread.State: WAITING (parking)
              at sun.misc.Unsafe.park(Native Method)
              - parking to wait for  <0x0000000140559fe8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
              at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186)
              at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043)
              at java.util.concurrent.ArrayBlockingQueue.put(ArrayBlockingQueue.java:324)
              at blockingqueue.ArrayBlockingQueueTest.main(ArrayBlockingQueueTest.java:11)
      

      4. 參考資料

      posted @ 2024-07-10 09:42  booleandev  閱讀(21)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 亚洲成人高清av在线| 极品白嫩少妇无套内谢| 无码人妻黑人中文字幕| аⅴ天堂中文在线网| av性色av久久无码ai换脸| 中文字幕自拍偷拍福利视频| 国产AV福利第一精品| 黑人大群体交免费视频| 国产精品自拍视频我看看| 成人又黄又爽又色的视频| 国产精品毛片一区二区| 亚洲欧美精品一中文字幕| 四川丰满少妇无套内谢| 欧洲码亚洲码的区别入口| 亚洲中文字幕一区精品自| 久久天天躁狠狠躁夜夜躁2012| 无遮挡高潮国产免费观看| 好看的国产精品自拍视频| 国产真人性做爰久久网站| 亚洲精品一区二区在线播| 久久99精品久久久久久9 | 好吊妞| 久久99精品久久久久久青青| 亚洲人成网站77777在线观看| 成人无遮挡裸免费视频在线观看| 黑人巨大粗物挺进了少妇| 日夜啪啪一区二区三区| 桓台县| 999精品全免费观看视频| 国产成人午夜精品影院| 亚洲国产日韩一区三区| 九九热精品免费在线视频| 免费无码VA一区二区三区 | 成人免费A级毛片无码片2022| 亚洲av乱码久久亚洲精品| 色综合久久蜜芽国产精品| 毛片亚洲AV无码精品国产午夜| 欧美日韩一区二区三区视频播放| 美女爽到高潮嗷嗷嗷叫免费网站| 99久久免费精品色老| 在线视频不卡在线亚洲|