Executors----------------------------------------------Executors----------------------------------------------Executors--------------
為了更好的控制多線程,JDK提供了一套線程框架Executor,幫助開發人員有效的進行線程控制。它們都在java.util.concurrent包中,是JDK并發包的核心。
其中有一個比較重要的類:Executors,他扮演著線程工廠的角色,我們通過Executors可以創建特定功能的線程池。
Executors創建線程池方法:
newFixedThreadPool();該方法返回一個固定數量的線程池,該方法的線程數始終不變,當有一個任務提交時,若線程池中空閑,則立即執行,若沒有,
則會被暫緩在一個任務隊列中,等待有空閑的線程去執行。
newSingleThreadExecutor();創建一個線程的線程池,若空閑則執行,若沒有空閑線程則暫緩在任務隊列中。
newCachedThreadPool();返回一個可根據實際情況調整線程個數的線程池,不限制最大線程數量,若用空閑的線程則執行任務,若無任務則不創建線程。
并且每一個空閑線程會在60秒后自動回收。
newScheduledThreadPool();該方法返回一個SchededExecutorService對象,類似于第一個方法,但該線程池可以指定線程的數量。
//創建出容量為10的線程池
ExecutorService pool = Executors.newFixedThreadPool(10);
//表示創建了容量為1的線程池
ExecutorService pool2 = Executors.newSingleThreadExecutor();
//創建容量沒有限制的線程池
ExecutorService pool3 = Executors.newCachedThreadPool();
//創建固定數量的線程池(指定數量) 底層使用帶有延遲和周期性的執行任務機制
ScheduledExecutorService pool4 = Executors.newScheduledThreadPool(10);
分析底層實現:
newFixedThreadPool(10);
/**
* @param nThreads:核心線程
* @return
* new ThreadPoolExecutor
* @param nThreads 核心線程(線程池被實例化后,其內部直接初始化了"nThreads"個線程)
* @param nThreads 最大線程數
* @param 0L 當前線程空閑時間為0
* @param TimeUnit.MILLISECONDS 該線程空閑時間單位(秒、小時...)
* @param new LinkedBlockingQueue<Runnable>() (一種無界隊列類型)的存儲器,如果池子沒有空閑的話,新的線程將會被載入到該隊列中
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
newSingleThreadExecutor();
/**
* @return
* new FinalizableDelegatedExecutorService
* ThreadPoolExecutor
* @param 1 默認在池子里創建一個線程
* @param 1 池子默認大小為1
* @param 0L 當前線程空閑時間為0
* @param TimeUnit.MILLISECONDS 該線程空閑時間單位(秒、小時...)
* @param new LinkedBlockingQueue<Runnable>() (一種無界隊列類型)的存儲器,如果池子沒有空閑的話,新的線程將會被載入到該隊列中
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
newCachedThreadPool();
/**
* @return
* new ThreadPoolExecutor
* @param 0 初始化該線程池的時候默認不創建線程
* @param Integer.MAX_VALUE 默認該池子的大小為 MAX_VALUE (即不限制大小)
* @param 0 初始化該線程池的時候默認不創建線程
* @param TimeUnit.MILLISECONDS 該線程空閑時間單位(秒、小時...)
* @param new SynchronousQueue<Runnable>())阻塞隊列,用于存儲等待的任務
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
Executors----------------------------------------------Executors----------------------------------------------Executors--------------
同步輔助類----------------------------------------------同步輔助類----------------------------------------------同步輔助類--------------
CountDownLatch 在完成一組正在其他線程中執行的操作之前,它允許一個或多個線程一直等待;
使用場景:常用語監聽某些初始化操作,等初始化執行完畢后,通知主線程繼續工作。
構造方法參數指定了計數的次數
countDown方法,當前線程調用此方法,則計數減一
await方法,調用此方法會一直阻塞當前線程,直到計時器的值為0
1 public class CountDownLatchDemo { 2 final static SimpleDateFormat sdf=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); 3 public static void main(String[] args) throws InterruptedException { 4 CountDownLatch latch=new CountDownLatch(2);//兩個工人的協作 5 Worker worker1=new Worker("zhang san", 5000, latch); 6 Worker worker2=new Worker("li si", 8000, latch); 7 worker1.start();// 8 worker2.start();// 9 latch.await();//等待所有工人完成工作 10 System.out.println("all work done at "+sdf.format(new Date())); 11 } 12 13 14 static class Worker extends Thread{ 15 String workerName; 16 int workTime; 17 CountDownLatch latch; 18 public Worker(String workerName ,int workTime ,CountDownLatch latch){ 19 this.workerName=workerName; 20 this.workTime=workTime; 21 this.latch=latch; 22 } 23 public void run(){ 24 System.out.println("Worker "+workerName+" do work begin at "+sdf.format(new Date())); 25 doWork();//工作了 26 System.out.println("Worker "+workerName+" do work complete at "+sdf.format(new Date())); 27 latch.countDown();//工人完成工作,計數器減一 28 29 } 30 31 private void doWork(){ 32 try { 33 Thread.sleep(workTime); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } 37 } 38 } 39 40 41 }
輸出:
Worker zhang san do work begin at 2011-04-14 11:05:11
Worker li si do work begin at 2011-04-14 11:05:11
Worker zhang san do work complete at 2011-04-14 11:05:16
Worker li si do work complete at 2011-04-14 11:05:19
all work done at 2011-04-14 11:05:19
CyclicBarrier 它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point)。在涉及一組固定大小的線程的程序中,
這些線程必須不時地互相等待,此時 CyclicBarrier 很有用。因為該 barrier 在釋放等待線程后可以重用,所以稱它為循環 的 barrier。
使用場景:需要所有的子任務都完成時,才執行主任務,這個時候就可以選擇使用CyclicBarrier。
method:
await
public int await()
throws InterruptedException,
BrokenBarrierException
在所有參與者都已經在此 barrier 上調用 await方法之前,將一直等待。如果當前線程不是將到達的最后一個線程,出于調度目的,將禁用它,且在發生以下情況之一前,該線程將一直處于休眠狀態:
最后一個線程到達;或者
其他某個線程中斷當前線程;或者
其他某個線程中斷另一個等待線程;或者
其他某個線程在等待 barrier 時超時;或者
其他某個線程在此 barrier 上調用 reset()。
如果當前線程:
在進入此方法時已經設置了該線程的中斷狀態;或者
在等待時被中斷
則拋出 InterruptedException,并且清除當前線程的已中斷狀態。如果在線程處于等待狀態時 barrier 被 reset(),或者在調用 await 時 barrier 被損壞,抑或任意一個線程正處于等待狀態,
則拋出 BrokenBarrierException 異常。
如果任何線程在等待時被 中斷,則其他所有等待線程都將拋出 BrokenBarrierException 異常,并將 barrier 置于損壞狀態。
如果當前線程是最后一個將要到達的線程,并且構造方法中提供了一個非空的屏障操作,則在允許其他線程繼續運行之前,當前線程將運行該操作。如果在執行屏障操作過程中發生異常,
則該異常將傳播到當前線程中,并將 barrier 置于損壞狀態。
返回:
到達的當前線程的索引,其中,索引 getParties() - 1 指示將到達的第一個線程,零指示最后一個到達的線程
拋出:
InterruptedException - 如果當前線程在等待時被中斷
BrokenBarrierException - 如果另一個 線程在當前線程等待時被中斷或超時,或者重置了 barrier,或者在調用 await 時 barrier 被損壞,抑或由于異常而導致屏障操作(如果存在)失敗。
1 public class CyclicBarrierTest { 2 3 public static void main(String[] args) throws IOException, InterruptedException { 4 //如果將參數改為4,但是下面只加入了3個選手,這永遠等待下去 5 //Waits until all parties have invoked await on this barrier. 6 CyclicBarrier barrier = new CyclicBarrier(3); 7 8 ExecutorService executor = Executors.newFixedThreadPool(3); 9 executor.submit(new Thread(new Runner(barrier, "1號選手"))); 10 executor.submit(new Thread(new Runner(barrier, "2號選手"))); 11 executor.submit(new Thread(new Runner(barrier, "3號選手"))); 12 13 executor.shutdown(); 14 } 15 } 16 17 class Runner implements Runnable { 18 // 一個同步輔助類,它允許一組線程互相等待,直到到達某個公共屏障點 (common barrier point) 19 private CyclicBarrier barrier; 20 21 private String name; 22 23 public Runner(CyclicBarrier barrier, String name) { 24 super(); 25 this.barrier = barrier; 26 this.name = name; 27 } 28 29 @Override 30 public void run() { 31 try { 32 Thread.sleep(1000 * (new Random()).nextInt(8)); 33 System.out.println(name + " 準備好了..."); 34 // barrier的await方法,在所有參與者都已經在此 barrier 上調用 await 方法之前,將一直等待。 35 barrier.await(); 36 } catch (InterruptedException e) { 37 e.printStackTrace(); 38 } catch (BrokenBarrierException e) { 39 e.printStackTrace(); 40 } 41 System.out.println(name + " 起跑!"); 42 } 43 }
輸出:
3號選手 準備好了... 2號選手 準備好了... 1號選手 準備好了... 1號選手 起跑! 2號選手 起跑! 3號選手 起跑!
同步輔助類----------------------------------------------同步輔助類----------------------------------------------同步輔助類--------------