<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      18、Flink CDC監聽MySQL-Binlog實現數據監聽

      一、CDC簡介:

      CDC(Change Data Capture)是變更數據捕獲的簡稱,其核心思想是監測并捕獲數據庫的變動(包括數據或數據表的插入、更新、刪除等),將這些變更按發生的順序完整記錄下來,并寫入到消息中間件或數據倉庫中以供其他服務進行訂閱及消費。CDC技術廣泛應用于數據同步、數據分發、數據采集等場景,是數據集成領域的重要工具。

      1、CDC常用工具:

      CDC工具

      Debezium

      Canal

      Maxwell

      Flink CDC

      核心定位

      多數據源 CDC 框架

      輕量 MySQL 同步工具

      MySQL 專屬極簡工具

      實時處理一體化框架

      支持數據源

      MySQL、PostgreSQL、Oracle 等

      MySQL(最佳)、PostgreSQL 等

      僅 MySQL

      MySQL、PostgreSQL 等(基于 Debezium)

      典型輸出目標

      Kafka、Flink/Spark

      Kafka、RocketMQ、數據庫

      Kafka、Redis、文件

      Kafka、ES、Hive 等

      突出優勢

      支持廣泛,全量 + 增量同步

      部署簡單,國內生態適配好

      配置極簡,資源占用低

      支持實時處理,Exactly-Once 語義

      適用場景

      多源同步、復雜數據管道

      MySQL 為主的輕量同步

      簡單 MySQL 變更同步

      實時數倉、捕獲 + 處理一體化

      2、相關參考:

      Flink-CDC 開源地址

      Flink-CDC 中文文檔

      Canal學習筆記

       

      二、Flink CDC工作原理:

      Flink CDC(Change Data Capture)的核心工作原理是通過捕獲數據庫的變更日志(如 binlog、WAL 等),將其轉換為結構化事件流,接入 Flink 實時計算引擎進行處理,并最終同步到目標系統。其工作流程可拆解為以下關鍵步驟:

      1、Debezium 捕獲解析數據庫日志:

      Flink CDC 本身不直接解析數據庫日志,而是集成 Debezium(開源 CDC 框架)作為底層捕獲引擎,支持 MySQL、PostgreSQL、Oracle 等多種數據庫,具體邏輯如下:

      (1)、模擬從節點獲取日志:

      • 對于支持主從復制的數據庫(如 MySQL),Debezium 會偽裝成數據庫的從節點,向主庫發送復制請求,獲取變更日志(如 MySQL 的 binlog、PostgreSQL 的 WAL 日志)。

      (2)、解析日志為結構化事件:

      (3)數據庫日志通常是二進制格式,Debezium 會將其解析為包含詳細信息的結構化事件,包括:

      • 操作類型(INSERT/UPDATE/DELETE);
      • 變更數據(UPDATE 時包含舊值和新值,INSERT/DELETE 包含對應數據);
      • 表名、數據庫名、操作時間戳等元數據。

      2. 全量 + 增量同步(無鎖機制):

      Flink CDC 支持 “全量數據初始化 + 增量變更同步” 的無縫銜接,且通過無鎖機制避免影響源庫性能:

      (1)、全量快照階段:

      • 首次同步時,會對數據庫表進行一次全量快照(讀取當前所有數據),確保初始數據完整。

      (2)、增量同步階段:

      • 快照完成后,自動切換到增量同步模式,通過監控 binlog 等日志獲取實時變更,且通過記錄日志位置(如 binlog 的文件名和偏移量)保證全量與增量數據的連續性(無重復、無遺漏)。

      3、 封裝為 Flink Source 流入引擎:

      解析后的結構化事件會被封裝為 Flink Source 連接器,直接作為 Flink 的輸入流:

      (1)、變更事件以流的形式進入 Flink 計算引擎,每條事件對應一條數據記錄,可通過 Flink 的 DataStream API 或 Table/SQL 進行處理。

      4、Flink實時數據處理:

      Flink CDC 不僅是 “捕獲工具”,更能結合 Flink 的實時計算能力對變更數據進行處理:

      (1)、數據清洗與轉換:

      • 過濾無效數據、格式轉換(如 JSON 轉 Avro)、字段映射等。

      (2)、關聯與聚合:

      • 支持與維度表(如 MySQL 維表、HBase 維表)關聯,或進行窗口聚合(如統計分鐘級變更量)。

      (3)、狀態管理:

      • 利用 Flink 的狀態后端(如 RocksDB)保存中間結果,支持復雜邏輯(如去重、累計計算)。

      5、基于 Checkpoint 保證一致性:

      Flink CDC 依賴 Flink 的 Checkpoint 機制 確保數據處理的一致性:

      (1)、Checkpoint 觸發:

      • 定期將當前處理進度(包括 CDC 捕獲的日志位置、算子狀態等)持久化到存儲(如 HDFS、本地文件)。

      (2)、故障恢復:

      • 若 Flink 任務失敗,可從最近一次 Checkpoint 恢復狀態和日志位置,保證數據不丟失、不重復,實現 Exactly-Once 語義(端到端一致性需下游 Sink 配合支持)。

      6. 通過 Sink 同步到目標系統:

      處理后的變更數據通過 Flink 的 Sink 連接器 寫入目標存儲,支持 Kafka、Elasticsearch、Hive、MySQL、TiDB 等多種系統。

       

      三、MySQL 配置(開啟 Binlog):

      1、開啟 Binlog(ROW 模式):

      # MySQL 配置文件
      # Linux:my.cnf配置文件(/etc/mysql/)
      # Window:my.ini配置文件(C:\ProgramData\MySQL\MySQL Server 5.7\)
      # 開啟 Binlog
      log_bin = mysql-bin
      
      # 選擇 ROW 模式(記錄行級變更)
      binlog-format = ROW
      
      # 配置數據庫唯一 ID(與 Canal 服務端的 slaveId 不同)
      server-id = 1

      1

      2

      2、重啟 MySQL 并驗證:

      # 打開命令提示符(cmd/services.msc):
      # 按 Win + R 鍵,輸入 cmd,然后按 Enter 鍵打開命令提示符窗口。
      # 停止MySQL服務:
      net stop MySQL57
      
      # 啟動MySQL服務:
      net start MySQL57
      
      # 驗證
      SHOW VARIABLES LIKE 'log_bin';
      SHOW VARIABLES LIKE 'binlog_format';

      3

       

      四、SpringBoot整合Flink CDC實現MySQL數據監聽:

      1、POM配置:

              <!-- Flink 核心 -->
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-java</artifactId>
                  <version>1.18.1</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-streaming-java</artifactId>
                  <version>1.18.1</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients</artifactId>
                  <version>1.18.1</version>
              </dependency>
              <!-- Flink Table 核心依賴 -->
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-api-java-bridge</artifactId>
                  <version>1.18.1</version>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-planner_2.12</artifactId>
                  <version>1.18.1</version>
              </dependency>
              <!-- Flink Connector Base -->
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-connector-base</artifactId>
                  <version>1.18.1</version>
              </dependency>
              <!-- Flink CDC MySQL -->
              <dependency>
                  <groupId>com.ververica</groupId>
                  <artifactId>flink-connector-mysql-cdc</artifactId>
                  <version>3.0.1</version>
              </dependency>

      2、YML配置:

      flink:
        cdc:
          # 是否開啟CDC監聽
          auto-start: true
          # 自定義一個唯一的id
          server-id: "123456"
          # 數據庫配置
          mysql:
            hostname: localhost
            port: 3306
            username: root
            password: 123

      3、Entity類聲明:

      DataChangeType.class

      /**
       * Flink CDC數據變更類型枚舉
       * 1、"c"表示創建
       * 2、"u"表示更新
       * 3、"d"表示刪除
       * 4、"r"表示讀取
       */
      public enum DataChangeType {
      
          INSERT("c"),
          UPDATE("u"),
          DELETE("d"),
          READ("r");
      
          private final String code;
      
          DataChangeType(String code) {
              this.code = code;
          }
      
          public static DataChangeType getByCode(String code) {
              for (DataChangeType type : values()) {
                  if (type.code.equals(code)) {
                      return type;
                  }
              }
              return null;
          }
      
      }

      FlinkCdcProperties.class

      import lombok.Data;
      import org.springframework.boot.context.properties.ConfigurationProperties;
      import org.springframework.stereotype.Component;
      
      
      
      @Data
      @Component
      @ConfigurationProperties(prefix = "flink.cdc")
      public class FlinkCdcProperties {
      
          /**
           * 是否自動啟動CDC監聽
           */
          private boolean autoStart;
      
          private String serverId;
      
          /**
           * MySQL配置
           */
          private Mysql mysql = new Mysql();
      
          @Data
          public static class Mysql {
              private String hostname;
              private int port;
              private String username;
              private String password;
          }
      
      }

      4、FlinkCdcRunner數據變更監聽啟動器:

      import com.alibaba.fastjson.JSON;
      import com.alibaba.fastjson.JSONObject;
      import com.iven.flinkcdcdemoservice.entity.DataChangeType;
      import com.iven.flinkcdcdemoservice.entity.FlinkCdcProperties;
      import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandlerRegistry;
      import com.iven.flinkcdcdemoservice.handler.FlinkCdcHandler;
      import com.ververica.cdc.connectors.mysql.source.MySqlSource;
      import com.ververica.cdc.connectors.mysql.table.StartupOptions;
      import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
      import lombok.RequiredArgsConstructor;
      import lombok.extern.slf4j.Slf4j;
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.restartstrategy.RestartStrategies;
      import org.apache.flink.api.common.time.Time;
      import org.apache.flink.streaming.api.datastream.DataStreamSource;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.ProcessFunction;
      import org.apache.flink.util.Collector;
      import org.springframework.boot.CommandLineRunner;
      import org.springframework.stereotype.Component;
      
      import java.io.Serializable;
      import java.util.*;
      
      @Slf4j
      @Component
      @RequiredArgsConstructor
      public class FlinkCdcRunner implements CommandLineRunner {
      
          // 配置屬性
          private final FlinkCdcProperties properties;
      
          // 處理器注冊中心
          private final FlinkCdcHandlerRegistry handlerRegistry;
      
          @Override
          public void run(String... args) throws Exception {
              // 總開關關閉則不啟動
              if (!properties.isAutoStart()) {
                  log.info("Flink CDC 總開關關閉,不啟動監聽");
                  return;
              }
      
              // 沒有需要監聽的表則不啟動
              List<String> monitoredTables = handlerRegistry.getMonitoredTables();
              if (monitoredTables.isEmpty()) {
                  log.warn("未發現需要監聽的表(未實現FlinkCdcTableHandler),不啟動監聽");
                  return;
              }
      
              // 1. 創建Flink執行環境
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              // 設置并行度為 1: server-id 的數量必須 ≥ 并行度
              env.setParallelism(1);
              // 啟用檢查點(可選)
              // env.enableCheckpointing(5000);
              // 配置檢查點存儲路徑(本地路徑或分布式存儲如HDFS)
              // env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-cdc-checkpoints");
              // 檢查點超時時間(60秒未完成則取消)
              // env.getCheckpointConfig().setCheckpointTimeout(60000);
              // 允許檢查點失敗次數(默認0,即一次失敗則任務失敗)
              // env.getCheckpointConfig().setTolerableCheckpointFailureNumber(3);
              // 禁用檢查點
              env.getCheckpointConfig().disableCheckpointing();
              // 重試次數/重試間隔
              env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,  Time.seconds(10)));
      
              // 2. 配置MySQL CDC源(動態設置需要監聽的表)
              MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                      .serverId(properties.getServerId())
                      .hostname(properties.getMysql().getHostname())
                      .port(properties.getMysql().getPort())
                      .username(properties.getMysql().getUsername())
                      .password(properties.getMysql().getPassword())
                      // 從監聽的表中提取數據庫列表(去重)
                      .databaseList(extractDatabases(monitoredTables))
                      // 直接使用注冊中心收集的表列表
                      .tableList(monitoredTables.toArray(new String[0]))
                      // 反序列化為JSON
                      .deserializer(new JsonDebeziumDeserializationSchema())
                      /* initial: 初始化快照,即全量導入后增量導入(檢測更新數據寫入)
                       * latest: 只進行增量導入(不讀取歷史變化)
                       * timestamp: 指定時間戳進行數據導入(大于等于指定時間錯讀取數據)
                       */
                      .startupOptions(StartupOptions.latest())
                      .build();
      
              // 3. 讀取CDC數據流并處理
              DataStreamSource<String> dataStream = env.fromSource(
                      mySqlSource,
                      WatermarkStrategy.noWatermarks(),
                      "MySQL-CDC-Source"
              );
      
              // 4. 解析數據并路由到對應處理器,使用靜態內部類代替匿名內部類
              dataStream.process(new CdcDataProcessFunction(handlerRegistry));
      
              // 5. 啟動Flink作業
              env.execute("Flink-CDC-動態監聽作業");
          }
      
          /**
           * 靜態內部類實現ProcessFunction,確保可序列化
           */
          private static class CdcDataProcessFunction extends ProcessFunction<String, Void> implements Serializable {
              private final FlinkCdcHandlerRegistry handlerRegistry;
      
              // 通過構造函數傳入依賴
              public CdcDataProcessFunction(FlinkCdcHandlerRegistry handlerRegistry) {
                  this.handlerRegistry = handlerRegistry;
              }
      
              @Override
              public void processElement(String json, Context ctx, Collector<Void> out) {
                  try {
                      JSONObject cdcData = JSON.parseObject(json);
                      // 操作類型:c/u/d
                      String op = cdcData.getString("op");
                      JSONObject source = cdcData.getJSONObject("source");
                      String dbName = source.getString("db");
                      String tableName = source.getString("table");
                      // 庫名.表名
                      String fullTableName = dbName + "." + tableName;
      
                      // 找到對應表的處理器
                      FlinkCdcHandler handler = handlerRegistry.getHandler(fullTableName);
                      if (handler == null) {
                          log.warn("表[{}]無處理器,跳過處理", fullTableName);
                          return;
                      }
      
                      // 按事件類型分發
                      DataChangeType changeType = DataChangeType.getByCode(op);
                      if (changeType == null) {
                          log.warn("未知操作類型:{}", op);
                          return;
                      }
      
                      switch (changeType) {
                          case INSERT:
                              List<Map<String, Object>> insertData = Collections.singletonList(
                                      cdcData.getJSONObject("after").getInnerMap()
                              );
                              handler.handleInsert(insertData);
                              break;
                          case UPDATE:
                              List<Map<String, Object>> beforeData = Collections.singletonList(
                                      cdcData.getJSONObject("before").getInnerMap()
                              );
                              List<Map<String, Object>> afterData = Collections.singletonList(
                                      cdcData.getJSONObject("after").getInnerMap()
                              );
                              handler.handleUpdate(beforeData, afterData);
                              break;
                          case DELETE:
                              List<Map<String, Object>> deleteData = Collections.singletonList(
                                      cdcData.getJSONObject("before").getInnerMap()
                              );
                              handler.handleDelete(deleteData);
                              break;
                          case READ:
                              // 可以忽略快照階段的讀取操作,或根據需要處理
                              log.debug("處理快照讀取操作: {}", fullTableName);
                              break;
                      }
                  } catch (Exception e) {
                      log.error("Flink-CDC數據處理發生未預期異常", e);
                  }
              }
          }
      
          /**
           * 從表名(庫名.表名)中提取數據庫列表(去重)
           *
           * @param tables
           * @return
           */
          private String[] extractDatabases(List<String> tables) {
              // 截取庫名(如demo.tb_user → demo)
              return tables.stream()
                      .map(table -> table.split("\\.")[0])
                      .distinct()
                      .toArray(String[]::new);
          }
      
      }

      5、FlinkCdcHandlerRegistry策略路由:

      import lombok.extern.slf4j.Slf4j;
      import org.springframework.context.ApplicationContext;
      import org.springframework.context.ApplicationContextAware;
      import org.springframework.stereotype.Component;
      
      import java.io.Serializable;
      import java.util.ArrayList;
      import java.util.List;
      import java.util.Map;
      import java.util.concurrent.ConcurrentHashMap;
      
      /**
       * Flink CDC處理器注冊中心
       * 處理器注冊中心(自動掃描監聽表)
       */
      @Slf4j
      @Component
      public class FlinkCdcHandlerRegistry implements ApplicationContextAware, Serializable {
      
          // 緩存:表名(庫名.表名)→ 處理器
          private final Map<String, FlinkCdcHandler> handlerMap = new ConcurrentHashMap<>();
      
          // 收集所有需要監聽的表(供Flink CDC配置使用)
          private List<String> monitoredTables;
      
          @Override
          public void setApplicationContext(ApplicationContext applicationContext) {
              // 掃描所有實現類
              Map<String, FlinkCdcHandler> beans = applicationContext.getBeansOfType(FlinkCdcHandler.class);
              beans.values().forEach(handler -> {
                  String tableName = handler.getTableName();
                  handlerMap.put(tableName, handler);
                  log.info("注冊監聽表:{} → 處理器:{}", tableName, handler.getClass().getSimpleName());
              });
              // 提取所有需要監聽的表
              monitoredTables = new ArrayList<>(handlerMap.keySet());
          }
      
          /**
           * 獲取指定表的處理器
           *
           * @param tableName
           * @return
           */
          public FlinkCdcHandler getHandler(String tableName) {
              return handlerMap.get(tableName);
          }
      
          /**
           * 獲取所有需要監聽的表(供Flink CDC配置)
           *
           * @return
           */
          public List<String> getMonitoredTables() {
              return monitoredTables;
          }
      
      }

      6、FlinkCdcHandler策略模式數據處理:

       FlinkCdcHandler

      import java.util.List;
      import java.util.Map;
      
      /**
       * 表數據處理接口,每個監聽的表需實現此接口
       */
      public interface FlinkCdcHandler {
      
          /**
           * 獲取監聽的表名(格式:庫名.表名,如demo.tb_user)
           */
          String getTableName();
      
          /**
           * 處理新增數據
           */
          default void handleInsert(List<Map<String, Object>> dataList) {
              // 默認空實現,子類可重寫
          }
      
          /**
           * 處理更新數據(包含變更前和變更后的數據)
           * @param beforeList 變更前數據
           * @param afterList 變更后數據
           */
          default void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
              // 默認空實現,子類可重寫
          }
      
          /**
           * 處理刪除數據
           */
          default void handleDelete(List<Map<String, Object>> dataList) {
              // 默認空實現,子類可重寫
          }
      
      }

      TbUserFlinkCdcHandler

      import lombok.extern.slf4j.Slf4j;
      import org.springframework.stereotype.Component;
      
      import java.io.Serializable;
      import java.util.List;
      import java.util.Map;
      
      @Slf4j
      @Component
      public class TbUserFlinkCdcHandler implements FlinkCdcHandler, Serializable {
          @Override
          public String getTableName() {
              return "demo.tb_user";
          }
      
          @Override
          public void handleInsert(List<Map<String, Object>> dataList) {
              log.info("處理tb_user新增數據,共{}條", dataList.size());
              dataList.forEach(data -> {
                  String id = (String)data.get("id");
                  String username = (String) data.get("name");
                  // 業務邏輯:如同步到ES、緩存等
                  log.info("新增用戶:id={}, name={}", id, username);
              });
          }
      
          @Override
          public void handleUpdate(List<Map<String, Object>> beforeList, List<Map<String, Object>> afterList) {
              log.info("處理tb_user更新數據,共{}條", afterList.size());
              for (int i = 0; i < afterList.size(); i++) {
                  Map<String, Object> before = beforeList.get(i);
                  Map<String, Object> after = afterList.get(i);
                  log.info("更新用戶:id={}, 舊用戶名={}, 新用戶名={}",
                          after.get("id"), before.get("name"), after.get("name"));
              }
          }
      
          @Override
          public void handleDelete(List<Map<String, Object>> dataList) {
              log.info("處理tb_user刪除數據,共{}條", dataList.size());
              dataList.forEach(data -> {
                  log.info("刪除用戶:id={}", data.get("id"));
              });
          }
      
      }

      4

      項目啟動時,FlinkCdcHandlerRegistry 掃描并注冊所有 FlinkCdcHandler 實現類,建立表與處理器的映射;FlinkCdcRunner 在 Spring 容器初始化后觸發,檢查啟動條件,初始化 Flink 環境并構建 CDC 數據源,將數據流接入 CdcDataProcessFunction,該函數解析變更事件并路由到對應處理器執行業務邏輯,最后啟動 Flink 作業持續監聽處理。

       

      posted on 2025-11-04 23:03  愛文(Iven)  閱讀(8)  評論(0)    收藏  舉報

      導航

      主站蜘蛛池模板: 重口SM一区二区三区视频| 亚洲国产精品综合久久20| 日韩成人一区二区三区在线观看| 国产成人高清亚洲综合| 亚洲日韩精品一区二区三区无码| 亚洲人成网站18禁止无码| 国产精品久久久久7777| 一本色道久久东京热| 人妻一本久道久久综合鬼色 | 榆中县| 先锋影音男人av资源| 久操线在视频在线观看| 99精品国产中文字幕| 国产欧洲欧洲久美女久久| 日产精品久久久久久久| 贡山| 人人妻人人狠人人爽| 青草国产超碰人人添人人碱 | 116美女极品a级毛片| 国产成人欧美一区二区三区 | 久久精品国产亚洲欧美| 九九综合va免费看| 欧洲亚洲精品免费二区| 久久亚洲精品11p| 成人无码影片精品久久久| 少妇xxxxx性开放| 一区二区三区国产亚洲网站| 国产亚洲精品日韩av在| 日本亚洲一区二区精品久久| www插插插无码视频网站| 人妻少妇偷人精品一区| 策勒县| 人人妻人人澡人人爽人人精品av| 欧美一区二区三区在线观看| 国产精品美女黑丝流水| 精品国产乱来一区二区三区| 欧美寡妇xxxx黑人猛交| 日韩欧美国产aⅴ另类| 国色天香成人一区二区| 日本强伦片中文字幕免费看| 国产精品伊人久久综合网|