<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      java并發(fā):阻塞隊(duì)列之DelayQueue

      延時(shí)隊(duì)列

      DelayQueue是一個(gè)支持延時(shí)獲取元素的使用優(yōu)先級(jí)隊(duì)列實(shí)現(xiàn)的無(wú)界的阻塞隊(duì)列。

      在創(chuàng)建元素時(shí)可以指定多久才能從隊(duì)列中獲取當(dāng)前元素,只有在延遲期滿時(shí)才能從隊(duì)列中提取元素。

      類圖如下:

      DelayQueue的定義以及構(gòu)造函數(shù)如下:

      public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
          implements BlockingQueue<E> {
      
          private final transient ReentrantLock lock = new ReentrantLock();
          private final PriorityQueue<E> q = new PriorityQueue<E>();
      
          /**
           * Thread designated to wait for the element at the head of
           * the queue.  This variant of the Leader-Follower pattern
           * (http://www.cs.wustl.edu/~schmidt/POSA/POSA2/) serves to
           * minimize unnecessary timed waiting.  When a thread becomes
           * the leader, it waits only for the next delay to elapse, but
           * other threads await indefinitely.  The leader thread must
           * signal some other thread before returning from take() or
           * poll(...), unless some other thread becomes leader in the
           * interim.  Whenever the head of the queue is replaced with
           * an element with an earlier expiration time, the leader
           * field is invalidated by being reset to null, and some
           * waiting thread, but not necessarily the current leader, is
           * signalled.  So waiting threads must be prepared to acquire
           * and lose leadership while waiting.
           */
          private Thread leader;
      
          /**
           * Condition signalled when a newer element becomes available
           * at the head of the queue or a new thread may need to
           * become leader.
           */
          private final Condition available = lock.newCondition();
      
          /**
           * Creates a new {@code DelayQueue} that is initially empty.
           */
          public DelayQueue() {}
      
          /**
           * Creates a {@code DelayQueue} initially containing the elements of the
           * given collection of {@link Delayed} instances.
           *
           * @param c the collection of elements to initially contain
           * @throws NullPointerException if the specified collection or any
           *         of its elements are null
           */
          public DelayQueue(Collection<? extends E> c) {
              this.addAll(c);
          }

      解讀:

      DelayQueue 內(nèi)部使用 PriorityQueue 存放數(shù)據(jù),使用 ReentrantLock 實(shí)現(xiàn)線程同步。

       

      重點(diǎn)解釋一下變量 leader:

      其使用基于 Leader-Follower模式的變體,用于減少不必要的線程等待。

      當(dāng)一個(gè)線程調(diào)用隊(duì)列的 take 方法變?yōu)?leader 線程后,它會(huì)調(diào)用 available. awaitNanos(delay) 等待 delay 時(shí)間;其他線程(follwer線程) 則調(diào)用 available. await()進(jìn)行無(wú)限等待。

      leader 線程延遲時(shí)間過(guò)期后會(huì)退出 take 方法,并通過(guò)調(diào)用 available.signal()方法喚醒一個(gè) follwer線程,被喚醒的 follwer線程被選舉為新的 leader線程。 

       

      Note:

      隊(duì)列中的元素必須實(shí)現(xiàn)Delayed接口和Comparable接口,也就是說(shuō)DelayQueue里面的元素必須有public void compareTo(To)和long getDelay(TimeUnit unit)方法存在。

      原因:由于每個(gè)元素都有一個(gè)過(guò)期時(shí)間,所以要實(shí)現(xiàn)獲取當(dāng)前元素還剩下多少時(shí)間就過(guò)期了的接口;由于DelayQueue底層使用優(yōu)先級(jí)隊(duì)列來(lái)實(shí)現(xiàn),所以要實(shí)現(xiàn)元素之間相互比較的接口。

      Delayed接口的定義如下:

      public interface Delayed extends Comparable<Delayed> {
      
          /**
           * Returns the remaining delay associated with this object, in the
           * given time unit.
           *
           * @param unit the time unit
           * @return the remaining delay; zero or negative values indicate
           * that the delay has already elapsed
           */
          long getDelay(TimeUnit unit);
      }

      解讀:

      此接口繼承自Comparable接口。

      Comparable接口的定義如下:

      public interface Comparable<T> {
          public int compareTo(T o);
      }

       

      添加元素

      offer方法的代碼如下:

          /**
           * Inserts the specified element into this delay queue.
           *
           * @param e the element to add
           * @return {@code true}
           * @throws NullPointerException if the specified element is null
           */
          public boolean offer(E e) {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  q.offer(e);
                  if (q.peek() == e) {
                      leader = null;
                      available.signal();
                  }
                  return true;
              } finally {
                  lock.unlock();
              }
          }

      解讀:

      該方法在獲取獨(dú)占鎖之后調(diào)用優(yōu)先級(jí)隊(duì)列的offer方法實(shí)現(xiàn)入隊(duì)

          /**
           * Inserts the specified element into this priority queue.
           *
           * @return {@code true} (as specified by {@link Queue#offer})
           * @throws ClassCastException if the specified element cannot be
           *         compared with elements currently in this priority queue
           *         according to the priority queue's ordering
           * @throws NullPointerException if the specified element is null
           */
          public boolean offer(E e) {
              if (e == null)
                  throw new NullPointerException();
              modCount++;
              int i = size;
              if (i >= queue.length)
                  grow(i + 1);
              siftUp(i, e);
              size = i + 1;
              return true;
          }

      解讀:

      如果待插入元素 e 為 null,則拋出 NullPointerException 異常。

      由于DelayQueue是無(wú)界隊(duì)列,所以方法一直返回 true。

      Note:

      前述offer中 q.peek()方法返回的并不一定是當(dāng)前添加的元素。

      如果 q.peek()方法返回的是 e,則說(shuō)明當(dāng)前元素 e將是最先過(guò)期的,于是重置 leader線程為 null,進(jìn)而激活 avaliable變量對(duì)應(yīng)的條件隊(duì)列里的一個(gè)線程,告訴它隊(duì)列里面有元素了。

       

      put方法的代碼如下:

          /**
           * Inserts the specified element into this delay queue. As the queue is
           * unbounded this method will never block.
           *
           * @param e the element to add
           * @throws NullPointerException {@inheritDoc}
           */
          public void put(E e) {
              offer(e);
          }

      解讀:

      此方法直接調(diào)用offer方法來(lái)實(shí)現(xiàn)。

      獲取元素

      poll方法的代碼如下:

          /**
           * Retrieves and removes the head of this queue, or returns {@code null}
           * if this queue has no elements with an expired delay.
           *
           * @return the head of this queue, or {@code null} if this
           *         queue has no elements with an expired delay
           */
          public E poll() {
              final ReentrantLock lock = this.lock;
              lock.lock();
              try {
                  E first = q.peek();
                  return (first == null || first.getDelay(NANOSECONDS) > 0)
                      ? null
                      : q.poll();
              } finally {
                  lock.unlock();
              }
          }

      解讀:

      如果隊(duì)列里面沒(méi)有過(guò)期元素,則返回null;否則返回隊(duì)首元素。

       

      take方法的代碼如下:

          /**
           * Retrieves and removes the head of this queue, waiting if necessary
           * until an element with an expired delay is available on this queue.
           *
           * @return the head of this queue
           * @throws InterruptedException {@inheritDoc}
           */
          public E take() throws InterruptedException {
              final ReentrantLock lock = this.lock;
              lock.lockInterruptibly();
              try {
                  for (;;) {
                      E first = q.peek();
                      if (first == null)
                          available.await();
                      else {
                          long delay = first.getDelay(NANOSECONDS);
                          if (delay <= 0L)
                              return q.poll();
                          first = null; // don't retain ref while waiting
                          if (leader != null)
                              available.await();
                          else {
                              Thread thisThread = Thread.currentThread();
                              leader = thisThread;
                              try {
                                  available.awaitNanos(delay);
                              } finally {
                                  if (leader == thisThread)
                                      leader = null;
                              }
                          }
                      }
                  }
              } finally {
                  if (leader == null && q.peek() != null)
                      available.signal();
                  lock.unlock();
              }
          }

      解讀:

      如果隊(duì)列里面沒(méi)有過(guò)期元素,則等待。

       

      示例

      可以將延時(shí)隊(duì)列DelayQueue運(yùn)用在以下場(chǎng)景中:

        (1)緩存系統(tǒng)的設(shè)計(jì):可以用DelayQueue保存元素的有效期,使用一個(gè)線程循環(huán)查詢DelayQueue,一旦能從DelayQueue中獲取元素,則表示緩存有效期到了。

        (2)定時(shí)任務(wù)調(diào)度:使用DelayQueue保存當(dāng)天將要執(zhí)行的任務(wù)和執(zhí)行時(shí)間,一旦從DelayQueue中獲取到任務(wù)就開(kāi)始執(zhí)行任務(wù),比如TimerQueue就是使用DelayQueue實(shí)現(xiàn)的。

       

      具體示例如下:

      (1)Student對(duì)象作為DelayQueue的元素,其必須實(shí)現(xiàn)Delayed接口的兩個(gè)方法

      package com.test;
      
      import java.util.concurrent.Delayed;
      import java.util.concurrent.TimeUnit;
      
      public class Student implements Delayed {//必須實(shí)現(xiàn)Delayed接口
          
          private String name;
          private long submitTime;// 交卷時(shí)間
          private long workTime;// 考試時(shí)間
      
          public Student(String name, long submitTime) {
              this.name = name;
              this.workTime = submitTime;
              this.submitTime = TimeUnit.NANOSECONDS.convert(submitTime, TimeUnit.MILLISECONDS) + System.nanoTime();
              System.out.println(this.name + " 交卷,用時(shí)" + workTime);
          }
      
          public String getName() {
              return this.name + " 交卷,用時(shí)" + workTime;
          }
          
          //必須實(shí)現(xiàn)getDelay方法
          public long getDelay(TimeUnit unit) {
              //返回一個(gè)延遲時(shí)間
              return unit.convert(submitTime - System.nanoTime(), unit.NANOSECONDS);
          }
      
          //必須實(shí)現(xiàn)compareTo方法
          public int compareTo(Delayed o) {
              Student that = (Student) o;
              return submitTime > that.submitTime ? 1 : (submitTime < that.submitTime ? -1 : 0);
          }
      
      }

       

      (2)主線程程序

      package com.test;
      
      import java.util.concurrent.DelayQueue;

      public class DelayQueueTest { public static void main(String[] args) throws Exception { // 新建一個(gè)等待隊(duì)列 final DelayQueue<Student> bq = new DelayQueue<Student>(); for (int i = 0; i < 5; i++) { Student student = new Student("學(xué)生"+i,Math.round((Math.random()*10+i))); bq.put(student); // 將數(shù)據(jù)存到隊(duì)列里! } //獲取但不移除此隊(duì)列的頭部;如果此隊(duì)列為空,則返回 null。 System.out.println(bq.peek().getName()); } }

       

      小結(jié):

       

      posted @ 2021-08-15 20:29  時(shí)空穿越者  閱讀(373)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 欧美寡妇xxxx黑人猛交| 久久国产免费直播| 天堂亚洲免费视频| 欧美性插b在线视频网站| 国产成人高清精品免费软件| 国产情侣激情在线对白| 精品少妇人妻av无码久久| 图片区 小说区 区 亚洲五月 | chinese性内射高清国产| 无码人妻斩一区二区三区| 亚洲最大色综合成人av| 日产无人区一线二码三码2021| 国产盗摄xxxx视频xxxx| 女人香蕉久久毛毛片精品| 日韩在线成年视频人网站观看| 国产精品自拍午夜福利| 国产视色精品亚洲一区二区| 国产亚洲制服免视频| 中文字幕人妻不卡精品| 国产精品伦人一久二久三久| 激情六月丁香婷婷四房播 | 国产美女精品一区二区三区| 91高清免费国产自产拍| 亚洲 欧美 综合 另类 中字| 人妻中文字幕一区二区视频| 久久精品一本到99热免费| 中文字幕精品亚洲二区| 在线看无码的免费网站| 久久日产一线二线三线| 热久在线免费观看视频 | 国产内射性高湖| 深夜免费av在线观看| 午夜精品视频在线看| 亚洲av久久精品狠狠爱av| 中文字幕在线看视频一区二区三区| 国产成人高清精品亚洲| 日韩大尺度一区二区三区| 国产精品自在欧美一区| 国产又黄又爽又不遮挡视频| 亚洲精品成人综合色在线| 国产精品成人一区二区三区|