CompletableFuture 從入門到精通:完整案例大全
之前已經(jīng)有了一篇介紹CompletableFuture使用的文章,但是感覺例子不夠清晰,重新組織了一版:
一、基礎(chǔ)入門篇 ??
1.1 創(chuàng)建 CompletableFuture
public class CompletableFutureBasic {
/**
* 1. 創(chuàng)建已完成的 CompletableFuture
*/
public void createCompletedFuture() {
// 直接創(chuàng)建已完成并帶有結(jié)果值的 Future
CompletableFuture<String> completedFuture = CompletableFuture.completedFuture("Hello World");
// 獲取結(jié)果(立即返回,不會(huì)阻塞)
try {
String result = completedFuture.get();
System.out.println("已完成 Future 結(jié)果: " + result);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 2. 使用 runAsync 執(zhí)行無(wú)返回值的異步任務(wù)
* 適用于不需要返回結(jié)果的場(chǎng)景
*/
public void runAsyncExample() {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// 模擬耗時(shí)操作
try {
Thread.sleep(1000);
System.out.println("異步任務(wù)執(zhí)行完成,線程: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// 等待任務(wù)完成
future.join(); // join() 不會(huì)拋出受檢異常
System.out.println("runAsync 任務(wù)完成");
}
/**
* 3. 使用 supplyAsync 執(zhí)行有返回值的異步任務(wù)
* 適用于需要返回計(jì)算結(jié)果的場(chǎng)景
*/
public void supplyAsyncExample() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
// 模擬數(shù)據(jù)計(jì)算
try {
Thread.sleep(2000);
System.out.println("數(shù)據(jù)計(jì)算完成,線程: " + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "任務(wù)被中斷";
}
return "計(jì)算結(jié)果: 42";
});
// 獲取計(jì)算結(jié)果
try {
String result = future.get(3, TimeUnit.SECONDS); // 設(shè)置超時(shí)時(shí)間
System.out.println("supplyAsync 結(jié)果: " + result);
} catch (TimeoutException e) {
System.out.println("任務(wù)執(zhí)行超時(shí)");
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 4. 使用自定義線程池
* 默認(rèn)使用 ForkJoinPool.commonPool(),但可以指定自定義線程池
*/
public void customThreadPoolExample() {
// 創(chuàng)建自定義線程池
ExecutorService customExecutor = Executors.newFixedThreadPool(3,
new ThreadFactory() {
private AtomicInteger counter = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "custom-pool-" + counter.getAndIncrement());
}
});
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("使用自定義線程池執(zhí)行,線程: " + Thread.currentThread().getName());
return "自定義線程池結(jié)果";
}, customExecutor);
String result = future.join();
System.out.println("結(jié)果: " + result);
// 關(guān)閉線程池
customExecutor.shutdown();
}
}
1.2 結(jié)果處理基礎(chǔ)
public class ResultHandlingBasic {
/**
* 1. thenApply - 轉(zhuǎn)換結(jié)果
* 當(dāng)前階段完成后,將結(jié)果轉(zhuǎn)換為另一種類型
*/
public void thenApplyExample() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("第一階段: 獲取用戶ID");
return "user123";
}).thenApply(userId -> {
System.out.println("第二階段: 根據(jù)用戶ID查詢用戶信息");
return "用戶信息: " + userId;
}).thenApply(userInfo -> {
System.out.println("第三階段: 格式化用戶信息");
return userInfo + " [已格式化]";
});
String result = future.join();
System.out.println("最終結(jié)果: " + result);
}
/**
* 2. thenAccept - 消費(fèi)結(jié)果
* 接收前一階段的結(jié)果并進(jìn)行消費(fèi),不返回新值
*/
public void thenAcceptExample() {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("查詢訂單數(shù)據(jù)");
return "訂單數(shù)據(jù)";
}).thenAccept(orderData -> {
System.out.println("處理訂單數(shù)據(jù): " + orderData);
// 這里可以進(jìn)行數(shù)據(jù)保存、發(fā)送消息等操作
});
future.join();
System.out.println("訂單處理完成");
}
/**
* 3. thenRun - 執(zhí)行后續(xù)操作
* 不關(guān)心前一階段的結(jié)果,只在前一階段完成后執(zhí)行操作
*/
public void thenRunExample() {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("執(zhí)行數(shù)據(jù)清理操作");
return "清理結(jié)果";
}).thenRun(() -> {
System.out.println("數(shù)據(jù)清理完成,發(fā)送通知");
// 不依賴清理結(jié)果,只是執(zhí)行后續(xù)操作
});
future.join();
System.out.println("整個(gè)流程完成");
}
/**
* 4. thenApply vs thenApplyAsync 區(qū)別
* thenApply: 默認(rèn)在同一線程執(zhí)行
* thenApplyAsync: 在另一個(gè)線程執(zhí)行
*/
public void applyVsApplyAsync() {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync 線程: " + Thread.currentThread().getName());
return "初始數(shù)據(jù)";
}).thenApply(data -> {
System.out.println("thenApply 線程: " + Thread.currentThread().getName());
return data + " -> 同步轉(zhuǎn)換";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("supplyAsync 線程: " + Thread.currentThread().getName());
return "初始數(shù)據(jù)";
}).thenApplyAsync(data -> {
System.out.println("thenApplyAsync 線程: " + Thread.currentThread().getName());
return data + " -> 異步轉(zhuǎn)換";
});
System.out.println("同步結(jié)果: " + future1.join());
System.out.println("異步結(jié)果: " + future2.join());
}
}
二、組合操作篇 ??
2.1 任務(wù)鏈?zhǔn)浇M合
public class TaskChaining {
/**
* 1. thenCompose - 扁平化鏈?zhǔn)秸{(diào)用
* 用于一個(gè)異步操作依賴另一個(gè)異步操作的結(jié)果
*/
public void thenComposeExample() {
// 模擬用戶服務(wù)
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("查詢用戶信息");
return "user_1001";
});
// thenCompose 將兩個(gè) Future 扁平化,避免嵌套
CompletableFuture<String> resultFuture = userFuture.thenCompose(userId -> {
return CompletableFuture.supplyAsync(() -> {
System.out.println("根據(jù)用戶ID查詢訂單: " + userId);
return "訂單列表 for " + userId;
});
});
String result = resultFuture.join();
System.out.println("最終結(jié)果: " + result);
// 對(duì)比:如果不使用 thenCompose 會(huì)有嵌套
CompletableFuture<CompletableFuture<String>> nestedFuture =
userFuture.thenApply(userId ->
CompletableFuture.supplyAsync(() -> "訂單列表 for " + userId)
);
}
/**
* 2. thenCombine - 合并兩個(gè)獨(dú)立任務(wù)的結(jié)果
* 兩個(gè)任務(wù)并行執(zhí)行,完成后合并它們的結(jié)果
*/
public void thenCombineExample() {
CompletableFuture<String> userFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("并行任務(wù)1: 查詢用戶信息");
try { Thread.sleep(1000); } catch (InterruptedException e) { /* ignore */ }
return "用戶A";
});
CompletableFuture<String> productFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("并行任務(wù)2: 查詢商品信息");
try { Thread.sleep(800); } catch (InterruptedException e) { /* ignore */ }
return "商品B";
});
// 合并兩個(gè)任務(wù)的結(jié)果
CompletableFuture<String> combinedFuture = userFuture.thenCombine(productFuture,
(user, product) -> {
System.out.println("合并用戶和商品信息");
return user + " 購(gòu)買了 " + product;
});
String result = combinedFuture.join();
System.out.println("合并結(jié)果: " + result);
}
/**
* 3. thenAcceptBoth - 消費(fèi)兩個(gè)任務(wù)的結(jié)果
* 類似 thenCombine,但只消費(fèi)不返回新值
*/
public void thenAcceptBothExample() {
CompletableFuture<Integer> stockFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("查詢庫(kù)存");
return 100;
});
CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("查詢價(jià)格");
return 299.99;
});
CompletableFuture<Void> resultFuture = stockFuture.thenAcceptBoth(priceFuture,
(stock, price) -> {
System.out.println("庫(kù)存: " + stock + ", 價(jià)格: " + price);
System.out.println("總價(jià)值: " + (stock * price));
});
resultFuture.join();
System.out.println("消費(fèi)完成");
}
/**
* 4. runAfterBoth - 兩個(gè)任務(wù)完成后執(zhí)行
* 不關(guān)心任務(wù)結(jié)果,只關(guān)心完成狀態(tài)
*/
public void runAfterBothExample() {
CompletableFuture<Void> task1 = CompletableFuture.runAsync(() -> {
System.out.println("任務(wù)1: 數(shù)據(jù)備份");
try { Thread.sleep(1000); } catch (InterruptedException e) { /* ignore */ }
});
CompletableFuture<Void> task2 = CompletableFuture.runAsync(() -> {
System.out.println("任務(wù)2: 日志清理");
try { Thread.sleep(800); } catch (InterruptedException e) { /* ignore */ }
});
CompletableFuture<Void> result = task1.runAfterBoth(task2, () -> {
System.out.println("兩個(gè)任務(wù)都完成了,開始執(zhí)行后續(xù)操作");
});
result.join();
System.out.println("所有操作完成");
}
}
2.2 多任務(wù)組合
public class MultipleTaskCombination {
/**
* 1. allOf - 等待所有任務(wù)完成
* 適用于需要等待多個(gè)并行任務(wù)全部完成的場(chǎng)景
*/
public void allOfExample() {
List<CompletableFuture<String>> futures = new ArrayList<>();
// 創(chuàng)建多個(gè)并行任務(wù)
for (int i = 1; i <= 5; i++) {
final int taskId = i;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模擬不同的執(zhí)行時(shí)間
Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 3000));
System.out.println("任務(wù) " + taskId + " 完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "任務(wù)" + taskId + "結(jié)果";
});
futures.add(future);
}
// 等待所有任務(wù)完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// 所有任務(wù)完成后處理結(jié)果
CompletableFuture<List<String>> resultsFuture = allFutures.thenApply(v -> {
return futures.stream()
.map(CompletableFuture::join) // 此時(shí)所有任務(wù)已完成,join不會(huì)阻塞
.collect(Collectors.toList());
});
List<String> results = resultsFuture.join();
System.out.println("所有任務(wù)完成,結(jié)果: " + results);
}
/**
* 2. anyOf - 任意一個(gè)任務(wù)完成
* 適用于競(jìng)速場(chǎng)景,只需要最快的結(jié)果
*/
public void anyOfExample() {
// 模擬多個(gè)數(shù)據(jù)源查詢
CompletableFuture<String> source1 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1200);
System.out.println("數(shù)據(jù)源1返回");
} catch (InterruptedException e) { /* ignore */ }
return "數(shù)據(jù)源1結(jié)果";
});
CompletableFuture<String> source2 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(800);
System.out.println("數(shù)據(jù)源2返回");
} catch (InterruptedException e) { /* ignore */ }
return "數(shù)據(jù)源2結(jié)果";
});
CompletableFuture<String> source3 = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(1500);
System.out.println("數(shù)據(jù)源3返回");
} catch (InterruptedException e) { /* ignore */ }
return "數(shù)據(jù)源3結(jié)果";
});
// 獲取最快返回的結(jié)果
CompletableFuture<Object> fastestFuture = CompletableFuture.anyOf(source1, source2, source3);
Object fastestResult = fastestFuture.join();
System.out.println("最快返回的結(jié)果: " + fastestResult);
}
/**
* 3. 復(fù)雜組合:電商訂單處理流程
*/
public void ecommerceOrderProcessing() {
// 1. 驗(yàn)證訂單
CompletableFuture<Boolean> validationFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("驗(yàn)證訂單信息...");
try { Thread.sleep(200); } catch (InterruptedException e) { /* ignore */ }
return true;
});
// 2. 檢查庫(kù)存(并行)
CompletableFuture<Boolean> stockFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("檢查商品庫(kù)存...");
try { Thread.sleep(300); } catch (InterruptedException e) { /* ignore */ }
return true;
});
// 3. 計(jì)算價(jià)格(并行)
CompletableFuture<Double> priceFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("計(jì)算訂單價(jià)格...");
try { Thread.sleep(250); } catch (InterruptedException e) { /* ignore */ }
return 299.99;
});
// 等待驗(yàn)證和庫(kù)存檢查都通過
CompletableFuture<Boolean> preCheckFuture = validationFuture.thenCombine(stockFuture,
(valid, inStock) -> valid && inStock);
// 所有前置條件滿足后,處理訂單
CompletableFuture<String> orderFuture = preCheckFuture.thenCompose(passed -> {
if (!passed) {
return CompletableFuture.completedFuture("訂單驗(yàn)證失敗");
}
return priceFuture.thenApply(price -> {
System.out.println("創(chuàng)建訂單,價(jià)格: " + price);
return "訂單創(chuàng)建成功,金額: " + price;
});
});
String result = orderFuture.join();
System.out.println("訂單處理結(jié)果: " + result);
}
}
三、異常處理篇 ??
3.1 基礎(chǔ)異常處理
public class ExceptionHandling {
/**
* 1. exceptionally - 捕獲異常并提供默認(rèn)值
* 類似于 try-catch,在發(fā)生異常時(shí)返回備用值
*/
public void exceptionallyExample() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("開始執(zhí)行可能失敗的任務(wù)...");
if (Math.random() > 0.5) {
throw new RuntimeException("模擬業(yè)務(wù)異常");
}
return "成功結(jié)果";
}).exceptionally(throwable -> {
// 捕獲異常,返回默認(rèn)值
System.err.println("捕獲到異常: " + throwable.getMessage());
return "默認(rèn)結(jié)果";
});
String result = future.join();
System.out.println("最終結(jié)果: " + result);
}
/**
* 2. handle - 統(tǒng)一處理成功和異常情況
* 無(wú)論成功還是失敗都會(huì)執(zhí)行,可以轉(zhuǎn)換結(jié)果或恢復(fù)
*/
public void handleExample() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("執(zhí)行任務(wù)...");
if (Math.random() > 0.7) {
throw new RuntimeException("任務(wù)執(zhí)行失敗");
}
return "原始結(jié)果";
}).handle((result, throwable) -> {
if (throwable != null) {
// 處理異常情況
System.err.println("任務(wù)失敗: " + throwable.getMessage());
return "異常恢復(fù)結(jié)果";
} else {
// 處理成功情況
System.out.println("任務(wù)成功: " + result);
return result + " -> 處理后的結(jié)果";
}
});
String result = future.join();
System.out.println("處理后的結(jié)果: " + result);
}
/**
* 3. whenComplete - 完成時(shí)回調(diào)(不改變結(jié)果)
* 類似于 finally,無(wú)論成功失敗都會(huì)執(zhí)行,但不改變結(jié)果
*/
public void whenCompleteExample() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("執(zhí)行核心業(yè)務(wù)...");
if (Math.random() > 0.6) {
throw new RuntimeException("業(yè)務(wù)異常");
}
return "業(yè)務(wù)結(jié)果";
}).whenComplete((result, throwable) -> {
// 執(zhí)行清理或日志記錄操作
if (throwable != null) {
System.err.println("業(yè)務(wù)執(zhí)行失敗,記錄日志: " + throwable.getMessage());
} else {
System.out.println("業(yè)務(wù)執(zhí)行成功,結(jié)果: " + result);
}
System.out.println("執(zhí)行清理操作...");
});
try {
String result = future.join();
System.out.println("最終獲取的結(jié)果: " + result);
} catch (Exception e) {
System.err.println("外層捕獲異常: " + e.getMessage());
}
}
}
3.2 高級(jí)異常處理模式
public class AdvancedExceptionHandling {
/**
* 1. 異常傳播和恢復(fù)鏈
* 在復(fù)雜的鏈?zhǔn)秸{(diào)用中處理異常
*/
public void exceptionPropagationChain() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("階段1: 數(shù)據(jù)獲取");
if (Math.random() > 0.8) {
throw new RuntimeException("數(shù)據(jù)獲取失敗");
}
return "原始數(shù)據(jù)";
})
.thenApply(data -> {
System.out.println("階段2: 數(shù)據(jù)轉(zhuǎn)換");
if (Math.random() > 0.9) {
throw new RuntimeException("數(shù)據(jù)轉(zhuǎn)換失敗");
}
return data + " -> 轉(zhuǎn)換后";
})
.exceptionally(throwable -> {
// 第一階段或第二階段異常的恢復(fù)
System.err.println("前兩階段異常,使用備用數(shù)據(jù): " + throwable.getMessage());
return "備用數(shù)據(jù)";
})
.thenApply(data -> {
System.out.println("階段3: 最終處理: " + data);
return data + " -> 最終結(jié)果";
})
.handle((result, throwable) -> {
// 最終的統(tǒng)一處理
if (throwable != null) {
return "系統(tǒng)異常: " + throwable.getMessage();
}
return result;
});
String result = future.join();
System.out.println("鏈?zhǔn)教幚斫Y(jié)果: " + result);
}
/**
* 2. 超時(shí)控制
* 使用 completeOnTimeout 設(shè)置超時(shí)默認(rèn)值
*/
public void timeoutControlExample() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
// 模擬長(zhǎng)時(shí)間運(yùn)行的任務(wù)
Thread.sleep(5000);
return "正常結(jié)果";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "中斷結(jié)果";
}
})
.completeOnTimeout("超時(shí)默認(rèn)值", 2, TimeUnit.SECONDS) // 2秒超時(shí)
.exceptionally(throwable -> {
System.err.println("其他異常: " + throwable.getMessage());
return "異常默認(rèn)值";
});
String result = future.join();
System.out.println("超時(shí)控制結(jié)果: " + result);
}
/**
* 3. 重試機(jī)制
* 實(shí)現(xiàn)帶重試的異步操作
*/
public void retryMechanism() {
AtomicInteger retryCount = new AtomicInteger(0);
int maxRetries = 3;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
int currentTry = retryCount.incrementAndGet();
System.out.println("第 " + currentTry + " 次嘗試...");
if (Math.random() > 0.3 && currentTry < maxRetries) {
throw new RuntimeException("模擬失敗,需要重試");
}
return "成功結(jié)果(第 " + currentTry + " 次嘗試)";
})
.exceptionally(throwable -> {
if (retryCount.get() < maxRetries) {
System.out.println("執(zhí)行失敗,進(jìn)行重試...");
// 在實(shí)際項(xiàng)目中,這里可以重新提交任務(wù)
return "重試結(jié)果";
} else {
return "達(dá)到最大重試次數(shù),返回默認(rèn)值";
}
});
String result = future.join();
System.out.println("重試機(jī)制結(jié)果: " + result);
}
/**
* 4. 組合操作中的異常處理
* 處理多個(gè)并行任務(wù)中的異常
*/
public void exceptionInCombination() {
CompletableFuture<String> task1 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)1執(zhí)行");
throw new RuntimeException("任務(wù)1失敗");
}).exceptionally(e -> "任務(wù)1默認(rèn)結(jié)果");
CompletableFuture<String> task2 = CompletableFuture.supplyAsync(() -> {
System.out.println("任務(wù)2執(zhí)行");
return "任務(wù)2成功結(jié)果";
});
// 即使 task1 失敗,task2 仍然會(huì)執(zhí)行
CompletableFuture<Void> combined = CompletableFuture.allOf(task1, task2)
.exceptionally(throwable -> {
System.err.println("組合任務(wù)異常: " + throwable.getMessage());
return null;
});
combined.join();
// 獲取各自的結(jié)果(即使有失敗)
String result1 = task1.exceptionally(e -> "獲取失敗").join();
String result2 = task2.exceptionally(e -> "獲取失敗").join();
System.out.println("任務(wù)1結(jié)果: " + result1);
System.out.println("任務(wù)2結(jié)果: " + result2);
}
}
四、高級(jí)應(yīng)用篇 ??
4.1 性能優(yōu)化模式
public class PerformanceOptimization {
/**
* 1. 并行流水線處理
* 將大任務(wù)拆分為多個(gè)小任務(wù)并行處理
*/
public void parallelPipeline() {
// 模擬數(shù)據(jù)處理流水線
CompletableFuture<String> resultFuture = CompletableFuture.supplyAsync(() -> {
System.out.println("階段1: 數(shù)據(jù)采集");
return "原始數(shù)據(jù)";
})
.thenApplyAsync(data -> {
System.out.println("階段2: 數(shù)據(jù)清洗 - " + Thread.currentThread().getName());
return data + " -> 清洗后";
})
.thenApplyAsync(data -> {
System.out.println("階段3: 數(shù)據(jù)轉(zhuǎn)換 - " + Thread.currentThread().getName());
return data + " -> 轉(zhuǎn)換后";
})
.thenApplyAsync(data -> {
System.out.println("階段4: 數(shù)據(jù)聚合 - " + Thread.currentThread().getName());
return data + " -> 聚合完成";
});
String result = resultFuture.join();
System.out.println("流水線處理結(jié)果: " + result);
}
/**
* 2. 批量異步處理
* 處理大量獨(dú)立任務(wù)的優(yōu)化模式
*/
public void batchAsyncProcessing() {
List<Integer> dataList = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 為每個(gè)數(shù)據(jù)項(xiàng)創(chuàng)建異步任務(wù)
List<CompletableFuture<String>> futures = dataList.stream()
.map(item -> CompletableFuture.supplyAsync(() -> {
try {
// 模擬處理時(shí)間
Thread.sleep(100);
System.out.println("處理數(shù)據(jù): " + item + " - " + Thread.currentThread().getName());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return "結(jié)果-" + item;
}))
.collect(Collectors.toList());
// 等待所有任務(wù)完成
CompletableFuture<Void> allDone = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
);
// 收集結(jié)果
CompletableFuture<List<String>> resultsFuture = allDone.thenApply(v ->
futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
List<String> results = resultsFuture.join();
System.out.println("批量處理完成,結(jié)果數(shù)量: " + results.size());
System.out.println("前3個(gè)結(jié)果: " + results.subList(0, 3));
}
/**
* 3. 緩存模式
* 使用 CompletableFuture 作為緩存值,避免重復(fù)計(jì)算
*/
public class AsyncCache<K, V> {
private final ConcurrentHashMap<K, CompletableFuture<V>> cache = new ConcurrentHashMap<>();
private final Function<K, V> loader;
public AsyncCache(Function<K, V> loader) {
this.loader = loader;
}
public CompletableFuture<V> get(K key) {
return cache.computeIfAbsent(key, k ->
CompletableFuture.supplyAsync(() -> loader.apply(k))
.exceptionally(throwable -> {
// 發(fā)生異常時(shí)移除緩存
cache.remove(key);
throw new CompletionException(throwable);
})
);
}
}
public void cachePatternExample() {
AsyncCache<String, String> cache = new AsyncCache<>(key -> {
System.out.println("計(jì)算值: " + key);
try { Thread.sleep(1000); } catch (InterruptedException e) { /* ignore */ }
return key + "-value";
});
// 多次獲取相同key,只會(huì)計(jì)算一次
CompletableFuture<String> future1 = cache.get("key1");
CompletableFuture<String> future2 = cache.get("key1"); // 從緩存獲取
CompletableFuture.allOf(future1, future2).join();
System.out.println("future1結(jié)果: " + future1.join());
System.out.println("future2結(jié)果: " + future2.join());
System.out.println("兩個(gè)future是同一個(gè)對(duì)象: " + (future1 == future2));
}
}
4.2 復(fù)雜業(yè)務(wù)場(chǎng)景
public class ComplexBusinessScenarios {
/**
* 1. 電商訂單全流程處理
*/
public void ecommerceFullProcess() {
// 創(chuàng)建自定義線程池
ExecutorService businessExecutor = Executors.newFixedThreadPool(10);
CompletableFuture<String> orderProcess = CompletableFuture
// 階段1: 訂單驗(yàn)證
.supplyAsync(() -> {
System.out.println("1. 驗(yàn)證訂單信息");
try { Thread.sleep(100); } catch (InterruptedException e) { /* ignore */ }
return "訂單驗(yàn)證通過";
}, businessExecutor)
// 階段2: 并行檢查庫(kù)存和用戶信用
.thenCompose(validationResult -> {
CompletableFuture<Boolean> stockCheck = CompletableFuture.supplyAsync(() -> {
System.out.println("2.1 檢查庫(kù)存");
try { Thread.sleep(200); } catch (InterruptedException e) { /* ignore */ }
return true;
}, businessExecutor);
CompletableFuture<Boolean> creditCheck = CompletableFuture.supplyAsync(() -> {
System.out.println("2.2 檢查用戶信用");
try { Thread.sleep(150); } catch (InterruptedException e) { /* ignore */ }
return true;
}, businessExecutor);
return stockCheck.thenCombine(creditCheck, (stockOk, creditOk) ->
stockOk && creditOk ? "前置檢查通過" : "前置檢查失敗"
);
})
// 階段3: 扣減庫(kù)存和計(jì)算價(jià)格
.thenCompose(preCheckResult -> {
if (!"前置檢查通過".equals(preCheckResult)) {
return CompletableFuture.completedFuture("訂單創(chuàng)建失敗: " + preCheckResult);
}
CompletableFuture<Boolean> inventoryUpdate = CompletableFuture.supplyAsync(() -> {
System.out.println("3.1 扣減庫(kù)存");
try { Thread.sleep(100); } catch (InterruptedException e) { /* ignore */ }
return true;
}, businessExecutor);
CompletableFuture<Double> priceCalculation = CompletableFuture.supplyAsync(() -> {
System.out.println("3.2 計(jì)算最終價(jià)格");
try { Thread.sleep(120); } catch (InterruptedException e) { /* ignore */ }
return 299.99;
}, businessExecutor);
return inventoryUpdate.thenCombine(priceCalculation, (updateOk, price) ->
updateOk ? "訂單創(chuàng)建成功,價(jià)格: " + price : "庫(kù)存扣減失敗"
);
})
// 階段4: 發(fā)送通知(不阻塞主流程)
.whenComplete((result, throwable) -> {
if (throwable == null) {
CompletableFuture.runAsync(() -> {
System.out.println("4. 發(fā)送訂單通知: " + result);
try { Thread.sleep(50); } catch (InterruptedException e) { /* ignore */ }
}, businessExecutor);
}
})
// 異常處理
.exceptionally(throwable -> {
System.err.println("訂單處理異常: " + throwable.getMessage());
return "系統(tǒng)異常,請(qǐng)稍后重試";
});
String finalResult = orderProcess.join();
System.out.println("訂單處理最終結(jié)果: " + finalResult);
businessExecutor.shutdown();
}
/**
* 2. 數(shù)據(jù)同步和聚合
*/
public void dataSyncAndAggregation() {
// 模擬從多個(gè)數(shù)據(jù)源獲取數(shù)據(jù)
List<CompletableFuture<Map<String, Object>>> dataSourceFutures = Arrays.asList(
CompletableFuture.supplyAsync(() -> {
System.out.println("從數(shù)據(jù)庫(kù)獲取用戶數(shù)據(jù)");
try { Thread.sleep(300); } catch (InterruptedException e) { /* ignore */ }
Map<String, Object> userData = new HashMap<>();
userData.put("userId", 1001);
userData.put("userName", "張三");
return userData;
}),
CompletableFuture.supplyAsync(() -> {
System.out.println("從Redis獲取用戶緩存數(shù)據(jù)");
try { Thread.sleep(100); } catch (InterruptedException e) { /* ignore */ }
Map<String, Object> cacheData = new HashMap<>();
cacheData.put("lastLogin", "2024-01-01");
cacheData.put("loginCount", 42);
return cacheData;
}),
CompletableFuture.supplyAsync(() -> {
System.out.println("從外部API獲取用戶積分");
try { Thread.sleep(400); } catch (InterruptedException e) { /* ignore */ }
Map<String, Object> pointsData = new HashMap<>();
pointsData.put("points", 1500);
pointsData.put("level", "VIP");
return pointsData;
})
);
// 等待所有數(shù)據(jù)源返回并聚合數(shù)據(jù)
CompletableFuture<Map<String, Object>> aggregatedFuture =
CompletableFuture.allOf(dataSourceFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> {
Map<String, Object> result = new HashMap<>();
dataSourceFutures.forEach(future -> {
try {
Map<String, Object> data = future.join();
result.putAll(data);
} catch (Exception e) {
System.err.println("數(shù)據(jù)源獲取失敗: " + e.getMessage());
}
});
return result;
});
Map<String, Object> aggregatedData = aggregatedFuture.join();
System.out.println("聚合后的用戶數(shù)據(jù): " + aggregatedData);
}
/**
* 3. 限流和背壓控制
*/
public void rateLimitingAndBackpressure() {
// 創(chuàng)建限流線程池
ExecutorService limitedExecutor = Executors.newFixedThreadPool(3);
Semaphore semaphore = new Semaphore(2); // 同時(shí)最多2個(gè)任務(wù)執(zhí)行
List<CompletableFuture<String>> limitedFutures = new ArrayList<>();
for (int i = 1; i <= 10; i++) {
final int taskId = i;
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
semaphore.acquire(); // 獲取許可
System.out.println("開始執(zhí)行任務(wù) " + taskId + ",當(dāng)前并發(fā): " +
(2 - semaphore.availablePermits()));
// 模擬任務(wù)執(zhí)行
Thread.sleep(1000);
return "任務(wù)" + taskId + "完成";
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return "任務(wù)" + taskId + "中斷";
} finally {
semaphore.release(); // 釋放許可
System.out.println("釋放任務(wù) " + taskId + ",剩余許可: " +
semaphore.availablePermits());
}
}, limitedExecutor);
limitedFutures.add(future);
}
// 等待所有任務(wù)完成
CompletableFuture<Void> allDone = CompletableFuture.allOf(
limitedFutures.toArray(new CompletableFuture[0])
);
allDone.join();
System.out.println("所有限流任務(wù)完成");
limitedExecutor.shutdown();
}
}
五、實(shí)戰(zhàn)案例篇 ??
5.1 微服務(wù)調(diào)用編排
public class MicroserviceOrchestration {
/**
* 1. 服務(wù)調(diào)用編排:用戶注冊(cè)流程
*/
public CompletableFuture<UserRegisterResult> userRegisterFlow(UserRegisterRequest request) {
// 階段1: 并行驗(yàn)證
CompletableFuture<Boolean> emailCheck = checkEmailAvailable(request.getEmail());
CompletableFuture<Boolean> mobileCheck = checkMobileAvailable(request.getMobile());
return emailCheck.thenCombine(mobileCheck, (emailOk, mobileOk) -> {
if (!emailOk) {
throw new BusinessException("郵箱已被注冊(cè)");
}
if (!mobileOk) {
throw new BusinessException("手機(jī)號(hào)已被注冊(cè)");
}
return "驗(yàn)證通過";
})
// 階段2: 創(chuàng)建用戶
.thenCompose(validation -> createUser(request))
// 階段3: 并行初始化用戶數(shù)據(jù)
.thenCompose(userId -> {
CompletableFuture<Void> initProfile = initUserProfile(userId);
CompletableFuture<Void> sendWelcome = sendWelcomeMessage(userId, request.getEmail());
CompletableFuture<Void> grantPoints = grantRegisterPoints(userId);
return CompletableFuture.allOf(initProfile, sendWelcome, grantPoints)
.thenApply(v -> userId);
})
// 階段4: 記錄注冊(cè)日志(異步,不阻塞)
.whenComplete((userId, throwable) -> {
if (throwable == null) {
recordRegisterLog(userId, request.getSource()).exceptionally(e -> {
System.err.println("記錄日志失敗: " + e.getMessage());
return null;
});
}
})
// 構(gòu)建最終結(jié)果
.handle((userId, throwable) -> {
if (throwable != null) {
return UserRegisterResult.fail(throwable.getMessage());
}
return UserRegisterResult.success(userId, "注冊(cè)成功");
});
}
// 模擬服務(wù)方法
private CompletableFuture<Boolean> checkEmailAvailable(String email) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("檢查郵箱可用性: " + email);
try { Thread.sleep(100); } catch (InterruptedException e) { /* ignore */ }
return Math.random() > 0.1; // 90%概率可用
});
}
private CompletableFuture<Boolean> checkMobileAvailable(String mobile) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("檢查手機(jī)號(hào)可用性: " + mobile);
try { Thread.sleep(150); } catch (InterruptedException e) { /* ignore */ }
return Math.random() > 0.1; // 90%概率可用
});
}
private CompletableFuture<String> createUser(UserRegisterRequest request) {
return CompletableFuture.supplyAsync(() -> {
System.out.println("創(chuàng)建用戶: " + request.getEmail());
try { Thread.sleep(200); } catch (InterruptedException e) { /* ignore */ }
return "user_" + System.currentTimeMillis();
});
}
private CompletableFuture<Void> initUserProfile(String userId) {
return CompletableFuture.runAsync(() -> {
System.out.println("初始化用戶檔案: " + userId);
try { Thread.sleep(100); } catch (InterruptedException e) { /* ignore */ }
});
}
private CompletableFuture<Void> sendWelcomeMessage(String userId, String email) {
return CompletableFuture.runAsync(() -> {
System.out.println("發(fā)送歡迎郵件: " + email);
try { Thread.sleep(300); } catch (InterruptedException e) { /* ignore */ }
});
}
private CompletableFuture<Void> grantRegisterPoints(String userId) {
return CompletableFuture.runAsync(() -> {
System.out.println("發(fā)放注冊(cè)積分: " + userId);
try { Thread.sleep(50); } catch (InterruptedException e) { /* ignore */ }
});
}
private CompletableFuture<Void> recordRegisterLog(String userId, String source) {
return CompletableFuture.runAsync(() -> {
System.out.println("記錄注冊(cè)日志: " + userId + ", 來(lái)源: " + source);
try { Thread.sleep(80); } catch (InterruptedException e) { /* ignore */ }
});
}
// 數(shù)據(jù)模型
static class UserRegisterRequest {
private String email;
private String mobile;
private String source;
// getters and setters
public String getEmail() { return email; }
public String getMobile() { return mobile; }
public String getSource() { return source; }
}
static class UserRegisterResult {
private boolean success;
private String userId;
private String message;
static UserRegisterResult success(String userId, String message) {
UserRegisterResult result = new UserRegisterResult();
result.success = true;
result.userId = userId;
result.message = message;
return result;
}
static UserRegisterResult fail(String message) {
UserRegisterResult result = new UserRegisterResult();
result.success = false;
result.message = message;
return result;
}
}
static class BusinessException extends RuntimeException {
public BusinessException(String message) {
super(message);
}
}
}
5.2 性能監(jiān)控和調(diào)試
public class PerformanceMonitoring {
/**
* 帶監(jiān)控的 CompletableFuture 包裝器
*/
public static class MonitoredCompletableFuture<T> {
private final CompletableFuture<T> future;
private final long startTime;
private final String taskName;
public MonitoredCompletableFuture(CompletableFuture<T> future, String taskName) {
this.future = future;
this.startTime = System.currentTimeMillis();
this.taskName = taskName;
// 添加完成回調(diào)記錄指標(biāo)
this.future.whenComplete((result, throwable) -> {
long duration = System.currentTimeMillis() - startTime;
if (throwable != null) {
System.out.printf("? 任務(wù)監(jiān)控 [%s] - 失敗 - 耗時(shí): %dms - 異常: %s%n",
taskName, duration, throwable.getMessage());
} else {
System.out.printf("? 任務(wù)監(jiān)控 [%s] - 成功 - 耗時(shí): %dms%n",
taskName, duration);
}
});
}
public CompletableFuture<T> getFuture() {
return future;
}
public static <T> MonitoredCompletableFuture<T> supplyAsync(
Supplier<T> supplier, String taskName) {
return new MonitoredCompletableFuture<>(
CompletableFuture.supplyAsync(supplier), taskName);
}
}
/**
* 使用監(jiān)控包裝器的示例
*/
public void monitoredExample() {
MonitoredCompletableFuture<String> task1 = MonitoredCompletableFuture.supplyAsync(() -> {
try { Thread.sleep(1000); } catch (InterruptedException e) { /* ignore */ }
return "任務(wù)1結(jié)果";
}, "數(shù)據(jù)查詢");
MonitoredCompletableFuture<Integer> task2 = MonitoredCompletableFuture.supplyAsync(() -> {
try { Thread.sleep(500); } catch (InterruptedException e) { /* ignore */ }
return 42;
}, "計(jì)算任務(wù)");
CompletableFuture<String> combined = task1.getFuture()
.thenCombine(task2.getFuture(), (r1, r2) -> r1 + " + " + r2);
String result = combined.join();
System.out.println("組合結(jié)果: " + result);
}
/**
* 調(diào)試工具:打印執(zhí)行線程
*/
public static <T> CompletableFuture<T> withDebug(CompletableFuture<T> future, String operation) {
return future
.thenApply(result -> {
System.out.printf("?? [%s] thenApply - 線程: %s, 結(jié)果: %s%n",
operation, Thread.currentThread().getName(), result);
return result;
})
.exceptionally(throwable -> {
System.out.printf("?? [%s] exceptionally - 線程: %s, 異常: %s%n",
operation, Thread.currentThread().getName(), throwable.getMessage());
throw new CompletionException(throwable);
});
}
public void debugExample() {
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("初始任務(wù)線程: " + Thread.currentThread().getName());
return "初始數(shù)據(jù)";
});
CompletableFuture<String> debugFuture = withDebug(future, "階段1")
.thenApplyAsync(data -> {
System.out.println("異步轉(zhuǎn)換線程: " + Thread.currentThread().getName());
return data + " -> 轉(zhuǎn)換后";
})
.thenApply(data -> {
System.out.println("同步轉(zhuǎn)換線程: " + Thread.currentThread().getName());
return data + " -> 最終結(jié)果";
});
String result = debugFuture.join();
System.out.println("調(diào)試示例結(jié)果: " + result);
}
}
總結(jié)
核心要點(diǎn)回顧 ??
基礎(chǔ)操作:
supplyAsync/runAsync- 創(chuàng)建異步任務(wù)thenApply- 轉(zhuǎn)換結(jié)果thenAccept- 消費(fèi)結(jié)果thenRun- 執(zhí)行后續(xù)操作
組合操作:
thenCompose- 扁平化鏈?zhǔn)秸{(diào)用thenCombine- 合并兩個(gè)任務(wù)結(jié)果allOf- 等待所有任務(wù)完成anyOf- 獲取最快任務(wù)結(jié)果
異常處理:
exceptionally- 異常恢復(fù)handle- 統(tǒng)一處理成功和異常whenComplete- 完成時(shí)回調(diào)
最佳實(shí)踐:
- 合理使用異步:IO密集型任務(wù)適合異步,CPU密集型需謹(jǐn)慎
- 避免阻塞操作:在異步任務(wù)中避免同步阻塞調(diào)用
- 合理設(shè)置超時(shí):使用
completeOnTimeout或orTimeout - 資源清理:及時(shí)關(guān)閉自定義線程池
- 監(jiān)控調(diào)試:添加監(jiān)控指標(biāo)便于問題排查
性能優(yōu)化技巧 ?
- 使用自定義線程池避免公共線程池耗盡
- 合理設(shè)置線程數(shù):IO密集型可多線程,CPU密集型要謹(jǐn)慎
- 避免過度拆分:小任務(wù)過多會(huì)增加調(diào)度開銷
- 使用批量操作減少線程切換
- 合理使用緩存避免重復(fù)計(jì)算
常見陷阱 ??
- 異常被吞沒:記得使用
exceptionally或handle處理異常 - 線程池耗盡:監(jiān)控線程池狀態(tài),合理設(shè)置參數(shù)
- 內(nèi)存泄漏:及時(shí)取消不再需要的 Future
- 死鎖風(fēng)險(xiǎn):避免在異步任務(wù)中等待其他 Future
記住:CompletableFuture 是強(qiáng)大的工具,但需要根據(jù)具體場(chǎng)景合理使用!
?? 如果你喜歡這篇文章,請(qǐng)點(diǎn)贊支持! ?? 同時(shí)歡迎關(guān)注我的博客,獲取更多精彩內(nèi)容!
本文來(lái)自博客園,作者:佛祖讓我來(lái)巡山,轉(zhuǎn)載請(qǐng)注明原文鏈接:http://www.rzrgm.cn/sun-10387834/p/19177162

浙公網(wǎng)安備 33010602011771號(hào)