CompletableFuture學習總結
簡介
CompletableFuture結合了Future的優點,提供了非常強大的Future的擴展功能,可以幫助我們簡化異步編程的復雜性,提供了函數式編程的能力,可以通過回調的方式處理計算結果,并且提供了轉換和組合CompletableFuture的方法。
CompletableFuture被設計在Java中進行異步編程。異步編程意味著在主線程之外創建一個獨立的線程,與主線程分隔開,并在上面運行一個非阻塞的任務,然后通知主線程進展,成功或者失敗。
CompletableFuture是由Java8引入的,在Java8之前我們一般通過Future實現異步。
Future用于表示異步計算的結果,只能通過阻塞或者輪詢的方式獲取結果,而且不支持設置回調方法,Java8之前若要設置回調一般會使用guava的ListenableFuture。 CompletableFuture對Future進行了擴展,可以通過設置回調的方式處理計算結果,同時也支持組合操作,支持進一步的編排,同時一定程度解決了回調地獄的問題。
核心概念
CompletableFuture 是一個非常強大的并發工具類,它實現了 Future 和 CompletionStage 接口,用于表示某個異步計算的結果,與傳統的 Future 不同,CompletableFuture 提供了函數式編程的方法,可以更容易地組織異步代碼,處理回調和組合多個異步操作。
假設,有一個電商網站,用戶瀏覽產品詳情頁時,需要展示產品的基本信息、價格、庫存、用戶評價等多個方面的數據,這些數據可能來自不同的數據源或服務,比如:
- 產品基本信息可能來自一個主數據庫。
- 價格和庫存 可能需要實時從另一個庫存服務獲取。
- 用戶評價可能存儲在另一個專門用于用戶反饋的系統中。
為了提升用戶體驗,希望這些數據的獲取能夠并行進行,而不是一個接一個地串行獲取,這就是 CompletableFuture 的經典場景。
CompletableFuture 類在主要用來解決異步編程和并發執行的問題,在傳統的同步編程模型中,代碼的執行通常是阻塞的,即一行代碼執行完成后,下一行代碼才能開始執行,這種模型在處理耗時操作時,如 I/O 操作、數據庫訪問或網絡請求,會導致線程長時間閑置,等待操作完成,從而降低系統的吞吐量和響應能力。
因此,CompletableFuture 類提供了一種非阻塞的、基于回調的編程方式,可以在等待某個長時間運行的任務完成時,同時執行其他任務,這樣,就可以更充分地利用系統資源,提高程序的并發性和響應速度。
使用CompletableFuture通常用于解決以下類似場景的問題:
- 發起異步請求:當用戶請求一個產品詳情頁時,后端服務可以同時發起對三個數據源的異步請求,這可以通過創建三個
CompletableFuture實例來實現,每個實例負責一個數據源的請求。 - 處理異步結果:一旦這些異步請求發出,它們就可以獨立地執行,主線程可以繼續處理其他任務,當某個
CompletableFuture完成時,它會包含一個結果(或者是執行過程中的異常)。 - 組合異步結果:使用
CompletableFuture的組合方法(如thenCombine、thenAcceptBoth或allOf),可以等待所有異步操作完成,并將它們的結果組合在一起,比如,可以等待產品基本信息、價格和庫存以及用戶評價都返回后,再將這些數據整合到一個響應對象中,返回給前端。 - 異常處理:如果在獲取某個數據源時發生異常,
CompletableFuture允許以異步的方式處理這些異常,比如通過exceptionally方法提供一個默認的備選結果或執行一些清理操作。 - 最終響應:一旦所有數據源的數據都成功獲取并組合在一起,或者某個數據源發生異常并得到了妥善處理,服務就可以將最終的產品詳情頁響應發送給前端用戶。
使用CompletableFuture 可以高效的并發數據獲取,提升系統的響應速度和整體性能。
核心API
CompletableFuture 列用于表示某個異步計算的結果,它提供了函數式編程的方法來處理異步計算,允許以非阻塞的方式編寫并發代碼,并且可以鏈接多個異步操作,以下是一些常用方法的含義:
1、靜態工廠方法
CompletableFuture.supplyAsync(Supplier<? extends U> supplier): 異步執行給定的Supplier,并返回一個表示結果的新CompletableFuture。CompletableFuture.supplyAsync(Supplier<? extends U> supplier, Executor executor): 使用指定的執行器異步執行給定的Supplier。CompletableFuture.runAsync(Runnable runnable): 異步執行給定的Runnable,并返回一個表示其完成的新CompletableFuture。CompletableFuture.runAsync(Runnable runnable, Executor executor): 使用指定的執行器異步執行給定的Runnable。
2、完成時的處理
thenApply(Function<? super T,? extends U> fn): 當此CompletableFuture完成時,對其結果應用給定的函數。thenAccept(Consumer<? super T> action): 當此CompletableFuture完成時,執行給定的操作。thenRun(Runnable action): 當此CompletableFuture完成時,執行給定的無參數操作。
3、異常處理
exceptionally(Function<Throwable,? extends T> fn): 當此CompletableFuture異常完成時,對其異常應用給定的函數。
4、組合多個 CompletableFuture
thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn): 當此CompletableFuture和另一個都完成時,使用給定的函數組合它們的結果。thenAcceptBoth(CompletableFuture<? extends U> other, BiConsumer<? super T,? super U> action): 當此CompletableFuture和另一個都完成時,對它們的結果執行給定的操作。runAfterBoth(CompletableFuture<?> other, Runnable action): 當此CompletableFuture和另一個都完成時,執行給定的操作。applyToEither(CompletableFuture<? extends T> other, Function<? super T, U> fn): 當此CompletableFuture或另一個完成時(哪個先完成),對其結果應用給定的函數。acceptEither(CompletableFuture<? extends T> other, Consumer<? super T> action): 當此CompletableFuture或另一個完成時(哪個先完成),對其結果執行給定的操作。runAfterEither(CompletableFuture<?> other, Runnable action): 當此CompletableFuture或另一個完成時(哪個先完成),執行給定的操作。
5、等待和獲取結果
get(): 等待計算完成,然后獲取其結果。get(long timeout, TimeUnit unit): 等待計算在給定的時間內完成,并獲取其結果。join(): 類似于get(),但是會在計算未完成時拋出未檢查的異常。complete(T value): 如果尚未完成,則設置此CompletableFuture的結果。completeExceptionally(Throwable ex): 如果尚未完成,則使此CompletableFuture異常完成。
6、取消
cancel(boolean mayInterruptIfRunning): 嘗試取消此CompletableFuture。isCancelled(): 如果此CompletableFuture被取消,則返回true。
7、查詢
isDone(): 如果此CompletableFuture完成(無論是正常完成還是異常完成),則返回true。
封裝計算邏輯的CompletableFuture
上面的代碼允許我們選擇任何并發執行的機制,但是如果我們想跳過這個樣板文件,簡單地異步執行一些代碼呢?
靜態方法runAsync和supplyAsync允許我們相應地使用Runnable和Supplier函數類型創建一個可完成的未來實例。
Runnable和Supplier都是函數接口,由于新的java8特性,它們允許將實例作為lambda表達式傳遞。
Runnable接口與線程中使用的舊接口相同,不允許返回值。
Supplier接口是一個通用函數接口,它有一個方法,該方法沒有參數,并且返回一個參數化類型的值。
這允許我們提供一個供應商實例作為lambda表達式來執行計算并返回結果。簡單到:
CompletableFuture<String> future
= CompletableFuture.supplyAsync(() -> "Hello");
// ...
assertEquals("Hello", future.get());
異步計算的處理結果
處理計算結果的最通用的方法是將其提供給函數。thenApply方法正是這樣做的;它接受一個函數實例,用它來處理結果,并返回一個包含函數返回值的Future:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future = completableFuture
.thenApply(s -> s + " World");
assertEquals("Hello World", future.get());
如果我們不需要在Future中返回值,我們可以使用Consumer函數接口的實例。它的單個方法接受一個參數并返回void。
在可完成的將來,有一種方法可以解決這個用例。thenAccept方法接收使用者并將計算結果傳遞給它。最后一個future.get()調用返回Void類型的實例:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenAccept(s -> System.out.println("Computation returned: " + s));
future.get();
最后,如果我們既不需要計算的值,也不想返回值,那么我們可以將一個可運行的lambda傳遞給thenRun方法。在下面的示例中,我們只需在調用future.get()后在控制臺中打印一行:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<Void> future = completableFuture
.thenRun(() -> System.out.println("Computation finished."));
future.get();
組合CompletableFuture
CompletableFuture API最好的部分是能夠在一系列計算步驟中組合CompletableFuture實例。
這種鏈接的結果本身就是一個完整的Future,允許進一步的鏈接和組合。這種方法在函數語言中普遍存在,通常被稱為享元模式。
在下面的示例中,我們使用thenCompose方法按順序鏈接兩個Future。
請注意,此方法接受一個返回CompletableFuture實例的函數。此函數的參數是上一計算步驟的結果。這允許我們在下一個CompletableFuture的lambda中使用此值:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
assertEquals("Hello World", completableFuture.get());
thenCompose方法與thenApply一起實現了享元模式的基本構建塊。它們與流的map和flatMap方法以及java8中的可選類密切相關。
兩個方法都接收一個函數并將其應用于計算結果,但是thencomose(flatMap)方法接收一個返回另一個相同類型對象的函數。這種功能結構允許將這些類的實例組合為構建塊。
如果我們想執行兩個獨立的未來,并對它們的結果進行處理,我們可以使用thenCombine方法,該方法接受一個未來和一個具有兩個參數的函數來處理這兩個結果:
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "Hello")
.thenCombine(CompletableFuture.supplyAsync(
() -> " World"), (s1, s2) -> s1 + s2));
assertEquals("Hello World", completableFuture.get());
一個簡單的例子是,當我們想處理兩個CompletableFuture的結果時,但不需要將任何結果值傳遞給CompletableFuture的鏈。thenAcceptBoth方法可以幫助:
CompletableFuture future = CompletableFuture.supplyAsync(() -> "Hello")
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> " World"),
(s1, s2) -> System.out.println(s1 + s2));
thenApply()和thenCompose()方法之間的區別
在前面的部分中,我們展示了有關thenApply()和thenCompose()的示例。兩個api都有助于鏈接不同的CompletableFuture調用,但這兩個函數的用法不同。
thenApply()
我們可以使用此方法處理上一次調用的結果。但是,需要記住的一點是,返回類型將由所有調用組合而成。
因此,當我們要轉換CompletableFuture調用的結果時,此方法非常有用:
CompletableFuture<Integer> finalResult = compute().thenApply(s-> s + 1);
thenCompose()
thenCompose()方法與thenApply()類似,因為兩者都返回一個新的完成階段。但是,thencose()使用前一階段作為參數。它將展平并直接返回一個帶有結果的CompletableFuture,而不是我們在thenApply()中觀察到的嵌套CompletableFuture:
CompletableFuture<Integer> computeAnother(Integer i){
return CompletableFuture.supplyAsync(() -> 10 + i);
}
CompletableFuture<Integer> finalResult = compute().thenCompose(this::computeAnother);
因此,如果要鏈接可完成的CompletableFuture方法,那么最好使用thenCompose()。
另外,請注意,這兩個方法之間的差異類似于map()和flatMap()之間的差異。
并行運行多個CompletableFuture
當我們需要并行執行多個期貨時,我們通常希望等待所有Supplier執行,然后處理它們的組合結果。
CompletableFuture.allOf靜態方法允許等待的所有Supplier的完成:
CompletableFuture<String> future1
= CompletableFuture.supplyAsync(() -> "Hello");
CompletableFuture<String> future2
= CompletableFuture.supplyAsync(() -> "Beautiful");
CompletableFuture<String> future3
= CompletableFuture.supplyAsync(() -> "World");
CompletableFuture<Void> combinedFuture
= CompletableFuture.allOf(future1, future2, future3);
// ...
combinedFuture.get();
assertTrue(future1.isDone());
assertTrue(future2.isDone());
assertTrue(future3.isDone());
注意CompletableFuture.allOf()的返回類型是CompletableFuture。這種方法的局限性在于它不能返回所有Supplier的組合結果。相反,我們必須從未來手動獲取結果。幸運的是,CompletableFuture.join()方法和Java 8 Streams API使它變得簡單:
String combined = Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Hello Beautiful World", combined);
join()方法類似于get方法,但是如果Future不能正常完成,它會拋出一個未檢查的異常。這樣就可以將其用作Stream.map()方法中的方法引用。
具體使用簡單demo案例
/** * 獲取統計指標信息 * @return 統計指標值 */ @GetMapping("/statistics") public ResultVO statistics(){ ResultVO resultVO = new ResultVO(); try { // 獲取企業所需要統計的數量 CompletableFuture<Integer> unitNum = CompletableFuture.supplyAsync(() -> { log.info("執行[獲取企業所需要統計的數量]任務:"+"線程id:"+Thread.currentThread().getId()+"線程名稱:"+ Thread.currentThread().getName()); return enterpriseService.list().size(); }); // 獲取供需所需要統計的數量 CompletableFuture<Integer> supplyDemandNum = CompletableFuture.supplyAsync(() -> { log.info("執行[獲取供需所需要統計的數量]任務:"+"線程id:"+Thread.currentThread().getId()+"線程名稱:"+ Thread.currentThread().getName()); return supplyDemandService.list().size(); }); // 等待所有異步任務執行完成 CompletableFuture.allOf(unitNum,supplyDemandNum).join(); resultVO.setUnitNum(unitNum.get()); resultVO.setSupplyDemandNum(supplyDemandNum.get()); } catch (Exception e) { log.error("任務執行異常!"); throw new RuntimeException("獲取統計指標異常!"); } return resultVO; }

返回結果:
{ "unitNum": 109, "supplyDemandNum": 2 }

浙公網安備 33010602011771號