<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      使用Flink實現(xiàn)MySQL實時同步數(shù)據(jù)到StarRocks(庫表級)

      這里引用官網(wǎng)的文章 + 我在使用時遇到的問題。官網(wǎng)已經(jīng)講解的很明白了。

      版本信息:
      mysql:8.0.33
      starRocks:3.0.1

      官網(wǎng)有另外一種更簡便的方法(DML、DDL一起同步),但好像是我的StarRocks版本太低了,實現(xiàn)不了,看了一下jar包,得3.1以上的才行

      從MySQL實時同步

      StarRocks 支持多種方式將 MySQL 的數(shù)據(jù)實時同步至 StarRocks,支撐實時分析和處理海量數(shù)據(jù)的需求。

      本文介紹如何將 MySQL 的數(shù)據(jù)通過 Apache Flink? 實時(秒級)同步至 StarRocks。

      注意

      導入操作需要目標表的 INSERT 權限。如果您的用戶賬號沒有 INSERT 權限,請參考 GRANT 給用戶賦權。

      基本原理

      ?信息

      從 MySQL 同步至 Flink 需要使用 Flink CDC,本文使用 Flink CDC 的版本小于 3.0,因此需要借助 SMT 同步表結構。 然而如果使用 Flink CDC 3.0,則無需借助 SMT,即可將表結構同步至 StarRocks,甚至可以同步整個 MySQL 數(shù)據(jù)庫、分庫分表的結構,同時也支持同步 schema change。具體的使用方式,參見從 MySQL 到 StarRocks 的流式 ELT 管道。

      將 MySQL 的數(shù)據(jù)通過 Flink 同步至 StarRocks 分成同步庫表結構、同步數(shù)據(jù)兩個階段進行。首先 StarRocks Migration Tool (數(shù)據(jù)遷移工具,以下簡稱 SMT) 將 MySQL 的庫表結構轉化成 StarRocks 的建庫和建表語句。然后 Flink 集群運行 Flink job,同步 MySQL 全量及增量數(shù)據(jù)至 StarRocks。具體同步流程如下:

      ?信息

      該同步流程能夠保證端到端的 exactly-once 的語義一致性。

      1. 同步庫表結構

        SMT 根據(jù)其配置文件中源 MySQL 和目標 StarRocks 的信息,讀取 MySQL 中待同步的庫表結構,并生成 SQL 文件,用于在 StarRocks 內創(chuàng)建對應的目標庫表。

      2. 同步數(shù)據(jù)

        Flink SQL 客戶端執(zhí)行導入數(shù)據(jù)的 SQL 語句(INSERT INTO SELECT語句),向 Flink 集群提交一個或者多個長時間運行的 Flink job。Flink集群運行 Flink job ,F(xiàn)link cdc connector 先讀取數(shù)據(jù)庫的歷史全量數(shù)據(jù),然后無縫切換到增量讀取,并且發(fā)給 flink-connector-starrocks,最后 flink-connector-starrocks 攢微批數(shù)據(jù)同步至 StarRocks。

        信息

        僅支持同步 DML,不支持同步 DDL。

      業(yè)務場景

      以商品累計銷量實時榜單為例,存儲在 MySQL 中的原始訂單表,通過 Flink 處理計算出產(chǎn)品銷量的實時排行,并實時同步至 StarRocks 的主鍵模型表中。最終用戶可以通過可視化工具連接StarRocks查看到實時刷新的榜單。

      準備工作

      下載并安裝同步工具

      同步時需要使用 SMT、 Flink、Flink CDC connector、flink-connector-starrocks,下載和安裝步驟如下:

      1. 下載、安裝并啟動 Flink 集群。

        說明:下載和安裝方式也可以參考 Flink 官方文檔

        1. 您需要提前在操作系統(tǒng)中安裝 Java 8 或者 Java 11,以正常運行 Flink。您可以通過以下命令來檢查已經(jīng)安裝的 Java 版本。

          # 查看java版本
          java -version
          
          # 如下顯示已經(jīng)安裝 java 8
          openjdk version "1.8.0_322"
          OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06)
          OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode)
          
        2. 下載并解壓 Flink。本示例使用 Flink 1.14.5。

          說明:推薦使用 1.14 及以上版本,最低支持 1.11 版本。

          # 下載 Flink
          wget https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz
          # 解壓 Flink  
          tar -xzf flink-1.14.5-bin-scala_2.11.tgz
          # 進入 Flink 目錄
          cd flink-1.14.5
          
        3. 啟動 Flink 集群。

          # 啟動 Flink 集群
          ./bin/start-cluster.sh
          
          # 返回如下信息,表示成功啟動 flink 集群
          Starting cluster.
          Starting standalonesession daemon on host.
          Starting taskexecutor daemon on host.
          
      2. 下載 Flink CDC connector。本示例的數(shù)據(jù)源為 MySQL,因此下載 flink-sql-connector-mysql-cdc-x.x.x.jar。并且版本需支持對應的 Flink 版本。由于本文使用 Flink 1.14.5,因此可以使用 flink-sql-connector-mysql-cdc-2.2.0.jar。

        wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar
        
      3. 下載 flink-connector-starrocks,并且其版本需要對應 Flink 的版本。

        flink-connector-starrocks 的 JAR 包 (x.x.x_flink-y.yy_z.zz.jar) 會包含三個版本號:

        • 第一個版本號 x.x.x 為 flink-connector-starrocks 的版本號。
        • 第二個版本號 y.yy 為其支持的 Flink 版本號。
        • 第三個版本號 z.zz 為 Flink 支持的 Scala 版本號。如果 Flink 為 1.14.x 以及之前版本,則需要下載帶有 Scala 版本號的 flink-connector-starrocks。

        由于本文使用 Flink 版本號 1.14.5,Scala 版本號 2.11,因此可以下載 flink-connector-starrocks JAR 包 1.2.3_flink-1.14_2.11.jar。

      4. 將 Flink CDC connector、Flink-connector-starrocks 的 JAR 包 flink-sql-connector-mysql-cdc-2.2.0.jar、1.2.3_flink-1.14_2.11.jar 移動至 Flink 的 lib 目錄。

        注意

        如果 Flink 已經(jīng)處于運行狀態(tài)中,則需要先停止 Flink,然后重啟 Flink ,以加載并生效 JAR 包。

        ./bin/stop-cluster.sh
        ./bin/start-cluster.sh
        
      5. 下載并解壓 SMT,并將其放在 flink-1.14.5 目錄下。您可以根據(jù)操作系統(tǒng)和 CPU 架構選擇對應的 SMT 安裝包。

        # 適用于 Linux x86
        wget https://releases.starrocks.io/resources/smt.tar.gz
        # 適用于 macOS ARM64
        wget https://releases.starrocks.io/resources/smt_darwin_arm64.tar.gz
        

      開啟 MySQL Binlog 日志

      您需要確保已經(jīng)開啟 MySQL Binlog 日志,實時同步時需要讀取 MySQL Binlog 日志數(shù)據(jù),解析并同步至 StarRocks。

      1. 編輯 MySQL 配置文件 my.cnf(默認路徑為 /etc/my.cnf),以開啟 MySQL Binlog。

        # 開啟 Binlog 日志
        log_bin = ON
        # 設置 Binlog 的存儲位置
        log_bin =/var/lib/mysql/mysql-bin
        # 設置 server_id 
        # 在 MySQL 5.7.3 及以后版本,如果沒有 server_id,那么設置 binlog 后無法開啟 MySQL 服務 
        server_id = 1
        # 設置 Binlog 模式為 ROW
        binlog_format = ROW
        # binlog 日志的基本文件名,后面會追加標識來表示每一個 Binlog 文件
        log_bin_basename =/var/lib/mysql/mysql-bin
        # binlog 文件的索引文件,管理所有 Binlog 文件的目錄
        log_bin_index =/var/lib/mysql/mysql-bin.index
        
      2. 執(zhí)行如下命令,重啟 MySQL,生效修改后的配置文件:

         # 使用 service 啟動
         service mysqld restart
         # 使用 mysqld 腳本啟動
         /etc/init.d/mysqld restart
        
      3. 連接 MySQL,執(zhí)行如下語句確認是否已經(jīng)開啟 Binlog:

        -- 連接 MySQL
        mysql -h xxx.xx.xxx.xx -P 3306 -u root -pxxxxxx
        
        -- 檢查是否已經(jīng)開啟 MySQL Binlog,`ON`就表示已開啟
        mysql> SHOW VARIABLES LIKE 'log_bin'; 
        +---------------+-------+
        | Variable_name | Value |
        +---------------+-------+
        | log_bin       | ON    |
        +---------------+-------+
        1 row in set (0.00 sec)
        

      同步庫表結構

      1. 配置 SMT 配置文件。 進入 SMT 的 conf 目錄,編輯配置文件 config_prod.conf。例如源 MySQL 連接信息、待同步庫表的匹配規(guī)則,flink-connector-starrocks 配置信息等。

        [db]
        type = mysql
        host = xxx.xx.xxx.xx
        port = 3306
        user = user1
        password = xxxxxx
        
        [other]
        # number of backends in StarRocks
        be_num = 3
        # `decimal_v3` is supported since StarRocks-1.18.1
        use_decimal_v3 = true
        # file to save the converted DDL SQL
        output_dir = ./result
        
        [table-rule.1]
        # pattern to match databases for setting properties
        database = ^demo.*$
        # pattern to match tables for setting properties
        table = ^.*$
        
        ############################################
        ### flink sink configurations
        ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
        ############################################
        flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
        flink.starrocks.load-url= <fe_host>:<fe_http_port>
        flink.starrocks.username=user2
        flink.starrocks.password=xxxxxx
        flink.starrocks.sink.properties.format=csv
        flink.starrocks.sink.properties.column_separator=\x01
        flink.starrocks.sink.properties.row_delimiter=\x02
        flink.starrocks.sink.buffer-flush.interval-ms=15000
        
        • [db]:源數(shù)據(jù)庫的連接信息。

          • type:源數(shù)據(jù)庫類型,本示例中源數(shù)據(jù)庫為 mysql。
          • host :MySQL 所在服務器的 IP 地址。
          • port:MySQL 端口號,默認為3306
          • user :用戶名。
          • password:用戶登錄密碼。
        • [table-rule] :庫表匹配規(guī)則,以及對應的flink-connector-starrocks 配置。

          • 如果需要為不同表匹配不同的 flink-connector-starrocks 配置,例如部分表更新頻繁,需要提高導入速度,請參見補充說明
          • 如果需要將 MySQL 分庫分表后的多張表導入至 StarRocks的一張表中,請參見補充說明
          • database、table:MySQL 中同步對象的庫表名,支持正則表達式。
          • flink.starrocks.* :flink-connector-starrocks 的配置信息,更多配置和說明,請參見 Flink-connector-starrocks。
        • [other] :其他信息

          • be_num: StarRocks 集群的 BE 節(jié)點數(shù)(后續(xù)生成的 StarRocks 建表 SQL 文件會參考該參數(shù),設置合理的分桶數(shù)量)。
          • use_decimal_v3:是否開啟 decimalV3。開啟后,MySQL 小數(shù)類型的數(shù)據(jù)同步至 StarRocks 時會轉換為 decimalV3。
          • output_dir :待生成的 SQL 文件的路徑。SQL 文件會用于在 StarRocks 集群創(chuàng)建庫表, 向 Flink 集群提交 Flink job。默認為 ./result,不建議修改。
      2. 執(zhí)行如下命令,SMT 會讀取 MySQL 中同步對象的庫表結構,并且結合配置文件信息,在 result 目錄生成 SQL 文件,用于 StarRocks 集群創(chuàng)建庫表(starrocks-create.all.sql), 用于向 Flink 集群提交同步數(shù)據(jù)的 flink job(flink-create.all.sql)。 并且源表不同,則 starrocks-create.all.sql 中建表語句默認創(chuàng)建的數(shù)據(jù)模型不同。

        • 如果源表沒有 Primary Key、 Unique Key,則默認創(chuàng)建明細模型。
        • 如果源表有 Primary Key、 Unique Key,則區(qū)分以下幾種情況:
          • 源表是 Hive 表、ClickHouse MergeTree 表,則默認創(chuàng)建明細模型。
          • 源表是 ClickHouse SummingMergeTree表,則默認創(chuàng)建聚合模型。
          • 源表為其他類型,則默認創(chuàng)建主鍵模型。
        # 運行 SMT
        ./starrocks-migrate-tool
        
        # 進入并查看 result 目錄中的文件
        cd result
        ls result
        flink-create.1.sql    smt.tar.gz              starrocks-create.all.sql
        flink-create.all.sql  starrocks-create.1.sql
        
      3. 執(zhí)行如下命令,連接 StarRocks,并執(zhí)行 SQL 文件 starrocks-create.all.sql,用于創(chuàng)建目標庫和表。推薦使用 SQL 文件中默認的建表語句,本示例中建表語句默認創(chuàng)建的數(shù)據(jù)模型為主鍵模型。

        注意

        • 您也可以根據(jù)業(yè)務需要,修改 SQL 文件中的建表語句,基于其他模型創(chuàng)建目標表。
        • 如果您選擇基于非主鍵模型創(chuàng)建目標表,StarRocks 不支持將源表中 DELETE 操作同步至非主鍵模型的表,請謹慎使用。
        mysql -h <fe_host> -P <fe_query_port> -u user2 -pxxxxxx < starrocks-create.all.sql
        

        如果數(shù)據(jù)需要經(jīng)過 Flink 處理后寫入目標表,目標表與源表的結構不一樣,則您需要修改 SQL 文件 starrocks-create.all.sql 中的建表語句。本示例中目標表僅需要保留商品 ID (product_id)、商品名稱(product_name),并且對商品銷量進行實時排名,因此可以使用如下建表語句。

        CREATE DATABASE IF NOT EXISTS `demo`;
        
        CREATE TABLE IF NOT EXISTS `demo`.`orders` (
        `product_id` INT(11) NOT NULL COMMENT "",
        `product_name` STRING NOT NULL COMMENT "",
        `sales_cnt` BIGINT NOT NULL COMMENT ""
        ) ENGINE=olap
        PRIMARY KEY(`product_id`)
        DISTRIBUTED BY HASH(`product_id`)
        PROPERTIES (
        "replication_num" = "3"
        );
        

        注意

        自 2.5.7 版本起,StarRocks 支持在建表和新增分區(qū)時自動設置分桶數(shù)量 (BUCKETS),您無需手動設置分桶數(shù)量。更多信息,請參見 確定分桶數(shù)量

      同步數(shù)據(jù)

      運行 Flink 集群,提交 Flink job,啟動流式作業(yè),源源不斷將 MySQL 數(shù)據(jù)庫中的全量和增量數(shù)據(jù)同步到 StarRocks 中。

      1. 進入 Flink 目錄,執(zhí)行如下命令,在 Flink SQL 客戶端運行 SQL 文件 flink-create.all.sql。

        該 SQL 文件定義了動態(tài)表 source table、sink table,查詢語句 INSERT INTO SELECT,并且指定 connector、源數(shù)據(jù)庫和目標數(shù)據(jù)庫。Flink SQL 客戶端執(zhí)行該 SQL 文件后,向 Flink 集群提交一個 Flink job,開啟同步任務。

        ./bin/sql-client.sh -f flink-create.all.sql
        

        注意

        • 需要確保 Flink 集群已經(jīng)啟動。可通過命令 flink/bin/start-cluster.sh 啟動。
        • 如果您使用 Flink 1.13 之前的版本,則可能無法直接運行 SQL 文件 flink-create.all.sql。您需要在 SQL 客戶端命令行界面,逐條執(zhí)行 SQL 文件 flink-create.all.sql 中的 SQL 語句,并且需要做對\字符進行轉義。
        'sink.properties.column_separator' = '\\x01'
        'sink.properties.row_delimiter' = '\\x02'  
        

        處理同步數(shù)據(jù)

        在同步過程中,如果您需要對數(shù)據(jù)進行一定的處理,例如 GROUP BY、JOIN 等,則可以修改 SQL 文件 flink-create.all.sql。本示例可以通過執(zhí)行 count(*) 和 GROUP BY 計算出產(chǎn)品銷量的實時排名。

        $ ./bin/sql-client.sh -f flink-create.all.sql
        No default environment specified.
        Searching for '/home/disk1/flink-1.13.6/conf/sql-client-defaults.yaml'...not found.
        [INFO] Executing SQL from file.
        
        Flink SQL> CREATE DATABASE IF NOT EXISTS `default_catalog`.`demo`;
        [INFO] Execute statement succeed.
        
        -- 根據(jù) MySQL 的訂單表創(chuàng)建動態(tài)表 source table
        Flink SQL> 
        CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_src` (
          `order_id` BIGINT NOT NULL,
          `product_id` INT NULL,
          `order_date` TIMESTAMP NOT NULL,
          `customer_name` STRING NOT NULL,
          `product_name` STRING NOT NULL,
          `price` DECIMAL(10, 5) NULL,
          PRIMARY KEY(`order_id`)
         NOT ENFORCED
        ) with (
          'connector' = 'mysql-cdc',
          'hostname' = 'xxx.xx.xxx.xxx',
          'port' = '3306',
          'username' = 'root',
          'password' = '',
          'database-name' = 'demo',
          'table-name' = 'orders'
        );
        [INFO] Execute statement succeed.
        
        -- 創(chuàng)建動態(tài)表 sink table
        Flink SQL> 
        CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_sink` (
         `product_id` INT NOT NULL,
         `product_name` STRING NOT NULL,
         `sales_cnt` BIGINT NOT NULL,
         PRIMARY KEY(`product_id`)
        NOT ENFORCED
        ) with (
          'sink.max-retries' = '10',
          'jdbc-url' = 'jdbc:mysql://<fe_host>:<fe_query_port>',
          'password' = '',
          'sink.properties.strip_outer_array' = 'true',
          'sink.properties.format' = 'json',
          'load-url' = '<fe_host>:<fe_http_port>',
          'username' = 'root',
          'sink.buffer-flush.interval-ms' = '15000',
          'connector' = 'starrocks',
          'database-name' = 'demo',
          'table-name' = 'orders'
        );
        [INFO] Execute statement succeed.
        
        -- 執(zhí)行查詢,實現(xiàn)產(chǎn)品實時排行榜功能,查詢不斷更新 sink table,以反映 source table 上的更改
        Flink SQL> 
        INSERT INTO `default_catalog`.`demo`.`orders_sink` select product_id,product_name, count(*) as cnt from `default_catalog`.`demo`.`orders_src` group by product_id,product_name;
        [INFO] Submitting SQL update statement to the cluster...
        [INFO] SQL update statement has been successfully submitted to the cluster:
        Job ID: 5ae005c4b3425d8bb13fe660260a35da
        

        如果您只需要同步部分數(shù)據(jù),例如支付時間在 2021 年 01 月 01 日之后的數(shù)據(jù),則可以在 INSERT INTO SELECT 語句中使用 WHERE order_date >'2021-01-01' 設置過濾條件。不滿足該條件的數(shù)據(jù),即支付時間在 2021 年 01 月 01 日或者之前的數(shù)據(jù)不會同步至 StarRocks。

        INSERT INTO `default_catalog`.`demo`.`orders_sink` SELECT product_id,product_name, COUNT(*) AS cnt FROM `default_catalog`.`demo`.`orders_src` WHERE order_date >'2021-01-01 00:00:01' GROUP BY product_id,product_name;
        

        如果返回如下結果,則表示 Flink job 已經(jīng)提交,開始同步全量和增量數(shù)據(jù)。

        [INFO] Submitting SQL update statement to the cluster...
        [INFO] SQL update statement has been successfully submitted to the cluster:
        Job ID: 5ae005c4b3425d8bb13fe660260a35da
        
      2. 可以通過 Flink WebUI 或者在 Flink 命令行執(zhí)行命令bin/flink list -running,查看 Flink 集群中正在運行的 Flink job,以及 Flink job ID。

        1. Flink WebUI 界面

        2. 在 Flink 命令行執(zhí)行命令bin/flink list -running

          $ bin/flink list -running
          Waiting for response...
          ------------------ Running/Restarting Jobs -------------------
          13.10.2022 15:03:54 : 040a846f8b58e82eb99c8663424294d5 : insert-into_default_catalog.lily.example_tbl1_sink (RUNNING)
          --------------------------------------------------------------
          

          說明

          如果任務出現(xiàn)異常,可以通過 Flink WebUI 或者 flink-1.14.5/log 目錄的日志文件進行排查。

      常見問題

      例如數(shù)據(jù)源某些表更新頻繁,需要提高 flink connector sr 的導入速度等,則需要在 SMT 配置文件 config_prod.conf 中為這些表設置單獨的 flink-connector-starrocks 配置。

      [table-rule.1]
      # pattern to match databases for setting properties
      database = ^order.*$
      # pattern to match tables for setting properties
      table = ^.*$
      
      ############################################
      ### flink sink configurations
      ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
      ############################################
      flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
      flink.starrocks.load-url= <fe_host>:<fe_http_port>
      flink.starrocks.username=user2
      flink.starrocks.password=xxxxxx
      flink.starrocks.sink.properties.format=csv
      flink.starrocks.sink.properties.column_separator=\x01
      flink.starrocks.sink.properties.row_delimiter=\x02
      flink.starrocks.sink.buffer-flush.interval-ms=15000
      
      [table-rule.2]
      # pattern to match databases for setting properties
      database = ^order2.*$
      # pattern to match tables for setting properties
      table = ^.*$
      
      ############################################
      ### flink sink configurations
      ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
      ############################################
      flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
      flink.starrocks.load-url= <fe_host>:<fe_http_port>
      flink.starrocks.username=user2
      flink.starrocks.password=xxxxxx
      flink.starrocks.sink.properties.format=csv
      flink.starrocks.sink.properties.column_separator=\x01
      flink.starrocks.sink.properties.row_delimiter=\x02
      flink.starrocks.sink.buffer-flush.interval-ms=10000
      

      同步 MySQL 分庫分表后的多張表至 StarRocks 的一張表

      如果數(shù)據(jù)源 MySQL 進行分庫分表,數(shù)據(jù)拆分成多張表甚至分布在多個庫中,并且所有表的結構都是相同的,則您可以設置[table-rule],將這些表同步至 StarRocks 的一張表中。比如 MySQL 有兩個數(shù)據(jù)庫 edu_db_1,edu_db_2,每個數(shù)據(jù)庫下面分別有兩張表 course_1,course_2,并且所有表的結構都是相同的,則通過設置如下[table-rule]可以將其同步至 StarRocks的一張表中。

      說明

      數(shù)據(jù)源多張表同步至 StarRocks的一張表,表名默認為 course__auto_shard。如果需要修改,則可以在 result 目錄的 SQL 文件 starrocks-create.all.sql、 flink-create.all.sql 中修改。

      [table-rule.1]
      # pattern to match databases for setting properties
      database = ^edu_db_[0-9]*$
      # pattern to match tables for setting properties
      table = ^course_[0-9]*$
      
      ############################################
      ### flink sink configurations
      ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
      ############################################
      flink.starrocks.jdbc-url = jdbc: mysql://xxx.xxx.x.x:xxxx
      flink.starrocks.load-url = xxx.xxx.x.x:xxxx
      flink.starrocks.username = user2
      flink.starrocks.password = xxxxxx
      flink.starrocks.sink.properties.format=csv
      flink.starrocks.sink.properties.column_separator =\x01
      flink.starrocks.sink.properties.row_delimiter =\x02
      flink.starrocks.sink.buffer-flush.interval-ms = 5000
      

      數(shù)據(jù)以 JSON 格式導入

      以上示例數(shù)據(jù)以 CSV 格式導入,如果數(shù)據(jù)無法選出合適的分隔符,則您需要替換 [table-rule]flink.starrocks.*的如下參數(shù)。

      flink.starrocks.sink.properties.format=csv
      flink.starrocks.sink.properties.column_separator =\x01
      flink.starrocks.sink.properties.row_delimiter =\x02
      

      傳入如下參數(shù),數(shù)據(jù)以 JSON 格式導入。

      flink.starrocks.sink.properties.format=json
      flink.starrocks.sink.properties.strip_outer_array=true
      

      注意

      該方式會對導入速度有一定的影響。

      flink-create.all.sql 文件使用 STATEMENT SET 語句,將多個的 INSERT INTO 語句合并為一個 Flink job,避免占用過多的 Flink job 資源。

      說明

      Flink 自 1.13 起 支持 STATEMENT SET 語法。

      1. 打開 result/flink-create.all.sql 文件。

      2. 修改文件中的 SQL 語句,將所有的 INSERT INTO 語句調整位置到文件末尾。然后在第一條 INSERT語句的前面加上EXECUTE STATEMENT SET BEGIN 在最后一 INSERT 語句后面加上一行END;

        注意

        CREATE DATABASE、CREATE TABLE 的位置保持不變。

        CREATE DATABASE IF NOT EXISTS db;
        CREATE TABLE IF NOT EXISTS db.a1;
        CREATE TABLE IF NOT EXISTS db.b1;
        CREATE TABLE IF NOT EXISTS db.a2;
        CREATE TABLE IF NOT EXISTS db.b2;
        EXECUTE STATEMENT SET 
        BEGIN
          -- 1個或者多個 INSERT INTO statements
        INSERT INTO db.a1 SELECT * FROM db.b1;
        INSERT INTO db.a2 SELECT * FROM db.b2;
        END;
        

      更多常見問題,請參見 MySQL 實時同步至 StarRocks 常見問題。

      遇到的問題

      MySQL表缺少主鍵

      Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true

      這是因為FlinkCDC進行增量快照機制的原因

      1. 增量快照機制依賴主鍵
        當啟用了 scan.incremental.snapshot.enabled(默認值為 true)這個配置選項時,對于像 mysql-cdc 這類連接器,它期望源表具有明確的主鍵定義。這是因為增量快照機制需要依靠主鍵來準確追蹤數(shù)據(jù)的變化情況,以便能夠增量地讀取和處理來自數(shù)據(jù)源(MySQL 數(shù)據(jù)庫中的對應表)的數(shù)據(jù),比如準確識別哪些行是新增的、哪些行是被更新的等操作。如果沒有主鍵,系統(tǒng)就沒辦法可靠地執(zhí)行這種基于增量的操作邏輯,所以 Flink 會拋出這個 ValidationException 異常來提示你需要定義主鍵。
      2. MySQL CDC 工作原理關聯(lián)
        在使用 mysql-cdc 連接器從 MySQL 數(shù)據(jù)庫捕獲變更數(shù)據(jù)時,它會基于特定的機制去讀取 binlog(MySQL 的二進制日志,記錄了數(shù)據(jù)庫的所有變更操作)。主鍵在這里起到了一個關鍵的標識作用,使得連接器能夠準確地將 binlog 里的變更對應到具體的行記錄上,從而把這些變更正確地同步到 Flink 中進行后續(xù)處理。沒有主鍵的話,這個映射和追蹤過程就無法準確進行。
      • 解決措施:要么給Mysql加主鍵,要么FlinkSQL中建表時加下面的參數(shù)

        • 在建表語句中加入'scan.incremental.snapshot.enabled' = 'false'即可。

        ?這種方法很可能會導致端到端語句不一致,出現(xiàn)重復數(shù)據(jù)。

        • varchar數(shù)據(jù)類型,會自動設為2.X版本最大值,需要手動更改
      posted @ 2024-12-17 16:52  MrSponge  Views(1632)  Comments(0)    收藏  舉報
      主站蜘蛛池模板: 欧美丰满熟妇vaideos| 无码人妻精品一区二区三区蜜桃| 欧美丰满熟妇hdxx| 无码av中文字幕久久专区| 99精品国产一区二区三| 亚洲中国精品精华液| 桂林市| 人妻护士在线波多野结衣| 韩国无码AV片午夜福利| 亚洲日本va午夜在线电影| 亚洲精品国产字幕久久麻豆| 色综合五月伊人六月丁香| 国产精品先锋资源站先锋影院| 精品中文人妻中文字幕| 亚洲这里只有久热精品伊人 | 漂亮人妻被强中文字幕久久| 少妇撒尿一区二区在线视频| 久久96热在精品国产高清| 老妇xxxxx性开放| 吉川爱美一区二区三区视频| 国产伦码精品一区二区| 国产亚洲视频在线播放香蕉| 国产一区视频一区欧美| 亚洲欧美日韩成人综合一区 | 拜城县| 免费国产精品黄色一区二区 | 97人妻中文字幕总站| 久久久午夜精品福利内容 | 一本一道久久综合狠狠老| 日韩AV高清在线看片| 麻豆精品国产熟妇aⅴ一区| 娱乐| 亚洲人成人网站色www| 99国精品午夜福利视频不卡99| 国产偷国产偷亚洲清高网站| 人妻护士在线波多野结衣| 天天躁日日躁狠狠躁2018| 激情综合网激情综合| 黑人玩弄人妻中文在线| 国产精品伦人一久二久三久| 国产精品乱码人妻一区二区三区|