從零開始學Flink:流批一體的執行模式
在大數據處理領域,批處理和流處理曾經被視為兩種截然不同的范式。然而,隨著Apache Flink的出現,這種界限正在逐漸模糊。Flink的一個核心特性是其批流一體的架構設計,允許用戶使用統一的API和執行引擎處理有界數據(批處理)和無界數據(流處理)。本文將深入探討Flink的執行模式(Execution Mode),特別是在Flink 1.20.1版本中對批處理和流處理模式的支持和優化。
一、Flink執行模式概述
1. 執行模式的基本概念
Flink的執行模式決定了作業如何被調度和執行。在Flink 1.12及以后的版本中,引入了統一的流批處理執行模式,主要包括以下三種模式:
- STREAMING模式: 傳統的流處理執行模式,適用于處理無界數據流
- BATCH模式: 專門為有界數據優化的批處理執行模式
- AUTOMATIC模式: 自動根據數據源類型選擇執行模式
這三種模式的引入使得Flink能夠在同一套API上提供最佳的批處理和流處理性能。
2. 執行模式的演進歷程
Flink的執行模式經歷了以下幾個關鍵階段:
- 早期版本: Flink最初專注于流處理,但提供了對批處理的支持
- Flink 1.12: 引入了全新的批處理執行模式(BATCH模式)
- Flink 1.14: 增強了批處理模式的性能和功能
- Flink 1.20.1: 進一步優化了批流一體架構,改進了執行模式的自動選擇機制
二、Execution Mode的技術原理
1. 兩種執行模式的核心區別
雖然Flink使用相同的API和代碼結構,但BATCH和STREAMING模式在內部執行方式上存在顯著差異:
| 特性 | STREAMING模式 | BATCH模式 |
|---|---|---|
| 調度策略 | 連續流式調度 | 批處理調度,類似于MapReduce |
| 資源利用 | 持續占用資源 | 任務完成后釋放資源 |
| 優化技術 | 流式優化 | 批處理優化,如查詢優化、物化視圖 |
| 處理延遲 | 毫秒級延遲 | 較高延遲,但吞吐量更大 |
| 適用場景 | 實時數據處理 | 離線數據分析 |
2. 批流一體的設計理念
Flink的批流一體架構基于以下核心理念:
- 統一的API: 無論批處理還是流處理,都使用相同的DataStream API
- 統一的狀態管理: 共享相同的狀態后端和檢查點機制
- 統一的容錯機制: 基于檢查點的故障恢復
- 統一的優化器: 但針對不同執行模式應用不同的優化策略
三、配置和使用Execution Mode
1. 環境準備
首先,確保你已經設置了正確的依賴:
dependencies {
// Flink核心依賴
implementation 'org.apache.flink:flink_core:1.20.1'
implementation 'org.apache.flink:flink-streaming-java:1.20.1'
implementation 'org.apache.flink:flink-clients:1.20.1'
implementation 'org.apache.flink:flink-connector-files:1.20.1'
implementation 'org.apache.flink:flink-connector-kafka:3.4.0-1.20'
}
2. 在代碼中設置執行模式
在Flink 1.20.1中,可以通過以下方式設置執行模式:
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class ExecutionModeExample {
public static void main(String[] args) throws Exception {
// 創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 設置執行模式為BATCH
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 或者設置為STREAMING
// env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 或者設置為AUTOMATIC(根據數據源自動選擇)
// env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 后續代碼...
}
}
3. 通過命令行參數設置
也可以通過命令行參數覆蓋代碼中的設置:
bin/flink run -Dexecution.runtime-mode=BATCH -c com.example.ExecutionModeExample your-jar-file.jar
四、BATCH模式與STREAMING模式實踐
1. 批處理模式示例
以下是使用BATCH模式處理文件數據的完整示例:
package com.cn.daimajiangxin.flink;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.StreamFormat;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.Arrays;
public class BatchWordCount {
public static void main(String[] args) throws Exception {
// 創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 明確設置為批處理模式
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 從文件讀取數據(有界數據源)
String inputPath = "path\\flink-learning\\data\\input.txt";
// 1. 創建文件源構建器
Path filePath = new Path(inputPath);
// 2. 配置文件讀取格式
StreamFormat<String> format =new TextLineInputFormat("UTF-8");
// 3. 構建 FileSource
FileSource<String> fileSource = FileSource
.forRecordStreamFormat(format, filePath)
.build();
// 4. 添加 Watermark 策略(批處理中可使用默認策略)
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.<String>forMonotonousTimestamps()
.withIdleness(Duration.ofSeconds(10));
DataStream<String> text = env.fromSource(fileSource,watermarkStrategy,"FileSource");
// 數據處理邏輯
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.sum(1);
// 輸出結果
counts.print();
// 執行作業
env.execute("Batch Word Count");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// 分詞并為每個單詞生成(word, 1)的元組
Arrays.stream(value.toLowerCase().split("\\W+"))
.filter(word -> word.length() > 0)
.forEach(word -> out.collect(new Tuple2<>(word, 1)));
}
}
}
2. 流處理模式示例
以下是使用STREAMING模式處理Kafka數據流的示例:
package com.cn.daimajiangxin.flink;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.time.Duration;
import java.util.Arrays;
public class StreamingWordCount {
public static void main(String[] args) throws Exception {
// 創建執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 明確設置為流處理模式
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
// 啟用檢查點
env.enableCheckpointing(5000);
// 創建Kafka源(無界數據源)
KafkaSource<String> source = KafkaSource.<String>
builder()
.setBootstrapServers("localhost:9092")
.setTopics("word-count-topic")
.setGroupId("flink-group")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// 從Kafka讀取數據
DataStream<String> text = env.fromSource(
source,
WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(2)),
"Kafka Source"
);
// 數據處理邏輯
DataStream<Tuple2<String, Integer>> counts = text
.flatMap(new Tokenizer())
.keyBy(value -> value.f0)
.sum(1);
// 輸出結果
counts.print();
// 執行作業
env.execute("Streaming Word Count");
}
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
private static final long serialVersionUID = 1L;
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
Arrays.stream(value.toLowerCase().split("\\W+"))
.filter(word -> word.length() > 0)
.forEach(word -> out.collect(new Tuple2<>(word, 1)));
}
}
}
五、AUTOMATIC模式的智能選擇機制
1. AUTOMATIC模式的工作原理
AUTOMATIC模式是Flink 1.20.1中的一個強大特性,它能夠根據作業的數據源類型自動選擇最合適的執行模式:
- 當所有輸入源都是有界的(如文件、批量數據庫查詢),自動選擇BATCH模式
- 當至少有一個輸入源是無界的(如Kafka、Socket),自動選擇STREAMING模式
// 設置為自動模式
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
2. 邊界情況處理
在某些復雜場景下,AUTOMATIC模式的選擇可能不完全符合預期:
- 混合數據源: 如果作業同時包含有界和無界數據源,將選擇STREAMING模式
- 動態數據源: 對于可能在運行時從有界變為無界的數據源,建議明確指定執行模式
- 復雜處理拓撲: 對于包含迭代或復雜循環的作業,可能需要手動選擇執行模式
六、批處理模式的性能優化
1. 批處理特定的優化
BATCH模式針對有界數據處理提供了多項性能優化:
- 任務調度優化: 采用更高效的批處理調度策略
- 內存管理改進: 更積極的數據物化和緩存
- 網絡傳輸優化: 批量數據傳輸減少網絡開銷
- 計算優化: 使用更適合批處理的算子實現
2. 性能對比示例
使用相同的WordCount邏輯,分別在BATCH和STREAMING模式下處理1GB文本數據的性能對比:
| 模式 | 執行時間 | CPU使用率 | 內存消耗 |
|---|---|---|---|
| STREAMING | 38秒 | 穩定在70% | 2.4GB |
| BATCH | 22秒 | 峰值95%,完成后釋放 | 1.8GB |
七、Flink 1.20.1中的執行模式改進
1. 新特性和優化
Flink 1.20.1在執行模式方面帶來了多項改進:
- 更智能的AUTOMATIC模式: 改進了自動模式的選擇邏輯,支持更復雜的數據源組合
- 批處理模式性能提升: 進一步優化了批處理執行引擎,提升了大數據量處理能力
- API一致性增強: 確保所有算子在不同執行模式下行為一致
- 資源利用率優化: 改進了批處理模式下的資源調度,減少資源浪費
2. 兼容性注意事項
在使用Flink 1.20.1的執行模式時,需要注意以下兼容性問題:
- 某些流處理特有的操作(如CEP)在BATCH模式下可能行為受限
- 窗口操作在BATCH和STREAMING模式下的實現方式不同
- 狀態過期機制在兩種模式下有細微差別
八、最佳實踐
1. 執行模式選擇指南
| 場景 | 推薦模式 | 原因 |
|---|---|---|
| 離線數據處理 | BATCH | 性能更好,資源利用率更高 |
| 實時數據處理 | STREAMING | 低延遲,持續處理能力 |
| ETL作業 | BATCH | 更適合處理有界數據集 |
| 實時分析 | STREAMING | 滿足實時性要求 |
| 不確定數據源類型 | AUTOMATIC | 自動適配不同數據源 |
2. 實際應用中的模式切換策略
在實際項目中,可以采用以下策略來管理執行模式:
- 開發環境: 使用AUTOMATIC模式,方便測試不同數據源
- 生產環境: 根據明確的數據流特征選擇BATCH或STREAMING模式
- 批處理作業: 明確設置為BATCH模式以獲得最佳性能
- 流處理作業: 明確設置為STREAMING模式,確保低延遲
九、總結與展望
Flink的批流一體執行模式是大數據處理領域的一次重要創新,它消除了批處理和流處理之間的界限,為開發者提供了統一、靈活的編程模型。通過Execution Mode的合理選擇和配置,我們可以在不同場景下獲得最佳的性能表現。
隨著Flink 1.20.1的發布,批流一體架構進一步成熟,執行模式的自動選擇更加智能,性能優化更加到位。未來,Flink將繼續完善其批流一體架構,為大數據處理提供更加強大和靈活的解決方案。
通過本文的學習,相信你已經對Flink的執行模式有了深入的理解。在實際應用中,建議根據具體的數據特征和處理需求,選擇合適的執行模式,充分發揮Flink批流一體的優勢。

本文詳細介紹Apache Flink的批處理與流處理執行模式,包括Execution Mode的概念、配置方法、實現原理以及最佳實踐。
浙公網安備 33010602011771號