從零開始學Flink:數據源
在實時數據處理場景中,數據源(Source)是整個數據處理流程的起點。Flink作為流批一體的計算框架,提供了豐富的Source接口支持,其中通過Kafka獲取實時數據是最常見的場景之一。本文將以Flink DataStream API為核心,帶你從0到1實現“從Kafka消費數據并輸出到日志”的完整流程,掌握Flink Source的核心用法。
一、為什么選擇Kafka作為Flink的數據源?
Kafka作為分布式流處理平臺,具備高吞吐量、低延遲、持久化存儲等特性,是實時數據管道的首選。Flink與Kafka的集成方案經過多年優化,支持:
- 高吞吐量:單集群可處理數十萬條/秒的消息,滿足大規模實時數據處理需求;
- 持久化存儲:數據按時間順序寫入磁盤并保留一定周期,支持離線重放和故障恢復;
- 精確一次(Exactly-Once)消費語義:通過Kafka偏移量(Offset)管理和Flink檢查點(Checkpoint)機制保證數據一致性;
- 動態分區發現:自動感知Kafka主題的分區變化(如新增分區),無需重啟任務;
- 靈活的消費模式:支持從指定偏移量、時間戳或最新位置開始消費。
二、環境準備與依賴配置
1. 版本說明
本文基于以下版本實現(需保持版本兼容):
- Flink:1.20.1(最新穩定版)
- Kafka:3.4.0(Flink Kafka Connector兼容Kafka 2.8+)
- JDK:17+
- gradle 8.3+
2. gradle依賴
在gradle添加Flink核心依賴及Kafka Connector依賴,build.gradle配置可以是如下:
plugins {
id 'java' // Java項目插件
id 'application' // 支持main方法運行
}
// 設置主類(可選,用于application插件)
application {
mainClass.set('com.cn.daimajiangxin.flink.source.KafkaSourceDemo') // 替換為你的主類全限定名
}
// 依賴倉庫(Maven中央倉庫)
repositories {
mavenCentral()
}
// 依賴配置
dependencies {
// Flink核心依賴(生產環境通常標記為provided,由Flink運行時提供)
implementation 'org.apache.flink:flink-java:1.20.1'
implementation 'org.apache.flink:flink-streaming-java_2.12:1.20.1'
// Flink Kafka Connector(新版API,兼容Kafka 2.8+)
implementation 'org.apache.flink:flink-connector-kafka_2.12:1.20.1'
// SLF4J日志門面 + Log4j實現(避免日志警告)
implementation 'org.apache.logging.log4j:log4j-api:2.17.1'
implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1'
}
// 編譯配置(可選,根據需要調整)
tasks.withType(JavaCompile) {
options.encoding = 'UTF-8' // 指定編碼
sourceCompatibility = JavaVersion.VERSION_17 // 兼容Java 8
targetCompatibility = JavaVersion.VERSION_17
}
三、核心概念:Flink Kafka Source的工作原理
在深入代碼前,需理解Flink Kafka Source的核心組件:
- KafkaSource:Flink提供的Kafka數據源連接器,負責與Kafka Broker建立連接、拉取消息;
- 反序列化器(Deserializer):將Kafka消息的字節數組(byte[])轉換為Flink可處理的數據類型(如String、POJO、Row等);
- 偏移量管理:記錄已消費的Kafka消息位置(Offset),確保故障恢復時能從斷點繼續消費;
- 檢查點(Checkpoint):Flink的容錯機制,定期將狀態(包括偏移量)持久化到存儲系統(如HDFS),保證Exactly-Once語義。
四、核心代碼實現:從Kafka讀取數據并輸出到日志
1. 流程概述
整個流程分為5步:
- 配置Kafka連接參數(如Broker地址、主題、消費者組);
- 創建Flink流執行環境(StreamExecutionEnvironment);
- 定義Kafka Source(使用新版KafkaSource);
- 將Source添加到執行環境,并處理數據(如打印到日志);
- 觸發任務執行。
2.代碼詳解
以下是完整的示例代碼,包含詳細注釋:
package com.cn.daimajiangxin.flink.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
public class KafkaSourceDemo {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceDemo.class);
public static void main(String[] args) throws Exception {
// 1. 創建Flink流執行環境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 可選:啟用檢查點(生產環境必選,保證Exactly-Once語義)
env.enableCheckpointing(5000); // 每5秒做一次檢查點
// 啟用檢查點
env.enableCheckpointing(5000); // 每5秒做一次檢查點
// 設置檢查點超時時間
env.getCheckpointConfig().setCheckpointTimeout(60000);
// 2. 配置Kafka參數
String kafkaBootstrapServers = "172.30.244.152:9092"; // Kafka Broker地址
String topic = "test_topic"; // 目標主題
String consumerGroup = "flink-consumer-group"; // 消費者組ID
LOG.info("Connecting to Kafka at " + kafkaBootstrapServers);
LOG.info("Consuming topic: " + topic);
LOG.info("Consumer group: " + consumerGroup);
// 3. 定義Kafka Source(新版API)
KafkaSource`<String>` kafkaSourceDemo = KafkaSource.`<String>`builder()
.setBootstrapServers(kafkaBootstrapServers) // Kafka Broker地址
.setTopics(topic) // 訂閱的主題
.setGroupId(consumerGroup) // 消費者組
.setProperty("enable.auto.commit", "true")
.setProperty("auto.commit.interval.ms", "1000")
.setProperty("session.timeout.ms", "30000")
.setProperty("retry.backoff.ms", "1000")
.setProperty("reconnect.backoff.max.ms", "10000")
.setDeserializer(new KafkaRecordDeserializationSchema `<String>`() {
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector `<String>` out) throws IOException {
// 從ConsumerRecord中提取值(字節數組),并轉為字符串
String value = new String(record.value(), StandardCharsets.UTF_8);
LOG.info("Received message: " + value);
out.collect(value); // 將反序列化后的數據收集到Flink流中
}
@Override
public TypeInformation`<String>` getProducedType() {
return TypeInformation.of(String.class);
}
})
// 從最早偏移量開始消費(這樣即使沒有新消息也會讀取歷史數據)
.setStartingOffsets(OffsetsInitializer.earliest())
.build();
// 4. 將Kafka Source添加到Flink流環境,并處理數據
DataStream`<String>` kafkaStream = env.fromSource(
kafkaSourceDemo,
WatermarkStrategy.noWatermarks(), // 無水印(適用于無序數據場景)
"Kafka Source" // Source名稱(用于監控)
);
LOG.info("Kafka source created successfully");
// 5. 處理數據:將每條數據打印到日志(實際生產中可替換為寫入數據庫、消息隊列等)
kafkaStream.print("KafkaData");
LOG.info("Flink Kafka Source Demo started.");
// 6. 觸發任務執行
env.execute("Flink Kafka Source Demo");
}
}
3. 關鍵配置說明
- KafkaSource.Builder:新版Kafka Source的核心構建器,支持靈活配置;
- setDeserializer:指定反序列化方式,deserialize 接收Kafka的ConsumerRecord(包含鍵、值、偏移量等信息),提取值(record.value())并反序列化為字符,getProducedType聲明輸出數據的類型(此處為String);
- setStartingOffsets:控制消費起始位置(latest()從最新數據開始,earliest()從最早數據開始,生產環境常用OffsetsInitializer.committedOffsets()從上次提交的偏移量繼續);
- WatermarkStrategy:用于事件時間(Event Time)處理,示例中無時間窗口需求,故使用noWatermarks();
- PrintSinkFunction:Flink內置的日志打印Sink(true表示打印完整上下文,包含Subtask信息)。
五、運行與測試
在WSL2的Ubuntu 環境中安裝Kafka。
1. 安裝Kafka服務
-
下載Kafka二進制包
訪問Apache Kafka官網,選擇最新穩定版(如3.9.0),使用wget下載:wget https://mirrors.aliyun.com/apache/kafka/3.9.0/kafka_2.12-3.9.0.tgz -
解壓并配置環境變量
# 解壓到/opt/kafka(全局可訪問) sudo mkdir -p /opt/kafka tar -zxvf kafka_2.12-3.9.0.tgz -C /opt/kafka --strip-components=1 # 永久生效(編輯~/.bashrc) echo 'export KAFKA_HOME=/opt/kafka' >> /etc/profile echo 'export PATH=$KAFKA_HOME/bin:$PATH' >> /etc/profile source /etc/profile
2. 配置Kafka
Kafka的核心配置文件位于$KAFKA_HOME/config目錄,需修改以下兩個文件:
配置Kafka Broker(server.properties)
修改以下關鍵參數以適配WSL2環境:
# ==================== 核心角色與ID配置 ====================
# 啟用KRaft模式(默認已啟用)
# 單節點同時擔任Broker和控制器
process.roles=broker,controller
# 節點唯一ID(單節點必須設為0)
node.id=0
# 控制器ID(與node.id一致,單節點唯一)
controller.id=0
# ==================== 監聽端口配置 ====================
# 全局監聽端口(客戶端讀寫請求)和控制器監聽端口
# 多個監聽器使用逗號分隔,每個監聽器都需要指定安全協議
listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
# 對外暴露的地址(Windows主機通過localhost訪問)
# 多個公布的監聽器使用逗號分隔
advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
# 指定CONTROLLER監聽器的安全協議
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
# 定義控制器監聽器的名稱(KRaft模式必需)
controller.listener.names=CONTROLLER
# ==================== ZooKeeper兼容配置(可選) ====================
# 若需兼容舊客戶端,可保留ZooKeeper配置(但KRaft模式無需ZooKeeper)
# zookeeper.connect=localhost:2181
# ==================== 日志與分區配置 ====================
# 數據存儲目錄配置(Kafka的核心配置參數)
# Kafka將主題數據、索引文件等存儲在該目錄下
log.dirs=/opt/kafka/data
num.partitions=1
# 副本數(單節點設為1)
default.replication.factor=1
# 最小同步副本數(單節點設為1)
min.insync.replicas=1
# ==================== 日志存儲高級配置 ====================
# 日志保留時間(默認7天,生產環境根據存儲容量和需求調整)
# log.retention.hours=168
# 或按大小限制保留(單位:字節)
# log.retention.bytes=107374182400 # 100GB
# 單個分區日志段大小(默認1GB,可根據實際需求調整)
# log.segment.bytes=1073741824
# 日志段檢查和清理的時間間隔(默認300000ms=5分鐘)
# log.retention.check.interval.ms=300000
# 控制是否自動創建主題(生產環境建議禁用,改為手動創建)
# auto.create.topics.enable=false
# ==================== 控制器引導配置 ====================
# 控制器引導服務器(單節點指向自己,格式:host:port)
# 與控制器監聽端口一致
controller.quorum.bootstrap.servers=localhost:9093
# 控制器投票者配置(單節點設為0@localhost:9093)
controller.quorum.voters=0@localhost:9093
3.啟動Kafka服務
3.1初始化KRaft存儲目錄(首次啟動必需)
在KRaft模式下,需要先初始化元數據存儲:
# 生成集群ID并保存到變量
CLUSTER_ID=$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)
echo "生成的集群ID: $CLUSTER_ID"
# 使用生成的集群ID格式化存儲目錄$KAFKA_HOME/bin/kafka-storage.sh format -t $CLUSTER_ID -c $KAFKA_HOME/config/server.properties
注意: 如果手動運行命令,請確保先執行生成集群ID的命令,然后使用實際生成的ID替換"$CLUSTER_ID"。
3.2啟動Kafka Broker
# 啟動Broker(日志輸出到$KAFKA_HOME/logs/server.log) $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
3.3驗證服務狀態
檢查Kafka Broker進程:
ps -ef | grep kafka # 應看到Kafka進程
3.4創建測試主題
確保Kafka服務已啟動,并創建測試主題 test_topic:
$KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test_topic
3.5發送測試數據
使用Kafka內置的生產者工具發送測試消息到 test_topic:
# 啟動Kafka生產者控制臺
$KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
# 輸入幾條測試消息(每行一條)
> hello flink
> flink kafka integration
> real-time data processing
3.6運行Flink程序
在IDE中直接運行 KafkaSourceDemo類的 main方法,或通過Gradle構建并運行:
# 構建項目
./gradlew clean build
# 運行Flink作業
./gradlew run
3.7驗證結果
成功運行后,你應該能在控制臺看到類似如下輸出:

六、進階配置與優化
1. 消費語義保證
在生產環境中,為了確保數據一致性,需要配置Flink的檢查點機制和Kafka偏移量提交策略:
// 1. 啟用檢查點
env.enableCheckpointing(5000); // 每5秒做一次檢查點
// 2. 獲取檢查點配置對象(Flink 1.20.1及以上版本推薦方式)
CheckpointConfig checkpointConfig = env.getCheckpointConfig();
// 3. 配置檢查點模式為EXACTLY_ONCE(精確一次語義)
checkpointConfig.setMode(CheckpointingMode.EXACTLY_ONCE);
// 4. 設置檢查點超時時間
checkpointConfig.setCheckpointTimeout(Duration.ofSeconds(60));
// 4. 配置從上次提交的偏移量繼續消費(生產環境推薦)
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
2. 并行度與資源配置
合理設置并行度可充分利用集群資源并提高吞吐量:
// 設置Flink作業的全局并行度
env.setParallelism(3); // 與Kafka主題分區數匹配
// 或單獨設置Source的并行度
KafkaSource`<String>` kafkaSource = KafkaSource.`<String>`builder()
// ... 其他配置 ...
.build();
DataStream`<String>` stream = env.fromSource(
kafkaSource,
WatermarkStrategy.noWatermarks(),
"Kafka Source")
.setParallelism(3); // Source并行度
3. 高級反序列化
除了基礎的字符串反序列化,還可以使用更靈活的反序列化方式:
3.1 使用預定義反序列化器
// 使用Flink提供的String反序列化器 .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
3.2 自定義POJO反序列化
如果Kafka消息是JSON格式,可以使用Jackson等庫將其反序列化為POJO對象:
public class User {
private String id;
private String name;
private int age;
// getters, setters, constructors...
}
// 自定義POJO反序列化器
.setDeserializer(new KafkaRecordDeserializationSchema`<User>`() {
private final ObjectMapper mapper = new ObjectMapper();
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector`<User>` out) throws IOException {
User user = mapper.readValue(record.value(), User.class);
out.collect(user);
}
@Override
public TypeInformation`<User>` getProducedType() {
return TypeInformation.of(User.class);
}
})
七、常見問題與解決方案
1. 連接超時問題
問題現象:程序啟動后報 org.apache.kafka.common.errors.TimeoutException
解決方案:
- 檢查Kafka服務是否正常運行:
ps -ef | grep kafka - 確認
bootstrap.servers配置正確,特別是在WSL2環境中確保端口映射正確 - 檢查防火墻設置,確保9092端口開放
2. 數據消費不完整
問題現象:部分Kafka消息未被Flink消費
解決方案:
- 檢查Kafka主題的分區數與Flink Source并行度是否匹配
- 確認
setStartingOffsets配置正確,生產環境建議使用OffsetsInitializer.committedOffsets() - 檢查檢查點機制是否正常啟用,確保偏移量正確提交
3. 性能優化
對于高吞吐量場景,可以通過以下方式優化性能:
- 增加Kafka主題分區數(與Flink并行度匹配)
- 調大
fetch.max.bytes和max.partition.fetch.bytes參數,增加單次拉取的數據量 - 啟用增量檢查點,減少檢查點開銷
- 使用
setUnboundedUsePreviousEventTimeWatermark()優化水印生成
八、總結與擴展
本文詳細介紹了如何使用Flink從Kafka讀取數據,包括環境準備、代碼實現、運行測試以及進階配置。通過本文的學習,你應該能夠掌握Flink數據源的核心用法,為構建企業級實時數據處理應用打下堅實基礎。
在實際應用中,Flink還支持多種其他數據源,如:
- 文件系統(HDFS、本地文件)
- 數據庫(MySQL、PostgreSQL、MongoDB等)
- 消息隊列(RabbitMQ、Pulsar等)
- 自定義數據源(通過實現
SourceFunction接口)
后續文章將繼續深入探討Flink的數據轉換、窗口計算、狀態管理等核心概念,敬請關注!

在實時數據處理場景中,數據源(Source)是整個數據處理流程的起點。Flink作為流批一體的計算框架,提供了豐富的Source接口支持,其中通過Kafka獲取實時數據是最常見的場景之一。本文將以Flink DataStream API為核心,帶你從0到1實現“從Kafka消費數據并輸出到日志”的完整流程,掌握Flink Source的核心用法。
浙公網安備 33010602011771號