并發(fā)編程--上篇
Java并發(fā)探索--上篇
1.基本概念
- 線程與進(jìn)程:線程是程序執(zhí)行的最小單位,而進(jìn)程是系統(tǒng)進(jìn)行資源分配和調(diào)度的基本單位。例如,一個(gè) Java 程序可以包含多個(gè)線程,它們共享進(jìn)程的資源。
- 并發(fā)與并行:并發(fā)是指多個(gè)任務(wù)在同一時(shí)間段內(nèi)執(zhí)行,而并行是指多個(gè)任務(wù)在同一時(shí)刻執(zhí)行。在多核 CPU 系統(tǒng)中,可以實(shí)現(xiàn)真正的并行。
- 同步與異步:同步是指程序按照順序依次執(zhí)行,而異步是指程序在執(zhí)行某個(gè)任務(wù)時(shí),不需要等待該任務(wù)完成,可以繼續(xù)執(zhí)行其他任務(wù)。
“Java并發(fā)探索--下篇” --- 在下面找
【博客園】
http://www.rzrgm.cn/jackjavacpp
【CSDN】
https://blog.csdn.net/okok__TXF
2.探索線程的創(chuàng)建
①線程的狀態(tài)
從Thread源碼里面看出
public enum State {
// 尚未啟動(dòng)的線程的線程狀態(tài)。
NEW,
// 就緒
RUNNABLE,
// 等待監(jiān)視器鎖的線程的線程狀態(tài)
BLOCKED,
/*
等待線程的線程狀態(tài),線程由于調(diào)用以下方法之一而處于等待狀態(tài):
Object.wait() 沒(méi)有超時(shí)
Thread.join() 沒(méi)有超時(shí)
LockSupport.park()
*/
WAITING,
/*
指定等待時(shí)間的等待線程的線程狀態(tài)
線程處于定時(shí)等待狀態(tài),因?yàn)檎{(diào)用了以下方法之一,并指定等待時(shí)間:
Thread.sleep
Object.wait with timeout
Thread.join with timeout
LockSupport.parkNanos
LockSupport.parkUntil
*/
TIMED_WAITING,
//終止線程的線程狀態(tài)。線程已完成執(zhí)行。
TERMINATED;
}
下面看一張圖,很清楚的解釋了各狀態(tài)之間的關(guān)系:【節(jié)選自https://blog.csdn.net/agonie201218/article/details/128712507】
在Java中,一個(gè)Thread有大致六個(gè)狀態(tài)。
線程創(chuàng)建之后(new Thread)它將處于 NEW(新建) 狀態(tài),調(diào)用 start() 方法后開(kāi)始運(yùn)行,線程這時(shí)候處于 RUNNABLE(就緒) 狀態(tài)。可運(yùn)行狀態(tài)的線程獲得了 CPU 時(shí)間片后就處于 RUNNING(運(yùn)行) 狀態(tài)。
明白了線程的運(yùn)行狀態(tài),接下來(lái)讓我們來(lái)看一下在爪哇里面如何創(chuàng)建并且啟動(dòng)線程。
②線程創(chuàng)建
1)兩種基本方式
- 繼承Thread類(lèi),重寫(xiě)run方法
public class MyThread1 extends Thread {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ": hello world");
}
}
public class JUCMain {
public static void main(String[] args) {
new MyThread1().start();
}
}
- 實(shí)現(xiàn)Runnable接口,傳入Thread
public class Runnable1 implements Runnable{
@Override
public void run() {
System.out.println("hello world, Runnable");
}
}
public class JUCMain {
public static void main(String[] args) {
new Thread(new Runnable1()).start();
}
}
網(wǎng)上還傳有其他創(chuàng)建線程的方式,比如: Callable接口,重寫(xiě)call,結(jié)合FutureTask;線程池;lambda表達(dá)式等等。。。誠(chéng)然,這也確實(shí)是創(chuàng)建線程啟動(dòng)的方式不錯(cuò)。但是本文畢竟是探索性質(zhì)的文章,我們要探索其本質(zhì)。
首先從start()方法看起(這個(gè)方式屬于Thread類(lèi)的)。調(diào)用start()后,JVM會(huì)創(chuàng)建一個(gè)新線程并執(zhí)行該線程的run()方法。注意:直接調(diào)用run()不會(huì)啟動(dòng)新線程,而是在當(dāng)前線程中執(zhí)行。
// 啟動(dòng)線程并觸發(fā) JVM 創(chuàng)建原生線程
// synchronized后面解釋【見(jiàn) 探索“鎖”】
public synchronized void start() {
// 零狀態(tài)值對(duì)應(yīng)于狀態(tài) “NEW”
// 線程想要start,必須是為0的狀態(tài)
if (threadStatus != 0)
throw new IllegalThreadStateException();
/*
group 是線程所屬的線程組。這行代碼將當(dāng)前線程實(shí)例添加到線程組中,
同時(shí)線程組的未啟動(dòng)線程計(jì)數(shù)會(huì)減1。
*/
group.add(this);
boolean started = false;
try {
start0(); //關(guān)鍵!調(diào)用本地方法(native)
started = true;
} finally {
try {
if (!started) { //啟動(dòng)失敗時(shí)回滾
//如果 started 為 false,說(shuō)明線程啟動(dòng)失敗,
//調(diào)用 group.threadStartFailed(this) 方法通知線程組該線程啟動(dòng)失敗。
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
//========== native
private native void start0();
那么執(zhí)行的是run()方法,run方法里面是啥呢
private Runnable target; // target是Runnable類(lèi)型
@Override
public void run() {
if (target != null) {
target.run();
}
}
如果繼承Thread類(lèi)后,重寫(xiě)run()方法,那么run方法就會(huì)覆蓋上面的方法。
如果是實(shí)現(xiàn)的Runnable接口,new Thread(new Runnable1())的時(shí)候,就會(huì)把target賦值,然后調(diào)用run()方法的時(shí)候,就執(zhí)行的是target的run方法了。
2) 其他創(chuàng)建方式
.lambda
- lambda表達(dá)式創(chuàng)建:這個(gè)僅僅是寫(xiě)法不同而已。因?yàn)镽unnable是個(gè)函數(shù)式接口
@FunctionalInterface
public interface Runnable {
public abstract void run();
}
.callable
- Callable創(chuàng)建的方式
public class MyCall implements Callable<String> {
@Override
public String call() throws Exception {
Thread.sleep(2000);
return "Hello Callable";
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
FutureTask<String> task = new FutureTask<>(new MyCall());
new Thread(task).start();
System.out.println(task.get());
}
new Thread(Runnable runnable)要求傳的類(lèi)型是Runnable,但是現(xiàn)在傳的是FutureTask。所以先來(lái)看一看FutureTask和Runnable之間有什么聯(lián)系.
從上面可以看到,F(xiàn)utureTask實(shí)現(xiàn)了RunnableFuture接口,然后RunnableFuture接口繼承了Future和Runnable兩個(gè)接口。
Future<V>
Future 接口是 Java 并發(fā)編程中的一個(gè)重要接口,位于 java.util.concurrent 包下,它代表了一個(gè)異步計(jì)算的結(jié)果。異步計(jì)算意味著在調(diào)用方法后,程序不會(huì)立即等待結(jié)果返回,而是可以繼續(xù)執(zhí)行其他任務(wù),當(dāng)結(jié)果準(zhǔn)備好時(shí),再通過(guò) Future 對(duì)象獲取。
// 這里使用了泛型 <V>,表示該 Future 對(duì)象所代表的異步計(jì)算結(jié)果的類(lèi)型。
public interface Future<V> {
//嘗試取消異步任務(wù)的執(zhí)行。
/*
如果任務(wù)已經(jīng)完成、已經(jīng)被取消或者由于其他原因無(wú)法取消,則返回 false;
如果任務(wù)成功取消,則返回 true。
*/
boolean cancel(boolean mayInterruptIfRunning);
//如果任務(wù)在完成之前被取消,則返回 true;否則返回 false。
boolean isCancelled();
//如果任務(wù)已經(jīng)完成,則返回 true;否則返回 false。
boolean isDone();
//獲取異步任務(wù)的計(jì)算結(jié)果。如果任務(wù)還未完成,調(diào)用該方法的線程會(huì)被阻塞,直到任務(wù)完成。
V get() throws InterruptedException, ExecutionException;
//獲取異步任務(wù)的計(jì)算結(jié)果,并且可以指定一個(gè)超時(shí)時(shí)間。
//如果在指定的時(shí)間內(nèi)任務(wù)還未完成,調(diào)用該方法的線程會(huì)被阻塞,直到任務(wù)完成或者超時(shí)。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
RunnableFuture
public interface RunnableFuture<V> extends Runnable, Future<V> {
// 很簡(jiǎn)單嘛,這個(gè)是來(lái)自Runnable的
void run();
}
這個(gè)接口就相當(dāng)于組合了Runnable和Future,能夠獲取到返回值了。
FutureTask<V> 既然要把它當(dāng)做參數(shù)傳進(jìn)Thread的構(gòu)造函數(shù),那么想必它肯定是實(shí)現(xiàn)了run方法的。
public class FutureTask<V> implements RunnableFuture<V> {
// 基本屬性
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** 結(jié)果 */
private Object outcome;
/** The thread running the callable; CAS ed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
// 看它的構(gòu)造函數(shù)1
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable; // 賦值callable========
this.state = NEW; // ensure visibility of callable
}
// 構(gòu)造函數(shù)2 ==== 本質(zhì)還是把Runnable加了一層,給封裝成Callable了
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
/*
Executors::callable(xx, xx)方法==========
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run(); // 調(diào)用Runnable的run()
return result;
}
}
*/
// run()方法 ---------------
// new Thread(new FutureTask<>(new MyCall()))
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
//====調(diào)用callable.call()
result = c.call();
ran = true;
} catch (Throwable ex) {
.........
}
// 如果運(yùn)行OK了,設(shè)置結(jié)果!
if (ran) set(result);
}
} finally {
.............
}
}
// 設(shè)置結(jié)果outcome
protected void set(V v) {
// http://www.rzrgm.cn/jackjavacpp/p/18787832
// 使用CAS --- 【見(jiàn)上一篇文章 java map & CAS & AQS】
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v; // 這里
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}
// 比較核心的get方法================start
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING) // 如果狀態(tài)不是完成
s = awaitDone(false, 0L); // 等待完成
return report(s); // 返回結(jié)果
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 1.計(jì)算超時(shí)截止時(shí)間
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) { // 2.自旋循環(huán)等待任務(wù)完成
// 2.1如果該線程中斷了
if (Thread.interrupted()) {
removeWaiter(q);// 從等待隊(duì)列中移除當(dāng)前節(jié)點(diǎn)
throw new InterruptedException();
}
// 2.2檢查狀態(tài)
int s = state;
// 任務(wù)已終態(tài)(NORMAL, EXCEPTIONAL, CANCELLED)
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;// 返回最終狀態(tài)
}
// 2.3若任務(wù)狀態(tài)等于 COMPLETING,表明任務(wù)正在完成,
// 此時(shí)調(diào)用 Thread.yield() 方法讓當(dāng)前線程讓出 CPU 時(shí)間片,等待任務(wù)完成。
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued) //將節(jié)點(diǎn)加入等待隊(duì)列
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) { // 2.4如果是有時(shí)限的get()
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state; // 返回狀態(tài)
}
LockSupport.parkNanos(this, nanos);
}
else //若沒(méi)有設(shè)置超時(shí)時(shí)間,就調(diào)用 LockSupport.park 方法讓當(dāng)前線程無(wú)限期阻塞,直到被喚醒。
LockSupport.park(this);
}
}
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x; // 返回outcome
......
}
//==================================end
}
awaitDone 方法的核心功能是讓當(dāng)前線程等待異步任務(wù)完成,它會(huì)持續(xù)檢查任務(wù)的狀態(tài),根據(jù)不同的狀態(tài)采取相應(yīng)的處理措施,同時(shí)支持設(shè)置超時(shí)時(shí)間。在等待過(guò)程中,若線程被中斷,會(huì)拋出 InterruptedException 異常。
通過(guò)上面的分析,Callable這種方式實(shí)際上本質(zhì)還是Runnable嚯。使用FutureTask將Future和Runnable結(jié)合起來(lái),功能更加豐富。
.線程池ThreadPoolExecutor
- 線程池創(chuàng)建線程
如下使用方式。
public class PoolMain {
public static void main(String[] args) {
// 創(chuàng)建一個(gè)線程池
ExecutorService pool = Executors.newFixedThreadPool(1);
long start = System.currentTimeMillis();
// execute=============
pool.execute(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("execute pool創(chuàng)建啟動(dòng)線程!");
});
// submit==============
Future<Integer> future = pool.submit(() -> {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("submit pool創(chuàng)建啟動(dòng)線程!");
return 100;
});
try {
System.out.println(future.get());
} catch (InterruptedException | ExecutionException e) {
throw new RuntimeException(e);
}
System.out.println("main線程執(zhí)行時(shí)間:" + (System.currentTimeMillis() - start));
pool.shutdown();
}
}
從上面的例子可以看出,大致有ExecutorService,Executors, newFixedThreadPool()方法本質(zhì)是 new ThreadPoolExecutor(),故還有一個(gè)ThreadPoolExecutor類(lèi)。
接下來(lái)梳理一下這些類(lèi)背后的關(guān)系。【通過(guò)idea得到下面的關(guān)系圖】此外,Executors只是一個(gè)工具類(lèi)。
Executor是頂級(jí)接口
public interface Executor {
// 只定義了一個(gè)方法
void execute(Runnable command);
}
ExecutorService:是一個(gè)比Executor使用更廣泛的子類(lèi)接口,其提供了生命周期管理的方法,以及可跟蹤一個(gè)或多個(gè)異步任務(wù)執(zhí)行狀況返回Future的方法
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
//....
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
}
AbstractExecutorServiceExecutorService執(zhí)行方法的默認(rèn)實(shí)現(xiàn),發(fā)現(xiàn)下面的submit()底層實(shí)際執(zhí)行的是execute(ftask)方法【Executor接口的execute()方法,在這個(gè)抽象類(lèi)里面沒(méi)有具體實(shí)現(xiàn),到具體子類(lèi)ThreadPoolExecutor在可以看到】。
public abstract class AbstractExecutorService implements ExecutorService {
// 這里重點(diǎn)只看一下submit方法的默認(rèn)實(shí)現(xiàn)
// 優(yōu)點(diǎn)1: 可以有Future返回值
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 優(yōu)點(diǎn)2: 支持Callable參數(shù)
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
}
ThreadPoolExecutor:線程池,可以通過(guò)調(diào)用Executors靜態(tài)工廠方法來(lái)創(chuàng)建線程池并返回一個(gè)ExecutorService對(duì)象
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* 七大參數(shù)!!!!======
*/
public ThreadPoolExecutor(int corePoolSize,//線程池的核心線程數(shù)量
int maximumPoolSize,//線程池的最大線程數(shù)
long keepAliveTime,//當(dāng)線程數(shù)大于核心線程數(shù)時(shí),多余的空閑線程存活的最長(zhǎng)時(shí)間
TimeUnit unit,//時(shí)間單位
BlockingQueue<Runnable> workQueue,//任務(wù)隊(duì)列,用來(lái)儲(chǔ)存等待執(zhí)行任務(wù)的隊(duì)列
ThreadFactory threadFactory,//線程工廠,用來(lái)創(chuàng)建線程,一般默認(rèn)即可
RejectedExecutionHandler handler//拒絕策略,當(dāng)提交的任務(wù)過(guò)多而不能及時(shí)處理時(shí),定制策略來(lái)處理任務(wù)
) {
if (corePoolSize < 0 || maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize || keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
}
回到這一小節(jié)最開(kāi)始的時(shí)候,例子中的線程池有兩種提交并運(yùn)行線程的方式execute和submit兩個(gè)方法。現(xiàn)在來(lái)看一下ThreadPoolExecutor中的execute()方法是什么樣子的。submit()我們已經(jīng)知道了是在AbstractExecutorService中有默認(rèn)實(shí)現(xiàn)的。
// ThreadPoolExecutor::execute(Runnable command)
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 1.若當(dāng)前工作線程數(shù)小于核心線程數(shù)(corePoolSize),嘗試創(chuàng)建新的核心線程
// 這里是用的位運(yùn)算的,我沒(méi)有深究
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //
return;
// 創(chuàng)建新線程失敗,重新獲取ctl
c = ctl.get();
}
// 2.任務(wù)入隊(duì)
// 線程池處于運(yùn)行狀態(tài)(isRunning(c))
// 且任務(wù)成功加入阻塞隊(duì)列(workQueue.offer(command))
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 2.2 雙重檢查
/*
2.2.1再次檢查線程池狀態(tài)(可能在此期間線程池被關(guān)閉)。
2.2.2若線程池已關(guān)閉,嘗試從隊(duì)列中移除任務(wù),移除成功則拒絕任務(wù)(reject(command))。
2.2.3若線程池仍運(yùn)行但無(wú)活躍線程(workerCountOf(recheck) == 0),
添加一個(gè)非核心線程(addWorker(null, false)),該線程會(huì)從隊(duì)列中拉取任務(wù)執(zhí)行。
*/
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 3.若任務(wù)無(wú)法入隊(duì)(隊(duì)列已滿),嘗試創(chuàng)建非核心線程(addWorker(command, false))
else if (!addWorker(command, false))
//若創(chuàng)建失敗(線程數(shù)已達(dá)maximumPoolSize或線程池已關(guān)閉),
//執(zhí)行拒絕策略(reject(command))
reject(command);
}
/*
execute(command)
│
├─ 檢查command非空
│
├─ 當(dāng)前線程數(shù) < corePoolSize?
│ ├─ 是 → 創(chuàng)建核心線程 → 成功則返回
│ └─ 否 → 進(jìn)入下一步
│
├─ 線程池是否RUNNING且任務(wù)入隊(duì)成功?
│ ├─ 是 → 雙重檢查狀態(tài)
│ │ ├─ 線程池已關(guān)閉 → 移除任務(wù)并拒絕
│ │ └─ 線程池仍運(yùn)行且無(wú)活躍線程 → 創(chuàng)建非核心線程
│ │
│ └─ 否 → 嘗試創(chuàng)建非核心線程
│ ├─ 成功 → 處理任務(wù)
│ └─ 失敗 → 拒絕任務(wù)
*/
為什么需要二次檢查線程池狀態(tài)?
- 任務(wù)入隊(duì)后,其他線程可能關(guān)閉了線程池(如調(diào)用shutdown())。
- 處理:
- 若線程池已關(guān)閉,需移除已入隊(duì)任務(wù)并拒絕。
- 若線程池仍運(yùn)行但無(wú)活躍線程(如核心線程數(shù)為0且任務(wù)在隊(duì)列中),需創(chuàng)建新線程處理隊(duì)列任務(wù)。
場(chǎng)景1:核心線程數(shù)未滿
- 線程池處于
RUNNING,當(dāng)前線程數(shù) 2(corePoolSize=5)。 addWorker(task, true)成功創(chuàng)建核心線程并執(zhí)行任務(wù)。
場(chǎng)景2:隊(duì)列已滿,創(chuàng)建臨時(shí)線程
- 核心線程滿載,隊(duì)列已滿,線程數(shù)未達(dá)
maximumPoolSize。 addWorker(task, false)創(chuàng)建臨時(shí)線程處理任務(wù)。
場(chǎng)景3:SHUTDOWN 狀態(tài)處理剩余任務(wù)
- 線程池調(diào)用
shutdown(),狀態(tài)變?yōu)?SHUTDOWN。 - 已有任務(wù)在隊(duì)列中,
addWorker(null, true)創(chuàng)建線程處理隊(duì)列任務(wù)。
ThreadPoolExecutor設(shè)計(jì)思想:
- 核心線程優(yōu)先:優(yōu)先使用核心線程處理任務(wù)。
- 隊(duì)列緩沖:核心線程滿載后,任務(wù)入隊(duì)等待。
- 非核心線程應(yīng)急:隊(duì)列滿后,創(chuàng)建臨時(shí)線程處理任務(wù)(不超過(guò)
maximumPoolSize)
addWorker 創(chuàng)建工作線程
addWorker 方法通過(guò)精細(xì)的狀態(tài)檢查和并發(fā)控制,確保線程池在動(dòng)態(tài)擴(kuò)縮容時(shí)保持線程安全。【方便理解,可以把這個(gè)方法看作是創(chuàng)建線程】其核心在于:
- 雙重循環(huán):外層處理狀態(tài)變化,內(nèi)層處理線程數(shù)修改。
- 鎖與原子操作結(jié)合:保證
workers集合和workerCount的一致性。 - 異常回滾機(jī)制:確保資源不會(huì)泄漏(如線程數(shù)虛增或 Worker 未清理)。
firstTask為我們最開(kāi)始寫(xiě)的Runnable。【記一個(gè)代號(hào)叫做 “我的任務(wù)” 】
// ======== firstTask pool.execute(() -> { try { Thread.sleep(1000); } catch (InterruptedException e) { throw new RuntimeException(e); } System.out.println("execute pool創(chuàng)建啟動(dòng)線程!"); });
// addWorker(runnable, core)
//標(biāo)記是否以核心線程數(shù)(corePoolSize)為上限創(chuàng)建線程。
//若為 false,則使用最大線程數(shù)(maximumPoolSize)
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
........
int rs = runStateOf(c);// 獲取線程池狀態(tài)
// 檢查是否允許添加Worker
if (rs >= SHUTDOWN &&
!(rs == SHUTDOWN &&
firstTask == null &&
!workQueue.isEmpty()))
return false;
for (;;) {
.........
//CAS 增加線程數(shù)
if (compareAndIncrementWorkerCount(c))
break retry;// 成功增加,跳出循環(huán)
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
}
}
...........
try {
// 把“我的任務(wù)”傳進(jìn)去了
w = new Worker(firstTask); // 創(chuàng)建的一個(gè)Worker
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加鎖保護(hù) workers 集合
mainLock.lock();
try {
// 再次檢查狀態(tài)(防止在加鎖前狀態(tài)變化)
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive())
//======拋出異常....
workers.add(w); // 將 Worker 加入集合
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
// 【W(wǎng)orker類(lèi)里面的thread】
// 啟動(dòng)線程=========重點(diǎn)【見(jiàn)下面的分析】
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
Worker是ThreadPoolExecutor的內(nèi)部類(lèi),可以看出是一個(gè)Runnable,那么肯定重寫(xiě)了run()方法
private final class Worker
extends AbstractQueuedSynchronizer implements Runnable
{
final Thread thread;
Runnable firstTask; //"我的任務(wù)"到這里來(lái)了
// 構(gòu)造函數(shù)
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
// 利用線程工廠創(chuàng)建了一個(gè)線程
/*
如果final Thread t = w.thread;
t.start();啟動(dòng)的話,執(zhí)行的是這個(gè)內(nèi)部類(lèi)的run()
*/
this.thread = getThreadFactory().newThread(this);
}
// run就這一行
public void run() {runWorker(this);}
//到這里了
final void runWorker(Worker w) {
..
//"我的任務(wù)"
Runnable task = w.firstTask;
...
try {
//getTask()從等待隊(duì)列里面取出Runnable
while (task != null || (task = getTask()) != null) {
....
task.run();//==========
}
}......
..
//// 無(wú)任務(wù)時(shí)回收線程
processWorkerExit(w, completedAbruptly);
}
}
ThreadPoolExecutor的靜態(tài)內(nèi)部類(lèi) :: jdk自帶的四種拒絕策略。
public interface RejectedExecutionHandler {
void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
// 1.直接拋出異常
public static class AbortPolicy implements RejectedExecutionHandler {
.........
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
// 2.直接丟棄
public static class DiscardPolicy implements RejectedExecutionHandler {
.....
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
// 3.丟棄等待隊(duì)列的第一個(gè)
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
........
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
//如果線程池未關(guān)閉,就彈出隊(duì)列頭部的元素,然后嘗試執(zhí)行
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
// 4.調(diào)用者運(yùn)行,直接執(zhí)行run()方法里面的邏輯。
// 只要線程池沒(méi)有關(guān)閉,就由提交任務(wù)的當(dāng)前線程處理
public static class CallerRunsPolicy implements RejectedExecutionHandler {
......
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
總結(jié)一下,在了解到了ThreadPoolExecutor的一些類(lèi)間關(guān)系、以及一些基本流程、屬性之后。接下來(lái)我們來(lái)梳理一遍,線程池的運(yùn)行方式。
-
創(chuàng)建線程池(七大參數(shù)、四大拒絕策略)
-
任務(wù)提交
//2.1 executor.execute(() -> { // 任務(wù)邏輯 });2.2任務(wù)處理決策鏈:::
2.2.1嘗試創(chuàng)建核心線程:當(dāng)前工作線程數(shù) <
corePoolSize,直接創(chuàng)建新核心線程執(zhí)行任務(wù)if (workerCount < corePoolSize) { addWorker(command, true); // true表示檢查corePoolSize return; }2.2.2任務(wù)入隊(duì): 若核心線程已滿,任務(wù)嘗試加入工作隊(duì)列。
if (workQueue.offer(command)) { // 雙重檢查線程池狀態(tài) if (線程池已關(guān)閉) 移除任務(wù)并拒絕; else if (當(dāng)前無(wú)活躍線程) 創(chuàng)建非核心線程處理隊(duì)列任務(wù); }2.2.3嘗試創(chuàng)建非核心線程: 若隊(duì)列已滿且線程數(shù) <
maximumPoolSize,創(chuàng)建非核心線程。else if (!addWorker(command, false)) { // false表示檢查maximumPoolSize reject(command); // 觸發(fā)拒絕策略 }2.2.4拒絕任務(wù)
- 工作線程執(zhí)行任務(wù)
3.1Worker初始化:每個(gè)
Worker綁定一個(gè)線程和初始任務(wù)(firstTask)。Worker w = new Worker(firstTask); final Thread t = w.thread; t.start(); // 啟動(dòng)線程執(zhí)行runWorker()3.2任務(wù)執(zhí)行循環(huán)(
runWorker方法):while (task != null || (task = getTask()) != null) { try { task.run(); // 執(zhí)行任務(wù) } finally { task = null; // 清理任務(wù)引用 } }3.3從隊(duì)列獲取任務(wù)(
getTask方法):
- 阻塞模式:若為核心線程或允許核心線程超時(shí),調(diào)用
workQueue.take()永久阻塞。- 超時(shí)模式:若非核心線程,調(diào)用
workQueue.poll(keepAliveTime)超時(shí)等待。Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take();
- 線程回收與資源釋放
private void processWorkerExit(Worker w, boolean completedAbruptly) { if (completedAbruptly) // 異常終止時(shí)補(bǔ)償workerCount decrementWorkerCount(); mainLock.lock(); try { workers.remove(w); // 從集合中移除Worker if (線程池仍在運(yùn)行 && 隊(duì)列非空) addWorker(null, false); // 補(bǔ)充線程處理隊(duì)列任務(wù) } finally { mainLock.unlock(); } }
線程池本質(zhì)也是Runnable!
一張好圖:【來(lái)自:【線程池工作原理】https://blog.csdn.net/fighting_yu/article/details/89473175】
3.探索”鎖“
上面探索了線程以及線程池的創(chuàng)建,發(fā)現(xiàn)其源碼中存在這種代碼;
//1.Thread的start方法
public synchronized void start()
//2.線程池部分源碼addWorker()方法
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
//3.
LockSupport.park(this);
這些是什么呢?這就是鎖。。
確保線程安全最常見(jiàn)的做法是利用鎖機(jī)制(Lock、sychronized)來(lái)對(duì)共享數(shù)據(jù)做互斥同步,這樣在同一個(gè)時(shí)刻,只有一個(gè)線程可以執(zhí)行某個(gè)方法或者某個(gè)代碼塊,那么操作必然是原子性的,線程安全的。
① synchronized
synchronized 是 Java 中最基本的同步機(jī)制,它可以修飾方法或代碼塊,確保同一時(shí)刻只有一個(gè)線程可以執(zhí)行被修飾的代碼。
public class SynchronizedTest {
public static void main(String[] args) {
SynchronizedTest lock1 = new SynchronizedTest();
new Thread(lock1::test).start();
new Thread(lock1::test2).start();
new Thread(lock1::testStatic).start();
new Thread(lock1::testFs).start();
}
public void testStatic() {
// 鎖的是Class對(duì)象
synchronized (SynchronizedTest.class){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testStatic()");
}
}
// 鎖的是一個(gè)實(shí)例對(duì)象
public void test(){
synchronized (this){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("test()");
}
}
public synchronized void test2(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("test2()");
}
public void testFs(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testFs()");
}
}
上面只有test() 和 test2() 是互斥的。也就是1秒過(guò)后,testStatic()、testFs()、和 【test()、test1() 其中之一】一起輸出打印。
修飾代碼塊、修飾方法:鎖的是該對(duì)象;
修飾靜態(tài)成員:鎖的是該類(lèi)的Class對(duì)象;這種方式可以確保對(duì)靜態(tài)變量的訪問(wèn)是線程安全的
還可以鎖任意對(duì)象。
其實(shí)主要弄清楚各自鎖的是什么對(duì)象就行了,看是否需要的是一個(gè)鎖,就可以判斷是否互斥了;
//鎖的是Class對(duì)象
public static synchronized void testStatic1() {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testStaticMethod()");
}
public void testStatic() {
synchronized (SynchronizedTest.class){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("testStatic()");
}
}
public synchronized void test2(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("test2()");
}
如上述代碼示例,testStatic1和testStatic需要持有的對(duì)象是同一個(gè),故這二者會(huì)產(chǎn)生互斥,test2需要持有的是該類(lèi)的一個(gè)實(shí)例對(duì)象,故不會(huì)與這二者產(chǎn)生互斥。
需要注意的是: synchronized 并不屬于方法定義的一部分,故synchronized 關(guān)鍵字不能被繼承。如果在父類(lèi)中的某個(gè)方 法使用了 synchronized 關(guān)鍵字,而在子類(lèi)中覆蓋了這個(gè)方法,在子類(lèi)中的這 個(gè)方法默認(rèn)情況下并不是同步的,而必須顯式地在子類(lèi)的這個(gè)方法中加上 synchronized 關(guān)鍵字才可以。當(dāng)然,還可以在子類(lèi)方法中調(diào)用父類(lèi)中相應(yīng)的方 法,這樣雖然子類(lèi)中的方法不是同步的,但子類(lèi)調(diào)用了父類(lèi)的同步方法,因此, 子類(lèi)的方法也就相當(dāng)于同步了。
來(lái)看看如下示例:
public class Father {
public synchronized void method1(){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Father method1");
}
}
//
public class Son extends Father{
public void syncSon() {
super.method1(); // 調(diào)用的父類(lèi)的同步方法
}
public void syncSon1() {
super.method1();
}
public void method1() { // 重寫(xiě)了,但是synchronized并不會(huì)繼承過(guò)來(lái)
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
System.out.println("Son method1");
}
public void sonHaha() { method1(); }
public void sonHehe() { method1(); }
public static void main(String[] args) {
Son son = new Son();
// new Thread(son::syncSon).start();
// new Thread(son::syncSon1).start(); // 會(huì)互斥
new Thread(son::sonHaha).start();
new Thread(son::sonHehe).start(); // 不會(huì)
}
}
synchronized【隱式鎖】的底層原理涉及到 Java 對(duì)象頭(Object Header)和 Monitor(監(jiān)視器)兩個(gè)關(guān)鍵概念。
每個(gè) Java 對(duì)象在內(nèi)存中分為三部分:
- 對(duì)象頭(Header)
- Mark Word(標(biāo)記字段):存儲(chǔ)哈希碼、GC 分代年齡、鎖狀態(tài)等。
- Klass Pointer(類(lèi)型指針):指向類(lèi)的元數(shù)據(jù)。
- 實(shí)例數(shù)據(jù)(Instance Data)
- 對(duì)齊填充(Padding)
Java 對(duì)象頭:在 Java 虛擬機(jī)中,每個(gè)對(duì)象都有一個(gè)對(duì)象頭,用于存儲(chǔ)對(duì)象的元數(shù)據(jù)信息,包括對(duì)象的哈希碼、GC 相關(guān)信息、鎖狀態(tài)等。對(duì)象頭通常包含一個(gè)標(biāo)記字段(Mark Word),用于標(biāo)識(shí)對(duì)象的鎖狀態(tài)。
Monitor(監(jiān)視器):Monitor 是一種同步機(jī)制,負(fù)責(zé)管理對(duì)象的鎖。每個(gè)對(duì)象都與一個(gè) Monitor 相關(guān)聯(lián)。當(dāng)一個(gè)線程嘗試進(jìn)入一個(gè)被synchronized修飾的代碼塊或方法時(shí),它會(huì)嘗試獲取對(duì)象的 Monitor。如果 Monitor 處于無(wú)鎖狀態(tài),則當(dāng)前線程會(huì)嘗試將其鎖定;如果 Monitor 已經(jīng)被其他線程鎖定,則當(dāng)前線程會(huì)進(jìn)入阻塞狀態(tài),直到持有鎖的線程釋放鎖。
// C++ 實(shí)現(xiàn)(HotSpot 源碼)
class ObjectMonitor {
void* _header; // Mark Word
void* _owner; // 持有鎖的線程
volatile intptr_t _count; // 重入次數(shù)
ObjectWaiter* _WaitSet; // 等待隊(duì)列(調(diào)用 wait() 的線程)
ObjectWaiter* _EntryList; // 阻塞隊(duì)列(競(jìng)爭(zhēng)鎖失敗的線程)
// ...
};
查看本小節(jié)開(kāi)頭示例的test()方法的字節(jié)碼:
synchronized 同步語(yǔ)句塊的實(shí)現(xiàn)使用的是 monitorenter 和 monitorexit 指令,當(dāng)執(zhí)行 monitorenter 指令時(shí),線程試圖獲取鎖也就是獲取 對(duì)象監(jiān)視器 monitor 的持有權(quán)。第一個(gè)monitorexit正常退出同步塊, 第二個(gè)是異常退出同步塊。
synchronized優(yōu)化:
鎖升級(jí)(JDK 6+)
3.0 無(wú)鎖
- 無(wú)鎖:當(dāng)?shù)谝粋€(gè)線程第一次訪問(wèn)一個(gè)對(duì)象的同步塊時(shí),JVM會(huì)在對(duì)象頭中設(shè)置該線程的ID,并將對(duì)象頭的狀態(tài)位設(shè)置為“偏向鎖”。這個(gè)過(guò)程稱(chēng)為“偏向”,表示對(duì)象當(dāng)前偏向于第一個(gè)訪問(wèn)它的線程。
3.1 偏向鎖(Biased Locking)
- 原理:第一個(gè)獲取鎖的線程將線程 ID 寫(xiě)入 Mark Word,后續(xù)無(wú)需 CAS。這樣如果該線程再來(lái)的時(shí)候,由于是已經(jīng)設(shè)置了鎖偏向該線程,故只需比對(duì)一下對(duì)象頭里面的Mark Word就行了。
- 觸發(fā)條件:JVM 啟用了偏向鎖(默認(rèn)開(kāi)啟),且對(duì)象未處于鎖定狀態(tài)。
- 撤銷(xiāo):當(dāng)其他線程嘗試競(jìng)爭(zhēng)時(shí),撤銷(xiāo)偏向鎖并升級(jí)為輕量級(jí)鎖。
3.2 輕量級(jí)鎖(Lightweight Locking)
- 加鎖流程:
- 在當(dāng)前線程棧幀中創(chuàng)建 Lock Record。
- 通過(guò) CAS 將 Mark Word 復(fù)制到 Lock Record,并嘗試將 Mark Word 指向 Lock Record。
輕量級(jí)鎖在以下場(chǎng)景會(huì)升級(jí)為重量級(jí)鎖:
- 自旋失敗:競(jìng)爭(zhēng)線程自旋一定次數(shù)后仍未獲取鎖。
- 競(jìng)爭(zhēng)加劇:等待鎖的線程數(shù)超過(guò) JVM 自適應(yīng)的閾值。
3.3 重量級(jí)鎖(Heavyweight Locking)
- 實(shí)現(xiàn):通過(guò)操作系統(tǒng)提供的互斥量(Mutex)和條件變量實(shí)現(xiàn),線程競(jìng)爭(zhēng)失敗后進(jìn)入阻塞狀態(tài)。
- 性能問(wèn)題:涉及用戶態(tài)到內(nèi)核態(tài)的切換,開(kāi)銷(xiāo)較大。
synchronized優(yōu)化:
鎖會(huì)升級(jí),從低到高升級(jí),反著降級(jí)不可以:無(wú)鎖狀態(tài) -> 偏向鎖狀態(tài) -> 輕量級(jí)鎖狀態(tài) -> 重量級(jí)鎖狀態(tài)
| 鎖類(lèi)型 | 實(shí)現(xiàn)機(jī)制 | 適用場(chǎng)景 | 性能開(kāi)銷(xiāo) |
|---|---|---|---|
| 偏向鎖 | 通過(guò) Mark Word 記錄線程 ID | 單線程重復(fù)訪問(wèn)同步塊 | 最低 |
| 輕量級(jí)鎖 | CAS + 自旋(適應(yīng)性自旋) | 低競(jìng)爭(zhēng)、短時(shí)間同步 | 中等 |
| 重量級(jí)鎖 | 操作系統(tǒng)互斥量(Mutex) + Monitor | 高競(jìng)爭(zhēng)、長(zhǎng)時(shí)間同步 | 最高 |
【節(jié)選自 鎖升級(jí)】 :https://blog.csdn.net/weixin_45433817/article/details/132216383
問(wèn):synchronized是公平鎖嗎?
首先要知道公平鎖和非公平鎖的概念:
- 公平鎖:公平鎖指的是多個(gè)線程按照申請(qǐng)鎖的順序來(lái)獲取鎖,先到先得。當(dāng)一個(gè)線程請(qǐng)求鎖時(shí),如果該鎖當(dāng)前處于可用狀態(tài),并且在該線程之前已經(jīng)有其他線程在等待該鎖,那么這個(gè)線程會(huì)被放入等待隊(duì)列的尾部,等待前面的線程依次獲取并釋放鎖后,它才能獲取鎖。
- 非公平鎖:非公平鎖則不保證線程獲取鎖的順序與申請(qǐng)鎖的順序一致。當(dāng)一個(gè)線程請(qǐng)求鎖時(shí),即使有其他線程在等待該鎖,它也會(huì)先嘗試直接獲取鎖,如果獲取成功就可以直接執(zhí)行,而不用排隊(duì)等待。
那么,synchronized 基于對(duì)象頭的 Mark Word 和監(jiān)視器(Monitor)實(shí)現(xiàn)。當(dāng)一個(gè)線程進(jìn)入同步塊時(shí),它會(huì)嘗試獲取對(duì)象的監(jiān)視器。如果監(jiān)視器處于空閑狀態(tài),該線程會(huì)直接獲取監(jiān)視器,而不會(huì)考慮是否有其他線程已經(jīng)在等待這個(gè)監(jiān)視器。例如,當(dāng)一個(gè)線程釋放了 synchronized 修飾的同步塊的鎖后,新到來(lái)的線程有很大機(jī)會(huì)直接獲取到鎖,而不是等待那些在等待隊(duì)列中的線程,這就體現(xiàn)了其非公平性。
class SynchronizedNonFairExample {
private static final Object lock = new Object();
private static int counter = 0;
public static void main(String[] args) {
for (int i = 0; i < 5; i++) {
new Thread(() -> {
while (true) {
synchronized (lock) {
counter++;
System.out.println(Thread.currentThread().getName() + " 獲取到鎖,計(jì)數(shù): " + counter);
try {
// 模擬執(zhí)行任務(wù)
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}, "Thread-" + i).start();
}
}
}
在上述代碼中,多個(gè)線程競(jìng)爭(zhēng) lock 對(duì)象的鎖。運(yùn)行代碼時(shí),你會(huì)發(fā)現(xiàn)線程獲取鎖的順序并不是按照線程啟動(dòng)的順序,這就說(shuō)明了 synchronized 是非公平鎖。
② Lock
上一節(jié)的synchronized是jdk內(nèi)置的關(guān)鍵字,屬于隱式鎖、也叫內(nèi)置鎖。現(xiàn)在這一節(jié)來(lái)探索一下顯式鎖,其提供更細(xì)粒度的控制(如可中斷、超時(shí)、公平性),核心實(shí)現(xiàn)為 ReentrantLock。
public interface Lock {
//獲取鎖。如果鎖不可用,則當(dāng)前線程將出于線程調(diào)度目的而被禁用,并在獲取鎖之前處于休眠狀態(tài)。
void lock();
void lockInterruptibly() throws InterruptedException;
//如果鎖可用,則獲取鎖,并立即返回值為 true。如果鎖不可用,則此方法將立即返回值 false。
boolean tryLock();
/*
如果在給定的等待時(shí)間內(nèi)有空閑,并且當(dāng)前線程尚未中斷,則獲取該鎖。
如果鎖可用,則此方法立即返回值 true。如果鎖不可用,則當(dāng)前線程將出于線程調(diào)度目的而被禁用,并處于休眠狀態(tài),
直到發(fā)生以下三種情況之一:鎖由當(dāng)前線程獲取;或者其他線程中斷當(dāng)前線程,支持中斷鎖獲取;或指定的等待時(shí)間已用
*/
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
//釋放鎖
void unlock();
//返回綁定到此 Lock 實(shí)例的新 Condition 實(shí)例。在等待條件之前,鎖必須由當(dāng)前線程持有
Condition newCondition();
}

從上圖中,我們可以得知juc包下的幾個(gè)主要的實(shí)現(xiàn)類(lèi),綠色圈圈連接的是里面的內(nèi)部類(lèi)。
1) ReentrantLock
接下來(lái)從我們最熟知的ReentrantLock開(kāi)始看起吧,他的簡(jiǎn)單使用:
public class LockTest {
Lock lock = new ReentrantLock();
int count = 0;
public static void main(String[] args) throws InterruptedException {
LockTest test = new LockTest();
for (int i = 1; i <= 2; i++) {
new Thread(test::add).start();
}
Thread.sleep(2000);
/*
如果不加鎖的話,結(jié)果就不一定是兩萬(wàn)了
*/
System.out.println(test.count);
}
public void add() {
// 標(biāo)準(zhǔn)寫(xiě)法 try加鎖 finally釋放鎖
try {
lock.lock();
for (int i = 0; i < 10000; i++) {
count++;
}
} finally {
lock.unlock();
}
}
}
我們分析一下,首先是調(diào)用了其無(wú)參構(gòu)造函數(shù)創(chuàng)建了一個(gè)對(duì)象,里面是new了一個(gè)看名字是非公平同步標(biāo)記的對(duì)象,那他肯定會(huì)有公平的同步標(biāo)記。
// 下面都是在ReentrantLock.java里面
private final Sync sync;
// 無(wú)參構(gòu)造
public ReentrantLock() {
sync = new NonfairSync();
}
// 有參構(gòu)造
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
// 然后調(diào)用lock.lock()實(shí)際是調(diào)用的sync.lock();
public void lock() {
sync.lock();
}
// 然后調(diào)用lock.unlock()實(shí)際是調(diào)用的sync.release(1);;
public void unlock() {
sync.release(1);
}
// 是ReentrantLock的靜態(tài)內(nèi)部類(lèi)
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// 是ReentrantLock的靜態(tài)內(nèi)部類(lèi)
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
// Sync繼承了AbstractQueuedSynchronizer:【AQS】
abstract static class Sync extends AbstractQueuedSynchronizer {
.....
}
從上面額關(guān)系我們可以看出一切源頭就是AbstractQueuedSynchronizer,也就是我們熟悉的AQS。在這篇文章的“補(bǔ)充知識(shí)點(diǎn)”環(huán)節(jié)中,對(duì)AQS做了一個(gè)簡(jiǎn)單的介紹及分析。【AQS】:https://blog.csdn.net/okok__TXF/article/details/146455487
【博客園】:https://blog.csdn.net/okok__TXF/article/details/146455487
它是是 Java 并發(fā)包 java.util.concurrent.locks 下的一個(gè)核心類(lèi),是構(gòu)建鎖和其他同步工具(如 ReentrantLock、Semaphore、CountDownLatch 等)的基礎(chǔ)框架。
里面定義了兩種資源共享模式:
- 獨(dú)占模式(Exclusive):同一時(shí)刻只有一個(gè)線程能獲取資源,如
ReentrantLock。
在獨(dú)占模式的時(shí)候,tryAcquire(int):嘗試獲取資源,成功返回 true,失敗返回 false;tryRelease(int):嘗試釋放資源,成功返回 true,失敗返回 false。
- 共享模式(Share):多個(gè)線程可同時(shí)獲取資源,如
Semaphore(信號(hào)量)、CountDownLatch(倒計(jì)時(shí) latch)。
在共享模式的時(shí)候,tryAcquireShared(int):嘗試獲取共享資源,負(fù)數(shù)表示失敗;0 表示成功但無(wú)剩余資源;正數(shù)表示成功且有剩余資源。tryReleaseShared(int):嘗試釋放共享資源,釋放后若允許喚醒后續(xù)等待節(jié)點(diǎn),返回 true,否則 false。
- 非公平鎖
回到ReentrantLock中,我們以lock()方法舉例子:【lock是無(wú)參構(gòu)造的】非公平鎖
//ReentrantLock.java
public void lock() {
sync.lock(); // ===========1
}
//內(nèi)部的抽象類(lèi)Sync
abstract void lock();
final boolean nonfairTryAcquire(int acquires) { // ===========7
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//具體實(shí)現(xiàn)類(lèi)NonfairSync
final void lock() { // ===========2
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // ===========3
}
protected final boolean tryAcquire(int acquires) { // ===========6
return nonfairTryAcquire(acquires); // 這個(gè)是抽象類(lèi)Sync的
}
//AbstractQueuedSynchronizer.java --- acquire(1)
public final void acquire(int arg) { // ===========4
// 這里的tryAcquire是NonfairSync.java里面的
if (!tryAcquire(arg) && // ===========5
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
已經(jīng)按順序?qū)?234567標(biāo)注在了上面,模板方法的設(shè)計(jì)模式有時(shí)候真的會(huì)讓人暈頭轉(zhuǎn)向。。
一句簡(jiǎn)簡(jiǎn)單單的lock.lock()方法做了什么呢?
首先,會(huì)直接嘗試CAS獲取鎖【compareAndSetState(0, 1) 會(huì)通過(guò) CAS 操作嘗試將 AQS 中的 state 狀態(tài)從 0 改為 1】,如果成功的話成功則設(shè)置當(dāng)前線程為鎖持有者,否則進(jìn)入AQS的獲取流程;【在這里,當(dāng)線程調(diào)用 lock() 時(shí),會(huì)先通過(guò) CAS 操作嘗試將 AQS 的 state 從 0 改為 1。此時(shí)不會(huì)檢查等待隊(duì)列中是否有其他線程在排隊(duì),只要 CAS 成功,就直接獲取鎖,體現(xiàn)了 “插隊(duì)” 的特性。】
其次,進(jìn)入aqs的acquire流程 ,
1.tryAcquire(arg) 2.addWaiter(Node.EXCLUSIVE) 3.acquireQueued(xxx)
第一個(gè)方法tryAcquire再次嘗試獲取鎖(非公平鎖的 tryAcquire 即 nonfairTryAcquire(acquires)),在Sync :: nonfairTryAcquire(int acquires)方法里面,得到aqs里面的state,如果是0,再次嘗試 CAS 搶占(體現(xiàn)非公平性,不檢查隊(duì)列);如果不是0,說(shuō)明被搶占了,判斷持有鎖的線程是不是當(dāng)前線程,如果是(體現(xiàn)可重入性),更新state,如果不是返回false。
然后, 若tryAcquire失敗【返回false】,調(diào)用addWaiter(Node.EXCLUSIVE) 會(huì)將當(dāng)前線程包裝成一個(gè)獨(dú)占模式的 Node 節(jié)點(diǎn)加入 AQS 隊(duì)列。接著 acquireQueued 會(huì)使線程在隊(duì)列中自旋等待,不斷嘗試獲取鎖或被喚醒后嘗試獲取,直到成功。
接下來(lái)分析一下lock.unlock()方法,這個(gè)就在代碼里面注釋解釋了
//ReentrantLock.java
public void unlock() {
//調(diào)用其內(nèi)部同步器 sync 的 release 方法:
sync.release(1); // ===========1
}
// 內(nèi)部抽象類(lèi)Sync
protected final boolean tryRelease(int releases) { // ===========3
int c = getState() - releases; // 減少同步狀態(tài)值(釋放一次鎖,`state` 減 1)
// 檢查當(dāng)前線程是否為鎖的持有者,不是則拋異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {// 當(dāng) `state` 減為 0 時(shí),完全釋放鎖
free = true;
setExclusiveOwnerThread(null);// 清除獨(dú)占鎖的線程引用
}
setState(c);// 更新 `state` 值
return free;// 返回是否完全釋放鎖
}
//AbstractQueuedSynchronizer.java --- acquire(1)
public final boolean release(int arg) {
//tryRelease 嘗試釋放鎖,由子類(lèi)實(shí)現(xiàn)具體邏輯
if (tryRelease(arg)) { // ===========2
Node h = head;// 獲取等待隊(duì)列頭節(jié)點(diǎn)
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);// 喚醒后繼節(jié)點(diǎn) ===========4
return true;
}
return false;
}
//喚醒后繼節(jié)點(diǎn)
private void unparkSuccessor(Node node) { ===========5
int ws = node.waitStatus;
if (ws < 0) // 將頭節(jié)點(diǎn)的 `waitStatus` 設(shè)為 0(取消之前的狀態(tài))
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next; // 找到頭節(jié)點(diǎn)的后繼節(jié)點(diǎn)
if (s == null || s.waitStatus > 0) { // 若后繼節(jié)點(diǎn)為空或已取消,從隊(duì)尾向前找第一個(gè)非取消節(jié)點(diǎn)
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null) // 喚醒找到的節(jié)點(diǎn)對(duì)應(yīng)的線程
LockSupport.unpark(s.thread);
}
在內(nèi)部抽象類(lèi)Sync中的tryRelease中:
- 減少
state值:ReentrantLock支持重入,state記錄鎖的重入次數(shù)。每次調(diào)用unlock(),state減 1。 - 檢查線程所有權(quán):確保只有鎖的持有者才能釋放鎖,否則拋出
IllegalMonitorStateException。 - 完全釋放鎖:當(dāng)
state減為 0 時(shí),將setExclusiveOwnerThread(null),表示鎖已完全釋放,返回true。
ReentrantLock支持重入:【其實(shí)從名字就可以看出來(lái)了 -- Reentrant(再進(jìn)去的、就是可重入嘛)】
ReentrantLock lock = new ReentrantLock();
lock.lock(); // state=1,線程持有鎖
lock.lock(); // state=2(重入)
lock.unlock(); // state=1(未完全釋放)
lock.unlock(); // state=0,釋放鎖并喚醒等待線程
- 公平鎖
那么ReentrantLock的公平鎖是什么樣子的呢?其實(shí)大致步驟都差不多,主要是在FairSync.java
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//公平鎖會(huì)先檢查等待隊(duì)列是否有前驅(qū)節(jié)點(diǎn),若有則不能搶鎖
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
public final boolean hasQueuedPredecessors() {
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
公平鎖在 state == 0 時(shí),會(huì)先通過(guò) hasQueuedPredecessors 檢查等待隊(duì)列。若有其他線程在排隊(duì),則當(dāng)前線程不能搶占,必須入隊(duì)等待,保證 “先來(lái)先得”,體現(xiàn)了公平性。而非公平鎖跳過(guò)這一步,直接搶鎖,這就是非公平性的核心體現(xiàn)。
2) 讀寫(xiě)鎖
//ReadWriteLock 維護(hù)一對(duì)關(guān)聯(lián)的鎖,一個(gè)用于只讀作,一個(gè)用于寫(xiě)入。只要沒(méi)有寫(xiě)入器,多個(gè)讀取器線程就可以同時(shí)持有讀取鎖。
//寫(xiě)鎖是獨(dú)占的。讀寫(xiě)鎖允許在訪問(wèn)共享數(shù)據(jù)時(shí)實(shí)現(xiàn)比互斥鎖允許的更高級(jí)別的并發(fā)。它利用了這樣一個(gè)事實(shí),
//即雖然一次只有一個(gè)線程(寫(xiě)入線程)可以修改共享數(shù)據(jù),但在許多情況下,任意數(shù)量的線程都可以同時(shí)讀取數(shù)據(jù)(因此是讀取線程)。
//從理論上講,與使用互斥鎖相比,使用讀寫(xiě)鎖允許的并發(fā)性增加將導(dǎo)致性能改進(jìn)。
public interface ReadWriteLock {
//返回用于讀取的鎖。
Lock readLock();
//返回用于寫(xiě)入的鎖。
Lock writeLock();
}
讀寫(xiě)鎖是否會(huì)比使用互斥鎖提高性能 取決于讀取數(shù)據(jù)的頻率與修改數(shù)據(jù)的頻率、讀取和寫(xiě)入作的持續(xù)時(shí)間以及數(shù)據(jù)的爭(zhēng)用 - 即嘗試同時(shí)讀取或?qū)懭霐?shù)據(jù)的線程數(shù)。例如,最初填充數(shù)據(jù),此后不經(jīng)常修改的集合;經(jīng)常搜索(例如某種目錄)是使用讀寫(xiě)鎖的理想候選者。但是,如果更新變得頻繁,則數(shù)據(jù)將花費(fèi)大部分時(shí)間進(jìn)行獨(dú)占鎖定,并發(fā)性幾乎沒(méi)有增加。只有分析和測(cè)量才能確定使用讀寫(xiě)鎖是否適合您的應(yīng)用程序。
讀寫(xiě)鎖允許多個(gè)線程同時(shí)讀(沒(méi)有寫(xiě)入時(shí),多個(gè)線程允許同時(shí)讀(提高性能)),但只要有一個(gè)線程在寫(xiě),其他線程就必須等待(只允許一個(gè)線程寫(xiě)入(其他線程既不能寫(xiě)入也不能讀取))。也就是讀讀不沖突、讀寫(xiě)就要沖突了。
- ReadWriteLock
下面給出一個(gè)簡(jiǎn)單示例:ReadWriteLock
public class ReadWriteLockTest2 {
private static final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
private static final Lock readLock = readWriteLock.readLock();
private static final Lock writeLock = readWriteLock.writeLock();
private static int[] a = new int[10];
public static void main(String[] args) throws InterruptedException {
// 1個(gè)線程寫(xiě)
new Thread(ReadWriteLockTest2::write).start();
for (int i = 0; i < 9; i++) // 10個(gè)線程讀
new Thread(()-> System.out.println(get())).start();
Thread.sleep(2000);
}
public static Object get() {
readLock.lock();
try {
Thread.sleep(100);
return a[1];
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
readLock.unlock();
}
}
public static void write() {
writeLock.lock();
try {
a[1]++;
System.out.println("寫(xiě)進(jìn)行~~~");
Thread.sleep(1000);
System.out.println("寫(xiě)ok~~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
writeLock.unlock();
}
}
}
寫(xiě)操作在執(zhí)行的時(shí)候,讀線程是會(huì)阻塞的。但是10個(gè)讀線程之間并沒(méi)有阻塞
- StampedLock
StampedLock對(duì)比 ReentrantReadWriteLock 有所增強(qiáng),在原先讀寫(xiě)鎖的基礎(chǔ)之上新增了樂(lè)觀讀的模式。該模式并不會(huì)加鎖,所以不會(huì)阻塞線程,會(huì)有更高的吞吐量和更高的性能。(樂(lè)觀鎖和悲觀鎖)
StampedLock具有三種控制讀/寫(xiě)訪問(wèn)的模式:
1、寫(xiě)入(Writing):方法writeLock可能阻塞等待獨(dú)占訪問(wèn),并返回一個(gè)戳,該戳可在方法unlockWrite中使用以釋放鎖。還提供了tryWriteLock的非定時(shí)和定時(shí)版本。當(dāng)鎖保持在寫(xiě)模式時(shí),不能獲得讀鎖,并且所有樂(lè)觀讀驗(yàn)證都將失敗。
2、讀取(Reading):方法readLock可能會(huì)阻塞等待非獨(dú)占訪問(wèn),并返回一個(gè)戳,該戳可在方法unlockRead中使用以釋放鎖。還提供了tryReadLock的非定時(shí)和定時(shí)版本。
3、樂(lè)觀讀取(Optimistic Reading):tryOptimisticRead方法返回一個(gè)非0的stamp,只有當(dāng)前同步狀態(tài)沒(méi)有被寫(xiě)模式所占有是才能獲取到。他是在獲取stamp值后對(duì)數(shù)據(jù)進(jìn)行讀取操作,最后驗(yàn)證該stamp值是否發(fā)生變化,如果發(fā)生變化則讀取無(wú)效,代表有數(shù)據(jù)寫(xiě)入。這種方式能夠降低競(jìng)爭(zhēng)和提高吞吐量。
簡(jiǎn)單示例:
public class StampedLockTest {
private static final StampedLock stampedLock = new StampedLock();
private static double x = 1.0;
private static double y = 1.0;
public static void main(String[] args) {
// 1. 一個(gè)線程寫(xiě)
new Thread(() -> addXY(1, 1)).start();
// 2. 10個(gè)線程讀
for (int i = 0; i < 10; i++) {
new Thread(() -> System.out.println(getSArea())).start();
}
}
private static double getSArea() {
// 樂(lè)觀讀
long stamp = stampedLock.tryOptimisticRead();
double s1 = x * y;
// 驗(yàn)證一下
if (!stampedLock.validate(stamp)) { // 驗(yàn)證失敗
stamp = stampedLock.readLock(); // 升級(jí)為讀鎖
try {
s1 = x * y;
} finally {
stampedLock.unlockRead(stamp);
}
}
return s1;
}
private static void addXY(double a, double b) {
long stamp = stampedLock.writeLock();
try {
System.out.println("寫(xiě)進(jìn)行~~");
x += a;
y += b;
Thread.sleep(1000);
System.out.println("寫(xiě)ok~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stampedLock.unlockWrite(stamp);
}
}
}
寫(xiě)操作:writeLock() 返回一個(gè) stamp(時(shí)間戳),表示獲取寫(xiě)鎖成功。寫(xiě)鎖是獨(dú)占的,獲取時(shí)會(huì)阻塞所有讀鎖和其他寫(xiě)鎖(除了樂(lè)觀讀)。通過(guò) unlockWrite(stamp) 釋放寫(xiě)鎖,stamp 必須與獲取時(shí)的一致,否則拋出異常。
樂(lè)觀讀: tryOptimisticRead():獲取一個(gè) 樂(lè)觀讀時(shí)間戳,不實(shí)際加鎖,直接讀取數(shù)據(jù)(成本極低),然后validate(stamp):檢查該時(shí)間戳對(duì)應(yīng)的讀操作期間是否有寫(xiě)操作發(fā)生。若 stamp 有效(無(wú)寫(xiě)操作),則數(shù)據(jù)一致;否則需要升級(jí)為讀鎖。鎖升級(jí):若驗(yàn)證失敗,說(shuō)明數(shù)據(jù)可能被修改,通過(guò) readLock() 獲取讀鎖(阻塞直到寫(xiě)鎖釋放),確保后續(xù)讀取的數(shù)據(jù)是最新的。
- 可重入性探討
JVM允許同一個(gè)線程重復(fù)獲取同一個(gè)鎖,這種能被同一個(gè)線程反復(fù)獲取的鎖,就叫做可重入鎖。
本小節(jié)看一下上面兩種讀寫(xiě)鎖的可重入性,首先是ReadWriteLock,從他的實(shí)現(xiàn)類(lèi)來(lái)看就是可重入的了【ReentrantReadWriteLock】
在ReentrantReadWriteLock中也有Sync的抽象內(nèi)部類(lèi),當(dāng)調(diào)用寫(xiě)鎖的lock時(shí),實(shí)際是會(huì)經(jīng)過(guò)里面重寫(xiě)的tryAcquire,從下面的代碼可以知道同一線程可多次獲取寫(xiě)鎖:當(dāng)線程獲取寫(xiě)鎖后,再次調(diào)用 writeLock() 會(huì)直接成功(無(wú)需等待),因?yàn)閮?nèi)部維護(hù)了一個(gè) 重入計(jì)數(shù)器(類(lèi)似 ReentrantLock)。每次獲取寫(xiě)鎖時(shí)計(jì)數(shù)器加 1,釋放時(shí)減 1,計(jì)數(shù)器為 0 時(shí)才真正釋放鎖。
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c);
if (c != 0) {
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT)
....
// Reentrant acquire
setState(c + acquires);
return true;
}
....
return true;
}
ReentrantReadWriteLock 基于 AQS(AbstractQueuedSynchronizer) 實(shí)現(xiàn),通過(guò) state 變量的高 16 位和低 16 位分別記錄 讀鎖的共享次數(shù) 和 寫(xiě)鎖的重入次數(shù):
- 寫(xiě)鎖(獨(dú)占模式):使用低 16 位記錄當(dāng)前線程的重入次數(shù)(和
ReentrantLock類(lèi)似)。 - 讀鎖(共享模式):使用高 16 位記錄所有線程的讀鎖獲取次數(shù),但會(huì)通過(guò)線程本地變量(
ThreadLocal)記錄當(dāng)前線程的讀鎖重入次數(shù),避免不同線程的計(jì)數(shù)干擾。
這種設(shè)計(jì)使得同一線程多次獲取寫(xiě)鎖或在讀鎖 / 寫(xiě)鎖之間按規(guī)則重入時(shí),不會(huì)出現(xiàn)死鎖,符合可重入鎖的定義。
public static void write() {
writeLock.lock();
try {
writeLock.lock();
a[1]++;
System.out.println("寫(xiě)進(jìn)行~~~");
Thread.sleep(1000);
System.out.println("寫(xiě)ok~~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
writeLock.unlock(); // 可重入
writeLock.unlock();
}
}
StampedLock是不可重入的,為什么呢?
-
沒(méi)有鎖計(jì)數(shù)機(jī)制:
StampedLock并沒(méi)有像ReentrantLock那樣維護(hù)一個(gè)鎖的重入計(jì)數(shù)。在ReentrantLock中,state變量用于記錄鎖的重入次數(shù),每次獲取鎖時(shí)state加 1,釋放鎖時(shí)state減 1。而StampedLock中的state變量主要用于表示鎖的狀態(tài)和版本信息,并非用于記錄重入次數(shù)。 -
如果一個(gè)線程已經(jīng)持有了
StampedLock的寫(xiě)鎖或讀鎖,再次嘗試獲取相同類(lèi)型的鎖時(shí),會(huì)出現(xiàn)以下情況:- 寫(xiě)鎖情況:如果線程已經(jīng)持有寫(xiě)鎖,再次調(diào)用
writeLock()方法,由于寫(xiě)鎖是獨(dú)占的,該線程會(huì)被阻塞,因?yàn)樗鼤?huì)等待自己釋放寫(xiě)鎖后才能再次獲取,這顯然會(huì)導(dǎo)致死鎖。 - 讀鎖情況:如果線程已經(jīng)持有讀鎖,再次調(diào)用
readLock()方法,雖然讀鎖是共享的,但StampedLock并不會(huì)像可重入鎖那樣允許線程多次獲取而不產(chǎn)生問(wèn)題。而且如果在持有讀鎖的情況下嘗試獲取寫(xiě)鎖,會(huì)導(dǎo)致死鎖,因?yàn)閷?xiě)鎖需要獨(dú)占資源,而當(dāng)前線程已經(jīng)持有了讀鎖。
- 寫(xiě)鎖情況:如果線程已經(jīng)持有寫(xiě)鎖,再次調(diào)用
private static void addXY(double a, double b) {
long stamp = stampedLock.writeLock();
try {
long lock = stampedLock.writeLock(); // 不可重入
System.out.println("寫(xiě)進(jìn)行~~");
x += a;
y += b;
Thread.sleep(1000);
System.out.println("寫(xiě)ok~~");
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
stampedLock.unlockWrite(stamp);
}
}
③ 鎖案例
上面只掌握了一丟丟的理論,沒(méi)有實(shí)踐怎么行呢?
1) 交替打印
第一個(gè),我們來(lái)實(shí)現(xiàn)一下三個(gè)線程交替打印A B C試試,第一個(gè)線程打印A,第二個(gè)B,第三個(gè)C
// synchronized實(shí)現(xiàn)
public class PrintABCSynchronized {
private int now = 1;
public static void main(String[] args) {
PrintABCSynchronized obj = new PrintABCSynchronized();
new Thread(obj::printA).start();
new Thread(obj::printB).start();
new Thread(obj::printC).start();
}
public void printA() {
for (int i = 0; i < 10; i++) {
synchronized (this) {
while ( now != 1 ) { // 為什么用while,不用if?留給讀者思考
try {this.wait();}
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("A"); now = 2;
this.notifyAll();
}
}
}
public void printB() {
for (int i = 0; i < 10; i++) {
synchronized (this) {
while ( now != 2 ) {
try {this.wait();}
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("B"); now = 3;
this.notifyAll();
}
}
}
public void printC() {
for (int i = 0; i < 10; i++) {
synchronized (this) {
while ( now != 3 ) {
try {this.wait();}
catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("C"); now = 1;
this.notifyAll();
}
}
}
}
wait - notify 【這個(gè)只能用在synchronized同步代碼塊中,是屬于Object的方法】上面的缺陷很?chē)?yán)重,那就是一下子就喚醒了所有掛起的線程,其實(shí)有的線程根本就不用喚醒,有沒(méi)有一種辦法,就是我想喚醒誰(shuí)就喚醒誰(shuí)呢?
public class PrintABCLock {
private Lock lock = new ReentrantLock();
private Condition a = lock.newCondition();
private Condition b = lock.newCondition();
private Condition c = lock.newCondition();
private int flag = 1;
public static void main(String[] args) {
PrintABCLock obj = new PrintABCLock();
new Thread(obj::printA).start();
new Thread(obj::printB).start();
new Thread(obj::printC).start();
}
public void printA() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
while ( flag != 1 ) a.await();
System.out.println('A'); flag = 2;
b.signal();
}
} catch ( InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
public void printB() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
while ( flag != 2 ) b.await();
System.out.println('B'); flag = 3;
c.signal();
}
} catch ( InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
public void printC() {
lock.lock();
try {
for (int i = 0; i < 10; i++) {
while ( flag != 3 ) c.await();
System.out.println('C'); flag = 1;
a.signal();
}
} catch ( InterruptedException e) {
e.printStackTrace();
}
finally {
lock.unlock();
}
}
}
2) 阻塞隊(duì)列
下面模仿jdk中ArrayBlockingQueue的源碼,給了一個(gè)簡(jiǎn)潔的阻塞隊(duì)列
class TBlockedQueue<T> {
private final Lock lock;
private final Condition notEmpty;
private final Condition notFull;
private final int capacity;
private final LinkedList<T> list;
public TBlockedQueue(int capacity) {
if (capacity <= 0)
throw new IllegalArgumentException("Capacity 不能小于1");
this.capacity = capacity;
list = new LinkedList<>();
lock = new ReentrantLock();
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void add( T t ) {
list.addLast(t);
}
// 1. 入隊(duì) -- 當(dāng)隊(duì)列已滿時(shí),向隊(duì)列中添加元素的操作會(huì)被阻塞,直到隊(duì)列有空間可用。
public void put( T t ) {
if (t == null) throw new NullPointerException();
lock.lock();
try {
while (list.size() == capacity) notFull.await();
list.addLast(t);
notEmpty.signal();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
// 2. 出隊(duì) -- 當(dāng)隊(duì)列為空時(shí),從隊(duì)列中獲取元素的操作會(huì)被阻塞,直到隊(duì)列中有新元素加入
public T take() {
lock.lock();
try {
while (list.isEmpty()) notEmpty.await();
T t = list.removeFirst();
notFull.signal();
return t;
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
}
}
這里自定義的 TBlockedQueue 是一個(gè)典型的 有界阻塞隊(duì)列,其核心思路是通過(guò) 鎖(Lock)和條件變量(Condition) 實(shí)現(xiàn)線程間的同步與協(xié)調(diào),確保在多線程環(huán)境下對(duì)隊(duì)列的操作是安全的。通過(guò) lock.lock() 和 lock.unlock() 包裹對(duì)共享資源 list 的操作,確保同一時(shí)刻只有一個(gè)線程修改隊(duì)列。同時(shí),使用 while 循環(huán)檢查條件(如 list.size() == capacity),防止 虛假喚醒導(dǎo)致條件不滿足時(shí)錯(cuò)誤地繼續(xù)執(zhí)行。
notEmpty:當(dāng)隊(duì)列為空時(shí),take操作會(huì)等待此條件;當(dāng)有元素入隊(duì)時(shí),通過(guò)signal()喚醒等待的消費(fèi)者線程。notFull:當(dāng)隊(duì)列已滿時(shí),put操作會(huì)等待此條件;當(dāng)有元素出隊(duì)時(shí),通過(guò)signal()喚醒等待的生產(chǎn)者線程。
3) AQS自定義鎖
前面分析可以知道ReentrantLock是以AQS為基礎(chǔ)框架來(lái)實(shí)現(xiàn)的,那么,此節(jié)我們自定義來(lái)實(shí)現(xiàn)一個(gè)鎖。
見(jiàn) “Java并發(fā)探索--下篇”
4.探索并發(fā)工具
5.虛擬線程
見(jiàn) “Java并發(fā)探索--下篇” --- 在下面找
【博客園】
http://www.rzrgm.cn/jackjavacpp
【CSDN】
https://blog.csdn.net/okok__TXF
end.參考
- https://blog.csdn.net/agonie201218/article/details/128712507
- https://blog.csdn.net/xu_yong_lin/article/details/117521773
- http://www.rzrgm.cn/java-bible/p/13930006.html
- https://blog.csdn.net/fighting_yu/article/details/89473175
- https://tech.meituan.com/2018/11/15/java-lock.html
- https://blog.csdn.net/weixin_44772566/article/details/137398521
- https://blog.csdn.net/m0_73978383/article/details/146442443 【synchronized詳解】
- https://liaoxuefeng.com/books/java/threading 【廖雪峰的官方網(wǎng)站--- 神中神】

浙公網(wǎng)安備 33010602011771號(hào)