Java 流(Stream)處理操作完成之后,我們可以收集這個流中的元素,使之匯聚成一個最終結果。這個結果可以是一個對象,也可以是一個集合,甚至可以是一個基本類型數據。
以記錄 Record 為例:
@Data
@NoArgsConstructor
@AllArgsConstructor
public static class Record {
private String col1;
private String col2;
private int col3;
}
記錄 Record 包含三個屬性:列1(col1)、列2(col2)和 列3(col3)。
創建四個記錄實例:
Record r1 = new Record("a", "1", 1);
Record r2 = new Record("a", "2", 2);
Record r3 = new Record("b", "3", 3);
Record r4 = new Record("c", "4", 4);
添加到列表:
List<Record> records = new ArrayList<>();
records.add(r1);
records.add(r2);
records.add(r3);
records.add(r4);
收集所有記錄的 列1 值,以列表形式存儲結果
List<String> col1List = records.stream()
.map(Record::getCol1)
.collect(Collectors.toList());
log.info("col1List: {}", Json.toJson(col1List));
輸出結果:
col1List: ["a","a","b","c"]
收集所有記錄的 列1 值,且去重,以集合形式存儲
Set<String> col1Set = records.stream()
.map(Record::getCol1)
.collect(Collectors.toSet());
log.info("col1Set: {}", Json.toJson(col1Set));
輸出結果:
col1Set: ["a","b","c"]
收集記錄的 列2 值和 列3 值的對應關系,以字典形式存儲
Map<String, Integer> col2Map = records.stream()
.collect(Collectors.toMap(Record::getCol2, Record::getCol3));
log.info("col2Map: {}", Json.toJson(col2Map));
輸出結果:
col2Map: {"1":1,"2":2,"3":3,"4":4}
記錄的 列2 不能有重復值,否則會拋出 Duplicate key 異常。
收集所有記錄中 列3 值最大的記錄
Record max = records.stream()
.collect(Collectors.maxBy(Comparator.comparing(Record::getCol3)))
.orElse(null);
log.info("max: {}", Json.toJson(max));
輸出結果:
max: {"col1":"c","col2":"4","col3":4}
收集所有記錄中 列3 值的總和
int sum = records.stream()
.collect(Collectors.summingInt(Record::getCol3));
log.info("sum: {}", sum);
輸出結果:
sum: 10
流的收集需要通過 Stream.collect() 方法完成,方法的參數是一個 Collector(收集器);收集結果時,需要根據收集結果的目標類型,傳遞特定的收集器實例,如上:
- Collectors.toList()
- Collectors.toSet()
- Collectors.toMap()
- Collectors.maxBy()
- Collectors.summingInt()
Collectors(java.util.stream.Collectors) 是一個工具類,內置若干收集器,我們可以通過調用不同的方法快速獲取相應的收集器實例。
收集器(java.util.stream.Collector)本質是一個 接口,包含以下五個方法:
- Supplier supplier()
- BiConsumer<A, T> accumulator()
- BinaryOperator combiner()
- Function<A, R> finisher()
- Set
characteristics()
以 Collectors.toList() 為例演示收集器的工作過程。
創建一個中間結果容器
supplier() 方法會返回一個 Supplier 實例,調用該實例的 get() 方法,會創建一個中間結果容器。

@Override
public Supplier<List<String>> supplier() {
return new Supplier<List<String>>() {
@Override
public List<String> get() {
List<String> container = new ArrayList<>();
return container;
}
};
}
考慮到收集的元素類型 String,這里的中間結果容器類型為 ArrayList
根據收集過程的需要,中間結果容器可以是任意的數據結構。
逐一遍歷流中的每個元素,處理完成之后,添加到中間結果
accumulator() 方法會返回一個 BiConsumer 實例,它有一個 accept() 方法,
參數1:中間結果
參數2:流中遍歷到的某個元素
遍歷過程是 Java 自動完成的,每遍歷一個元素,會自動調用 BiConsumer.accept 方法。我們只需要在方法中實現元素的處理過程,然后把元素的處理結果添加到中間結果中就可以了。

@Override
public BiConsumer<List<String>, String> accumulator() {
return new BiConsumer<List<String>, String>() {
@Override
public void accept(List<String> container, String col) {
container.add(col);
}
};
}
這個示例中,流中的元素不需要任何處理,直接添加至中間結果即可。
中間結果轉換成最終結果
finisher() 方法會返回一個 Fuction 實例,它有一個 apply() 方法,
參數:中間結果
返回:最終結果
遍歷過程結束之后,Java 會自動調用 Function.apply() 方法,將中間結果轉換成最終結果。

@Override
public Function<List<String>, List<String>> finisher() {
return new Function<List<String>, List<String>>() {
@Override
public List<String> apply(List<String> container) {
return container;
}
};
}
這個示例中,中間結果就是最終結果,不需要任何處理,直接返回中間結果即可。
combiner()是做什么的?
流中的元素可以被并行處理,這樣的流稱為并行流。并行流相當于把一個大流切分成多個小流,內部使用多線程,并行處理這些小流。每一個小流遍歷完成之后,都會產生一個小的中間結果,需要將這些小的中間結果合并成一個大的中間結果。
假設有兩個小流,收集開始時,會創建兩個中間結果:

中間結果也是通過 Supplier.get() 方法創建的。
并行遍歷兩個小流,將各自流的處理結果添加到各自的中間結果中:

combiner() 方法會返回一個 BinaryOperator 實例,它有一個 apply() 方法:
參數1:中間結果1
參數2:中間結果2
返回:中間結果
Java 會在合適的時機自動調用 BinaryOperator.apply() 方法,將小的中間結果合并成大的中間結果。

@Override
public BinaryOperator<List<String>> combiner() {
return new BinaryOperator<List<String>>() {
@Override
public List<String> apply(List<String> container1, List<String> container2) {
container1.addAll(container2);
return container1;
}
};
}
characteristics()是什么的?
characteristics() 會返回一個 Characteristics(枚舉)集合實例,用于設定收集器的特性,支持以下三個值:
-
CONCURRENT
收集器支持并發使用
-
UNORDERED
收集器不保證元素順序
-
IDENTITY_FINISH
收集器中間結果可直接轉換成最終結果
Java 可以根據這些特性值,保證收集器正確地、有效率地執行。
完整代碼
Collector<String, List<String>, List<String>> collector = new Collector<String, List<String>, List<String>>() {
@Override
public Supplier<List<String>> supplier() {
return new Supplier<List<String>>() {
@Override
public List<String> get() {
List<String> container = new ArrayList<>();
return container;
}
};
}
@Override
public BiConsumer<List<String>, String> accumulator() {
return new BiConsumer<List<String>, String>() {
@Override
public void accept(List<String> container, String col) {
container.add(col);
}
};
}
@Override
public BinaryOperator<List<String>> combiner() {
return new BinaryOperator<List<String>>() {
@Override
public List<String> apply(List<String> container1, List<String> container2) {
container1.addAll(container2);
return container1;
}
};
}
@Override
public Function<List<String>, List<String>> finisher() {
return new Function<List<String>, List<String>>() {
@Override
public List<String> apply(List<String> container) {
return container;
}
};
}
@Override
public Set<Characteristics> characteristics() {
return new HashSet<>();
}
};
col1List = records.stream()
.map(Record::getCol1)
.collect(collector);
log.info("col1List: {}", Json.toJson(col1List));
浙公網安備 33010602011771號