18、Flink CDC監聽MySQL-Binlog實現數據監聽
一、CDC簡介:
CDC(Change Data Capture)是變更數據捕獲的簡稱,其核心思想是監測并捕獲數據庫的變動(包括數據或數據表的插入、更新、刪除等),將這些變更按發生的順序完整記錄下來,并寫入到消息中間件或數據倉庫中以供其他服務進行訂閱及消費。CDC技術廣泛應用于數據同步、數據分發、數據采集等場景,是數據集成領域的重要工具。
1、CDC常用工具:
|
CDC工具 |
Debezium |
Canal |
Maxwell |
Flink CDC |
|
核心定位 |
多數據源 CDC 框架 |
輕量 MySQL 同步工具 |
MySQL 專屬極簡工具 |
實時處理一體化框架 |
|
支持數據源 |
MySQL、PostgreSQL、Oracle 等 |
MySQL(最佳)、PostgreSQL 等 |
僅 MySQL |
MySQL、PostgreSQL 等(基于 Debezium) |
|
典型輸出目標 |
Kafka、Flink/Spark |
Kafka、RocketMQ、數據庫 |
Kafka、Redis、文件 |
Kafka、ES、Hive 等 |
|
突出優勢 |
支持廣泛,全量 + 增量同步 |
部署簡單,國內生態適配好 |
配置極簡,資源占用低 |
支持實時處理,Exactly-Once 語義 |
|
適用場景 |
多源同步、復雜數據管道 |
MySQL 為主的輕量同步 |
簡單 MySQL 變更同步 |
實時數倉、捕獲 + 處理一體化 |
2、相關參考:
二、Flink CDC工作原理:
Flink CDC(Change Data Capture)的核心工作原理是通過捕獲數據庫的變更日志(如 binlog、WAL 等),將其轉換為結構化事件流,接入 Flink 實時計算引擎進行處理,并最終同步到目標系統。其工作流程可拆解為以下關鍵步驟:
1、Debezium 捕獲解析數據庫日志:
Flink CDC 本身不直接解析數據庫日志,而是集成 Debezium(開源 CDC 框架)作為底層捕獲引擎,支持 MySQL、PostgreSQL、Oracle 等多種數據庫,具體邏輯如下:
(1)、模擬從節點獲取日志:
- 對于支持主從復制的數據庫(如 MySQL),Debezium 會偽裝成數據庫的從節點,向主庫發送復制請求,獲取變更日志(如 MySQL 的 binlog、PostgreSQL 的 WAL 日志)。
(2)、解析日志為結構化事件:
(3)數據庫日志通常是二進制格式,Debezium 會將其解析為包含詳細信息的結構化事件,包括:
- 操作類型(INSERT/UPDATE/DELETE);
- 變更數據(UPDATE 時包含舊值和新值,INSERT/DELETE 包含對應數據);
- 表名、數據庫名、操作時間戳等元數據。
2. 全量 + 增量同步(無鎖機制):
Flink CDC 支持 “全量數據初始化 + 增量變更同步” 的無縫銜接,且通過無鎖機制避免影響源庫性能:
(1)、全量快照階段:
- 首次同步時,會對數據庫表進行一次全量快照(讀取當前所有數據),確保初始數據完整。
(2)、增量同步階段:
- 快照完成后,自動切換到增量同步模式,通過監控 binlog 等日志獲取實時變更,且通過記錄日志位置(如 binlog 的文件名和偏移量)保證全量與增量數據的連續性(無重復、無遺漏)。
3、 封裝為 Flink Source 流入引擎:
解析后的結構化事件會被封裝為 Flink Source 連接器,直接作為 Flink 的輸入流:
(1)、變更事件以流的形式進入 Flink 計算引擎,每條事件對應一條數據記錄,可通過 Flink 的 DataStream API 或 Table/SQL 進行處理。
4、Flink實時數據處理:
Flink CDC 不僅是 “捕獲工具”,更能結合 Flink 的實時計算能力對變更數據進行處理:
(1)、數據清洗與轉換:
- 過濾無效數據、格式轉換(如 JSON 轉 Avro)、字段映射等。
(2)、關聯與聚合:
- 支持與維度表(如 MySQL 維表、HBase 維表)關聯,或進行窗口聚合(如統計分鐘級變更量)。
(3)、狀態管理:
- 利用 Flink 的狀態后端(如 RocksDB)保存中間結果,支持復雜邏輯(如去重、累計計算)。
5、基于 Checkpoint 保證一致性:
Flink CDC 依賴 Flink 的 Checkpoint 機制 確保數據處理的一致性:
(1)、Checkpoint 觸發:
- 定期將當前處理進度(包括 CDC 捕獲的日志位置、算子狀態等)持久化到存儲(如 HDFS、本地文件)。
(2)、故障恢復:
- 若 Flink 任務失敗,可從最近一次 Checkpoint 恢復狀態和日志位置,保證數據不丟失、不重復,實現 Exactly-Once 語義(端到端一致性需下游 Sink 配合支持)。
6. 通過 Sink 同步到目標系統:
處理后的變更數據通過 Flink 的 Sink 連接器 寫入目標存儲,支持 Kafka、Elasticsearch、Hive、MySQL、TiDB 等多種系統。
三、MySQL 配置(開啟 Binlog):
1、開啟 Binlog(ROW 模式):
# MySQL 配置文件 # Linux:my.cnf配置文件(/etc/mysql/) # Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\) # 開啟 Binlog log_bin = mysql-bin # 選擇 ROW 模式(記錄行級變更) binlog-format = ROW # 配置數據庫唯一 ID(與 Canal 服務端的 slaveId 不同) server-id = 1


2、重啟 MySQL 并驗證:
# 打開命令提示符(cmd/services.msc): # 按 Win + R 鍵,輸入 cmd,然后按 Enter 鍵打開命令提示符窗口。 # 停止MySQL服務: net stop MySQL57 # 啟動MySQL服務: net start MySQL57 # 驗證 SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';

四、SpringBoot整合Flink CDC實現MySQL數據監聽:
1、POM配置:
<!-- Flink 核心 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.18.1</version>
</dependency>
<!-- Flink Table 核心依賴 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>1.18.1</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>1.18.1</version>
</dependency>
<!-- Flink Connector Base -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>1.18.1</version>
</dependency>
<!-- Flink CDC MySQL -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>flink-connector-mysql-cdc</artifactId>
<version>3.0.1</version>
</dependency>
2、YML配置:
flink: cdc: # 是否開啟CDC監聽 auto-start: true # 自定義一個唯一的id server-id: "123456" # 數據庫配置 mysql: hostname: localhost port: 3306 username: root password: 123
3、Entity類聲明:
DataChangeType.class
/** * Flink CDC數據變更類型枚舉 * 1、"c"表示創建 * 2、"u"表示更新 * 3、"d"表示刪除 * 4、"r"表示讀取 */ public enum DataChangeType { INSERT("c"), UPDATE("u"), DELETE("d"), READ("r"); private final String code; DataChangeType(String code) { this.code = code; } public static DataChangeType getByCode(String code) { for (DataChangeType type : values()) { if (type.code.equals(code)) { return type; } } return null; } }
FlinkCdcProperties.class
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; @Data @Component @ConfigurationProperties(prefix = "flink.cdc") public class FlinkCdcProperties { /** * 是否自動啟動CDC監聽 */ private boolean autoStart; private String serverId; /** * MySQL配置 */ private Mysql mysql = new Mysql(); @Data public static class Mysql { private String hostname; private int port; private String username; private String password; } }
4、FlinkCdcRunner數據變更監聽啟動器:
import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.iven.flinkcdcdemoservice.entity.DataChangeType; import com.iven.flinkcdcdemoservice.entity.FlinkCdcProperties; import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandlerRegistry; import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandler; import com.ververica.cdc.connectors.mysql.source.MySqlSource; import com.ververica.cdc.connectors.mysql.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.ProcessFunction; import org.apache.flink.util.Collector; import org.springframework.boot.CommandLineRunner; import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.*; @Slf4j @Component @RequiredArgsConstructor public class FlinkCdcRunner implements CommandLineRunner { // 配置屬性 private final FlinkCdcProperties properties; // 處理器注冊中心 private final FlinkCdcHandlerRegistry handlerRegistry; @Override public void run(String... args) throws Exception { // 總開關關閉則不啟動 if (!properties.isAutoStart()) { log.info("Flink CDC 總開關關閉,不啟動監聽"); return; } // 沒有需要監聽的表則不啟動 List<String> monitoredTables = handlerRegistry.getMonitoredTables(); if (monitoredTables.isEmpty()) { log.warn("未發現需要監聽的表(未實現FlinkCdcTableHandler),不啟動監聽"); return; } // 1. 創建Flink執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 設置并行度為 1: server-id 的數量必須 ≥ 并行度 env.setParallelism(1); // 啟用檢查點(可選) // env.enableCheckpointing(5000); // 配置檢查點存儲路徑(本地路徑或分布式存儲如HDFS) // env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-cdc-checkpoints"); // 檢查點超時時間(60秒未完成則取消) // env.getCheckpointConfig().setCheckpointTimeout(60000); // 允許檢查點失敗次數(默認0,即一次失敗則任務失敗) // env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3); // 禁用檢查點 env.getCheckpointConfig().disableCheckpointing(); // 重試次數/重試間隔 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(10))); // 2. 配置MySQL CDC源(動態設置需要監聽的表) MySqlSource<String> mySqlSource = MySqlSource.<String>builder() .serverId(properties.getServerId()) .hostname(properties.getMysql().getHostname()) .port(properties.getMysql().getPort()) .username(properties.getMysql().getUsername()) .password(properties.getMysql().getPassword()) // 從監聽的表中提取數據庫列表(去重) .databaseList(extractDatabases(monitoredTables)) // 直接使用注冊中心收集的表列表 .tableList(monitoredTables.toArray(new String[0])) // 反序列化為JSON .deserializer(new JsonDebeziumDeserializationSchema()) /* initial: 初始化快照,即全量導入后增量導入(檢測更新數據寫入) * latest: 只進行增量導入(不讀取歷史變化) * timestamp: 指定時間戳進行數據導入(大于等于指定時間錯讀取數據) */ .startupOptions(StartupOptions.latest()) .build(); // 3. 讀取CDC數據流并處理 DataStreamSource<String> dataStream = env.fromSource( mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL-CDC-Source" ); // 4. 解析數據并路由到對應處理器,使用靜態內部類代替匿名內部類 dataStream.process(new CdcDataProcessFunction(handlerRegistry)); // 5. 啟動Flink作業 env.execute("Flink-CDC-動態監聽作業"); } /** * 靜態內部類實現ProcessFunction,確保可序列化 */ private static class CdcDataProcessFunction extends ProcessFunction<String, Void> implements Serializable { private final FlinkCdcHandlerRegistry handlerRegistry; // 通過構造函數傳入依賴 public CdcDataProcessFunction(FlinkCdcHandlerRegistry handlerRegistry) { this.handlerRegistry = handlerRegistry; } @Override public void processElement(String json, Context ctx, Collector<Void> out) { try { JSONObject cdcData = JSON.parseObject(json); // 操作類型:c/u/d String op = cdcData.getString("op"); JSONObject source = cdcData.getJSONObject("source"); String dbName = source.getString("db"); String tableName = source.getString("table"); // 庫名.表名 String fullTableName = dbName + "." + tableName; // 找到對應表的處理器 FlinkCdcHandler handler = handlerRegistry.getHandler(fullTableName); if (handler == null) { log.warn("表[{}]無處理器,跳過處理", fullTableName); return; } // 按事件類型分發 DataChangeType changeType = DataChangeType.getByCode(op); if (changeType == null) { log.warn("未知操作類型:{}", op); return; } switch (changeType) { case INSERT: List<Map<String, Object>> insertData = Collections.singletonList( cdcData.getJSONObject("after").getInnerMap() ); handler.handleInsert(insertData); break; case UPDATE: List<Map<String, Object>> beforeData = Collections.singletonList( cdcData.getJSONObject("before").getInnerMap() ); List<Map<String, Object>> afterData = Collections.singletonList( cdcData.getJSONObject("after").getInnerMap() ); handler.handleUpdate(beforeData, afterData); break; case DELETE: List<Map<String, Object>> deleteData = Collections.singletonList( cdcData.getJSONObject("before").getInnerMap() ); handler.handleDelete(deleteData); break; case READ: // 可以忽略快照階段的讀取操作,或根據需要處理 log.debug("處理快照讀取操作: {}", fullTableName); break; } } catch (Exception e) { log.error("Flink-CDC數據處理發生未預期異常", e); } } } /** * 從表名(庫名.表名)中提取數據庫列表(去重) * * @param tables * @return */ private String[] extractDatabases(List<String> tables) { // 截取庫名(如demo.tb_user → demo) return tables.stream() .map(table -> table.split("\\.")[0]) .distinct() .toArray(String[]::new); } }
5、FlinkCdcHandlerRegistry策略路由:
import lombok.extern.slf4j.Slf4j; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * Flink CDC處理器注冊中心 * 處理器注冊中心(自動掃描監聽表) */ @Slf4j @Component public class FlinkCdcHandlerRegistry implements ApplicationContextAware, Serializable { // 緩存:表名(庫名.表名)→ 處理器 private final Map<String, FlinkCdcHandler> handlerMap = new ConcurrentHashMap<>(); // 收集所有需要監聽的表(供Flink CDC配置使用) private List<String> monitoredTables; @Override public void setApplicationContext(ApplicationContext applicationContext) { // 掃描所有實現類 Map<String, FlinkCdcHandler> beans = applicationContext.getBeansOfType(FlinkCdcHandler.class); beans.values().forEach(handler -> { String tableName = handler.getTableName(); handlerMap.put(tableName, handler); log.info("注冊監聽表:{} → 處理器:{}", tableName, handler.getClass().getSimpleName()); }); // 提取所有需要監聽的表 monitoredTables = new ArrayList<>(handlerMap.keySet()); } /** * 獲取指定表的處理器 * * @param tableName * @return */ public FlinkCdcHandler getHandler(String tableName) { return handlerMap.get(tableName); } /** * 獲取所有需要監聽的表(供Flink CDC配置) * * @return */ public List<String> getMonitoredTables() { return monitoredTables; } }
6、FlinkCdcHandler策略模式數據處理:
FlinkCdcHandler
import java.util.List; import java.util.Map; /** * 表數據處理接口,每個監聽的表需實現此接口 */ public interface FlinkCdcHandler { /** * 獲取監聽的表名(格式:庫名.表名,如demo.tb_user) */ String getTableName(); /** * 處理新增數據 */ default void handleInsert(List<Map<String, Object>> dataList) { // 默認空實現,子類可重寫 } /** * 處理更新數據(包含變更前和變更后的數據) * @param beforeList 變更前數據 * @param afterList 變更后數據 */ default void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) { // 默認空實現,子類可重寫 } /** * 處理刪除數據 */ default void handleDelete(List<Map<String, Object>> dataList) { // 默認空實現,子類可重寫 } }
TbUserFlinkCdcHandler
import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.io.Serializable; import java.util.List; import java.util.Map; @Slf4j @Component public class TbUserFlinkCdcHandler implements FlinkCdcHandler, Serializable { @Override public String getTableName() { return "demo.tb_user"; } @Override public void handleInsert(List<Map<String, Object>> dataList) { log.info("處理tb_user新增數據,共{}條", dataList.size()); dataList.forEach(data -> { String id = (String)data.get("id"); String username = (String) data.get("name"); // 業務邏輯:如同步到ES、緩存等 log.info("新增用戶:id={}, name={}", id, username); }); } @Override public void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) { log.info("處理tb_user更新數據,共{}條", afterList.size()); for (int i = 0; i < afterList.size(); i++) { Map<String, Object> before = beforeList.get(i); Map<String, Object> after = afterList.get(i); log.info("更新用戶:id={}, 舊用戶名={}, 新用戶名={}", after.get("id"), before.get("name"), after.get("name")); } } @Override public void handleDelete(List<Map<String, Object>> dataList) { log.info("處理tb_user刪除數據,共{}條", dataList.size()); dataList.forEach(data -> { log.info("刪除用戶:id={}", data.get("id")); }); } }

項目啟動時,FlinkCdcHandlerRegistry 掃描并注冊所有 FlinkCdcHandler 實現類,建立表與處理器的映射;FlinkCdcRunner 在 Spring 容器初始化后觸發,檢查啟動條件,初始化 Flink 環境并構建 CDC 數據源,將數據流接入 CdcDataProcessFunction,該函數解析變更事件并路由到對應處理器執行業務邏輯,最后啟動 Flink 作業持續監聽處理。
浙公網安備 33010602011771號