Java 21 虛擬線程 vs 緩存線程池與固定線程池
探索 Java 并發(fā)如何從 Java 8 的增強發(fā)展到 Java 21 的虛擬線程,從而實現(xiàn)輕量級、可擴展且高效的多線程處理。
引言
并發(fā)編程仍然是構(gòu)建可擴展、響應(yīng)式 Java 應(yīng)用程序的關(guān)鍵部分。多年來,Java 持續(xù)增強了其多線程編程能力。本文回顧了從 Java 8 到 Java 21 并發(fā)的演進,重點介紹了重要的改進以及 Java 21 中引入的具有重大影響的虛擬線程。
從 Java 8 開始,并發(fā) API 出現(xiàn)了顯著的增強,例如原子變量、并發(fā)映射以及集成 lambda 表達式以實現(xiàn)更具表現(xiàn)力的并行編程。
Java 8 引入的關(guān)鍵改進包括:
- 線程與執(zhí)行器
- 同步與鎖
- 原子變量與 ConcurrentMap
Java 21 于 2023 年底發(fā)布,帶來了虛擬線程這一重大演進,從根本上改變了 Java 應(yīng)用程序處理大量并發(fā)任務(wù)的方式。虛擬線程為服務(wù)器應(yīng)用程序提供了更高的可擴展性,同時保持了熟悉的"每個請求一個線程"的編程模型。

或許,Java 21 中最重要的特性就是虛擬線程。
在 Java 21 中,Java 的基本并發(fā)模型保持不變,Stream API 仍然是并行處理大型數(shù)據(jù)集的首選方式。
隨著虛擬線程的引入,并發(fā) API 現(xiàn)在能提供更好的性能。在當(dāng)今的微服務(wù)和可擴展服務(wù)器應(yīng)用領(lǐng)域,線程數(shù)量必須增長以滿足需求。虛擬線程的主要目標(biāo)是使服務(wù)器應(yīng)用程序能夠?qū)崿F(xiàn)高可擴展性,同時仍使用簡單的"每個請求一個線程"模型。
虛擬線程
在 Java 21 之前,JDK 的線程實現(xiàn)使用的是操作系統(tǒng)線程的薄包裝器。然而,操作系統(tǒng)線程代價高昂:
- 如果每個請求在其整個持續(xù)時間內(nèi)消耗一個操作系統(tǒng)線程,線程數(shù)量很快就會成為可擴展性的瓶頸。
- 即使使用線程池,吞吐量仍然受到限制,因為實際線程數(shù)量是有上限的。
虛擬線程的目標(biāo)是打破 Java 線程與操作系統(tǒng)線程之間的 1:1 關(guān)系。
虛擬線程應(yīng)用了類似于虛擬內(nèi)存的概念。正如虛擬內(nèi)存將大的地址空間映射到較小的物理內(nèi)存一樣,虛擬線程允許運行時通過將它們映射到少量操作系統(tǒng)線程來制造擁有許多線程的假象。
平臺線程是操作系統(tǒng)線程的薄包裝器。
而虛擬線程并不綁定到任何特定的操作系統(tǒng)線程。虛擬線程可以執(zhí)行平臺線程可以運行的任何代碼。這是一個主要優(yōu)勢——現(xiàn)有的 Java 代碼通常無需修改或僅需少量修改即可在虛擬線程上運行。虛擬線程由平臺線程承載,這些平臺線程仍然由操作系統(tǒng)調(diào)度。
例如,您可以像這樣創(chuàng)建一個使用虛擬線程的執(zhí)行器:
ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor();
對比示例
虛擬線程僅在主動執(zhí)行 CPU 密集型任務(wù)時才消耗操作系統(tǒng)線程。虛擬線程在其生命周期內(nèi)可以在不同的載體線程上掛載或卸載。
通常,當(dāng)虛擬線程遇到阻塞操作時,它會自行卸載。一旦該阻塞任務(wù)完成,虛擬線程通過掛載到任何可用的載體線程上來恢復(fù)執(zhí)行。這種掛載和卸載過程頻繁且透明地發(fā)生——不會阻塞操作系統(tǒng)線程。
示例 — 源代碼
Example01CachedThreadPool.java
在此示例中,使用緩存線程池創(chuàng)建了一個執(zhí)行器:
var executor = Executors.newCachedThreadPool()
package threads;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
public class Example01CachedThreadPool {
public void executeTasks(final int NUMBER_OF_TASKS) {
final int BLOCKING_CALL = 1;
System.out.println("Number of tasks which executed using 'newCachedThreadPool()' " + NUMBER_OF_TASKS + " tasks each.");
long startTime = System.currentTimeMillis();
try (var executor = Executors.newCachedThreadPool()) {
IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
executor.submit(() -> {
// 模擬阻塞調(diào)用
Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
return i;
});
});
} catch (Exception e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
}
}
package threads;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example01CachedThreadPoolTest {
@Test
@Order(1)
public void test_1000_tasks() {
Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
example01CachedThreadPool.executeTasks(1000);
}
@Test
@Order(2)
public void test_10_000_tasks() {
Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
example01CachedThreadPool.executeTasks(10_000);
}
@Test
@Order(3)
public void test_100_000_tasks() {
Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
example01CachedThreadPool.executeTasks(100_000);
}
@Test
@Order(4)
public void test_1_000_000_tasks() {
Example01CachedThreadPool example01CachedThreadPool = new Example01CachedThreadPool();
example01CachedThreadPool.executeTasks(1_000_000);
}
}
我 PC 上的測試結(jié)果:


Example02FixedThreadPool.java
使用固定線程池創(chuàng)建執(zhí)行器:
var executor = Executors.newFixedThreadPool(500)
package threads;
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
public class Example02FixedThreadPool {
public void executeTasks(final int NUMBER_OF_TASKS) {
final int BLOCKING_CALL = 1;
System.out.println("Number of tasks which executed using 'newFixedThreadPool(500)' " + NUMBER_OF_TASKS + " tasks each.");
long startTime = System.currentTimeMillis();
try (var executor = Executors.newFixedThreadPool(500)) {
IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
executor.submit(() -> {
// 模擬阻塞調(diào)用
Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
return i;
});
});
} catch (Exception e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
}
}
package threads;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example02FixedThreadPoolTest {
@Test
@Order(1)
public void test_1000_tasks() {
Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
example02FixedThreadPool.executeTasks(1000);
}
@Test
@Order(2)
public void test_10_000_tasks() {
Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
example02FixedThreadPool.executeTasks(10_000);
}
@Test
@Order(3)
public void test_100_000_tasks() {
Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
example02FixedThreadPool.executeTasks(100_000);
}
@Test
@Order(4)
public void test_1_000_000_tasks() {
Example02FixedThreadPool example02FixedThreadPool = new Example02FixedThreadPool();
example02FixedThreadPool.executeTasks(1_000_000);
}
}
我 PC 上的測試結(jié)果:


Example03VirtualThread.java
使用虛擬線程每任務(wù)執(zhí)行器創(chuàng)建執(zhí)行器:
var executor = Executors.newVirtualThreadPerTaskExecutor()
package threads;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
public class Example03VirtualThread {
public void executeTasks(final int NUMBER_OF_TASKS) {
final int BLOCKING_CALL = 1;
System.out.println("Number of tasks which executed using 'newVirtualThreadPerTaskExecutor()' " + NUMBER_OF_TASKS + " tasks each.");
long startTime = System.currentTimeMillis();
try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
IntStream.range(0, NUMBER_OF_TASKS).forEach(i -> {
executor.submit(() -> {
// 模擬阻塞調(diào)用
Thread.sleep(Duration.ofSeconds(BLOCKING_CALL));
return i;
});
});
} catch (Exception e) {
throw new RuntimeException(e);
}
long endTime = System.currentTimeMillis();
System.out.println("For executing " + NUMBER_OF_TASKS + " tasks duration is: " + (endTime - startTime) + " ms");
}
}
package threads;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
/**
*
* @author Milan Karajovic <milan.karajovic.rs@gmail.com>
*
*/
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class Example03VirtualThreadTest {
@Test
@Order(1)
public void test_1000_tasks() {
Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
example03VirtualThread.executeTasks(1000);
}
@Test
@Order(2)
public void test_10_000_tasks() {
Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
example03VirtualThread.executeTasks(10_000);
}
@Test
@Order(3)
public void test_100_000_tasks() {
Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
example03VirtualThread.executeTasks(100_000);
}
@Test
@Order(4)
public void test_1_000_000_tasks() {
Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
example03VirtualThread.executeTasks(1_000_000);
}
@Test
@Order(5)
public void test_2_000_000_tasks() {
Example03VirtualThread example03VirtualThread = new Example03VirtualThread();
example03VirtualThread.executeTasks(2_000_000);
}
}
我 PC 上的測試結(jié)果:


結(jié)論
您可以清楚地看到用于處理所有 NUMBER_OF_TASKS 的不同執(zhí)行器實現(xiàn)之間的執(zhí)行時間差異。值得嘗試不同的 NUMBER_OF_TASKS 值以觀察性能變化。
虛擬線程的優(yōu)勢在處理大量任務(wù)時變得尤其明顯。當(dāng) NUMBER_OF_TASKS 設(shè)置為較高的數(shù)值時——例如 1,000,000——性能差距是顯著的。如下表所示,虛擬線程在處理大量任務(wù)時效率要高得多:

我確信,在澄清這一點之后,如果您的應(yīng)用程序使用并發(fā) API 處理大量任務(wù),您會認真考慮遷移到 Java 21 并利用虛擬線程。在許多情況下,這種轉(zhuǎn)變可以顯著提高應(yīng)用程序的性能和可擴展性。
源代碼:GitHub Repository – Comparing Threads in Java 21

浙公網(wǎng)安備 33010602011771號