使用Flink實現(xiàn)MySQL實時同步數(shù)據(jù)到StarRocks(庫表級)
這里引用官網(wǎng)的文章 + 我在使用時遇到的問題。官網(wǎng)已經(jīng)講解的很明白了。
版本信息:
mysql:8.0.33
starRocks:3.0.1
官網(wǎng)有另外一種更簡便的方法(DML、DDL一起同步),但好像是我的StarRocks版本太低了,實現(xiàn)不了,看了一下jar包,得3.1以上的才行
從MySQL實時同步
StarRocks 支持多種方式將 MySQL 的數(shù)據(jù)實時同步至 StarRocks,支撐實時分析和處理海量數(shù)據(jù)的需求。
本文介紹如何將 MySQL 的數(shù)據(jù)通過 Apache Flink? 實時(秒級)同步至 StarRocks。
注意
導入操作需要目標表的 INSERT 權限。如果您的用戶賬號沒有 INSERT 權限,請參考 GRANT 給用戶賦權。
基本原理
?信息
從 MySQL 同步至 Flink 需要使用 Flink CDC,本文使用 Flink CDC 的版本小于 3.0,因此需要借助 SMT 同步表結構。 然而如果使用 Flink CDC 3.0,則無需借助 SMT,即可將表結構同步至 StarRocks,甚至可以同步整個 MySQL 數(shù)據(jù)庫、分庫分表的結構,同時也支持同步 schema change。具體的使用方式,參見從 MySQL 到 StarRocks 的流式 ELT 管道。

將 MySQL 的數(shù)據(jù)通過 Flink 同步至 StarRocks 分成同步庫表結構、同步數(shù)據(jù)兩個階段進行。首先 StarRocks Migration Tool (數(shù)據(jù)遷移工具,以下簡稱 SMT) 將 MySQL 的庫表結構轉化成 StarRocks 的建庫和建表語句。然后 Flink 集群運行 Flink job,同步 MySQL 全量及增量數(shù)據(jù)至 StarRocks。具體同步流程如下:
?信息
該同步流程能夠保證端到端的 exactly-once 的語義一致性。
-
同步庫表結構
SMT 根據(jù)其配置文件中源 MySQL 和目標 StarRocks 的信息,讀取 MySQL 中待同步的庫表結構,并生成 SQL 文件,用于在 StarRocks 內創(chuàng)建對應的目標庫表。
-
同步數(shù)據(jù)
Flink SQL 客戶端執(zhí)行導入數(shù)據(jù)的 SQL 語句(
INSERT INTO SELECT語句),向 Flink 集群提交一個或者多個長時間運行的 Flink job。Flink集群運行 Flink job ,F(xiàn)link cdc connector 先讀取數(shù)據(jù)庫的歷史全量數(shù)據(jù),然后無縫切換到增量讀取,并且發(fā)給 flink-connector-starrocks,最后 flink-connector-starrocks 攢微批數(shù)據(jù)同步至 StarRocks。信息
僅支持同步 DML,不支持同步 DDL。
業(yè)務場景
以商品累計銷量實時榜單為例,存儲在 MySQL 中的原始訂單表,通過 Flink 處理計算出產(chǎn)品銷量的實時排行,并實時同步至 StarRocks 的主鍵模型表中。最終用戶可以通過可視化工具連接StarRocks查看到實時刷新的榜單。
準備工作
下載并安裝同步工具
同步時需要使用 SMT、 Flink、Flink CDC connector、flink-connector-starrocks,下載和安裝步驟如下:
-
下載、安裝并啟動 Flink 集群。
說明:下載和安裝方式也可以參考 Flink 官方文檔。
-
您需要提前在操作系統(tǒng)中安裝 Java 8 或者 Java 11,以正常運行 Flink。您可以通過以下命令來檢查已經(jīng)安裝的 Java 版本。
# 查看java版本 java -version # 如下顯示已經(jīng)安裝 java 8 openjdk version "1.8.0_322" OpenJDK Runtime Environment (Temurin)(build 1.8.0_322-b06) OpenJDK 64-Bit Server VM (Temurin)(build 25.322-b06, mixed mode) -
下載并解壓 Flink。本示例使用 Flink 1.14.5。
說明:推薦使用 1.14 及以上版本,最低支持 1.11 版本。
# 下載 Flink wget https://archive.apache.org/dist/flink/flink-1.14.5/flink-1.14.5-bin-scala_2.11.tgz # 解壓 Flink tar -xzf flink-1.14.5-bin-scala_2.11.tgz # 進入 Flink 目錄 cd flink-1.14.5 -
啟動 Flink 集群。
# 啟動 Flink 集群 ./bin/start-cluster.sh # 返回如下信息,表示成功啟動 flink 集群 Starting cluster. Starting standalonesession daemon on host. Starting taskexecutor daemon on host.
-
-
下載 Flink CDC connector。本示例的數(shù)據(jù)源為 MySQL,因此下載 flink-sql-connector-mysql-cdc-x.x.x.jar。并且版本需支持對應的 Flink 版本。由于本文使用 Flink 1.14.5,因此可以使用 flink-sql-connector-mysql-cdc-2.2.0.jar。
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.0/flink-sql-connector-mysql-cdc-2.2.0.jar -
下載 flink-connector-starrocks,并且其版本需要對應 Flink 的版本。
flink-connector-starrocks 的 JAR 包 (x.x.x_flink-y.yy_z.zz.jar) 會包含三個版本號:
- 第一個版本號 x.x.x 為 flink-connector-starrocks 的版本號。
- 第二個版本號 y.yy 為其支持的 Flink 版本號。
- 第三個版本號 z.zz 為 Flink 支持的 Scala 版本號。如果 Flink 為 1.14.x 以及之前版本,則需要下載帶有 Scala 版本號的 flink-connector-starrocks。
由于本文使用 Flink 版本號 1.14.5,Scala 版本號 2.11,因此可以下載 flink-connector-starrocks JAR 包 1.2.3_flink-1.14_2.11.jar。
-
將 Flink CDC connector、Flink-connector-starrocks 的 JAR 包 flink-sql-connector-mysql-cdc-2.2.0.jar、1.2.3_flink-1.14_2.11.jar 移動至 Flink 的 lib 目錄。
注意
如果 Flink 已經(jīng)處于運行狀態(tài)中,則需要先停止 Flink,然后重啟 Flink ,以加載并生效 JAR 包。
./bin/stop-cluster.sh ./bin/start-cluster.sh -
下載并解壓 SMT,并將其放在 flink-1.14.5 目錄下。您可以根據(jù)操作系統(tǒng)和 CPU 架構選擇對應的 SMT 安裝包。
# 適用于 Linux x86 wget https://releases.starrocks.io/resources/smt.tar.gz # 適用于 macOS ARM64 wget https://releases.starrocks.io/resources/smt_darwin_arm64.tar.gz
開啟 MySQL Binlog 日志
您需要確保已經(jīng)開啟 MySQL Binlog 日志,實時同步時需要讀取 MySQL Binlog 日志數(shù)據(jù),解析并同步至 StarRocks。
-
編輯 MySQL 配置文件 my.cnf(默認路徑為 /etc/my.cnf),以開啟 MySQL Binlog。
# 開啟 Binlog 日志 log_bin = ON # 設置 Binlog 的存儲位置 log_bin =/var/lib/mysql/mysql-bin # 設置 server_id # 在 MySQL 5.7.3 及以后版本,如果沒有 server_id,那么設置 binlog 后無法開啟 MySQL 服務 server_id = 1 # 設置 Binlog 模式為 ROW binlog_format = ROW # binlog 日志的基本文件名,后面會追加標識來表示每一個 Binlog 文件 log_bin_basename =/var/lib/mysql/mysql-bin # binlog 文件的索引文件,管理所有 Binlog 文件的目錄 log_bin_index =/var/lib/mysql/mysql-bin.index -
執(zhí)行如下命令,重啟 MySQL,生效修改后的配置文件:
# 使用 service 啟動 service mysqld restart # 使用 mysqld 腳本啟動 /etc/init.d/mysqld restart -
連接 MySQL,執(zhí)行如下語句確認是否已經(jīng)開啟 Binlog:
-- 連接 MySQL mysql -h xxx.xx.xxx.xx -P 3306 -u root -pxxxxxx -- 檢查是否已經(jīng)開啟 MySQL Binlog,`ON`就表示已開啟 mysql> SHOW VARIABLES LIKE 'log_bin'; +---------------+-------+ | Variable_name | Value | +---------------+-------+ | log_bin | ON | +---------------+-------+ 1 row in set (0.00 sec)
同步庫表結構
-
配置 SMT 配置文件。 進入 SMT 的 conf 目錄,編輯配置文件 config_prod.conf。例如源 MySQL 連接信息、待同步庫表的匹配規(guī)則,flink-connector-starrocks 配置信息等。
[db] type = mysql host = xxx.xx.xxx.xx port = 3306 user = user1 password = xxxxxx [other] # number of backends in StarRocks be_num = 3 # `decimal_v3` is supported since StarRocks-1.18.1 use_decimal_v3 = true # file to save the converted DDL SQL output_dir = ./result [table-rule.1] # pattern to match databases for setting properties database = ^demo.*$ # pattern to match tables for setting properties table = ^.*$ ############################################ ### flink sink configurations ### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated ############################################ flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port> flink.starrocks.load-url= <fe_host>:<fe_http_port> flink.starrocks.username=user2 flink.starrocks.password=xxxxxx flink.starrocks.sink.properties.format=csv flink.starrocks.sink.properties.column_separator=\x01 flink.starrocks.sink.properties.row_delimiter=\x02 flink.starrocks.sink.buffer-flush.interval-ms=15000-
[db]:源數(shù)據(jù)庫的連接信息。type:源數(shù)據(jù)庫類型,本示例中源數(shù)據(jù)庫為mysql。host:MySQL 所在服務器的 IP 地址。port:MySQL 端口號,默認為3306。user:用戶名。password:用戶登錄密碼。
-
[table-rule]:庫表匹配規(guī)則,以及對應的flink-connector-starrocks 配置。database、table:MySQL 中同步對象的庫表名,支持正則表達式。flink.starrocks.*:flink-connector-starrocks 的配置信息,更多配置和說明,請參見 Flink-connector-starrocks。
-
[other]:其他信息be_num: StarRocks 集群的 BE 節(jié)點數(shù)(后續(xù)生成的 StarRocks 建表 SQL 文件會參考該參數(shù),設置合理的分桶數(shù)量)。use_decimal_v3:是否開啟 decimalV3。開啟后,MySQL 小數(shù)類型的數(shù)據(jù)同步至 StarRocks 時會轉換為 decimalV3。output_dir:待生成的 SQL 文件的路徑。SQL 文件會用于在 StarRocks 集群創(chuàng)建庫表, 向 Flink 集群提交 Flink job。默認為./result,不建議修改。
-
-
執(zhí)行如下命令,SMT 會讀取 MySQL 中同步對象的庫表結構,并且結合配置文件信息,在 result 目錄生成 SQL 文件,用于 StarRocks 集群創(chuàng)建庫表(starrocks-create.all.sql), 用于向 Flink 集群提交同步數(shù)據(jù)的 flink job(flink-create.all.sql)。 并且源表不同,則 starrocks-create.all.sql 中建表語句默認創(chuàng)建的數(shù)據(jù)模型不同。
- 如果源表沒有 Primary Key、 Unique Key,則默認創(chuàng)建明細模型。
- 如果源表有 Primary Key、 Unique Key,則區(qū)分以下幾種情況:
- 源表是 Hive 表、ClickHouse MergeTree 表,則默認創(chuàng)建明細模型。
- 源表是 ClickHouse SummingMergeTree表,則默認創(chuàng)建聚合模型。
- 源表為其他類型,則默認創(chuàng)建主鍵模型。
# 運行 SMT ./starrocks-migrate-tool # 進入并查看 result 目錄中的文件 cd result ls result flink-create.1.sql smt.tar.gz starrocks-create.all.sql flink-create.all.sql starrocks-create.1.sql -
執(zhí)行如下命令,連接 StarRocks,并執(zhí)行 SQL 文件 starrocks-create.all.sql,用于創(chuàng)建目標庫和表。推薦使用 SQL 文件中默認的建表語句,本示例中建表語句默認創(chuàng)建的數(shù)據(jù)模型為主鍵模型。
注意
- 您也可以根據(jù)業(yè)務需要,修改 SQL 文件中的建表語句,基于其他模型創(chuàng)建目標表。
- 如果您選擇基于非主鍵模型創(chuàng)建目標表,StarRocks 不支持將源表中 DELETE 操作同步至非主鍵模型的表,請謹慎使用。
mysql -h <fe_host> -P <fe_query_port> -u user2 -pxxxxxx < starrocks-create.all.sql如果數(shù)據(jù)需要經(jīng)過 Flink 處理后寫入目標表,目標表與源表的結構不一樣,則您需要修改 SQL 文件 starrocks-create.all.sql 中的建表語句。本示例中目標表僅需要保留商品 ID (product_id)、商品名稱(product_name),并且對商品銷量進行實時排名,因此可以使用如下建表語句。
CREATE DATABASE IF NOT EXISTS `demo`; CREATE TABLE IF NOT EXISTS `demo`.`orders` ( `product_id` INT(11) NOT NULL COMMENT "", `product_name` STRING NOT NULL COMMENT "", `sales_cnt` BIGINT NOT NULL COMMENT "" ) ENGINE=olap PRIMARY KEY(`product_id`) DISTRIBUTED BY HASH(`product_id`) PROPERTIES ( "replication_num" = "3" );注意
自 2.5.7 版本起,StarRocks 支持在建表和新增分區(qū)時自動設置分桶數(shù)量 (BUCKETS),您無需手動設置分桶數(shù)量。更多信息,請參見 確定分桶數(shù)量。
同步數(shù)據(jù)
運行 Flink 集群,提交 Flink job,啟動流式作業(yè),源源不斷將 MySQL 數(shù)據(jù)庫中的全量和增量數(shù)據(jù)同步到 StarRocks 中。
-
進入 Flink 目錄,執(zhí)行如下命令,在 Flink SQL 客戶端運行 SQL 文件 flink-create.all.sql。
該 SQL 文件定義了動態(tài)表 source table、sink table,查詢語句 INSERT INTO SELECT,并且指定 connector、源數(shù)據(jù)庫和目標數(shù)據(jù)庫。Flink SQL 客戶端執(zhí)行該 SQL 文件后,向 Flink 集群提交一個 Flink job,開啟同步任務。
./bin/sql-client.sh -f flink-create.all.sql注意
- 需要確保 Flink 集群已經(jīng)啟動。可通過命令
flink/bin/start-cluster.sh啟動。 - 如果您使用 Flink 1.13 之前的版本,則可能無法直接運行 SQL 文件 flink-create.all.sql。您需要在 SQL 客戶端命令行界面,逐條執(zhí)行 SQL 文件 flink-create.all.sql 中的 SQL 語句,并且需要做對
\字符進行轉義。
'sink.properties.column_separator' = '\\x01' 'sink.properties.row_delimiter' = '\\x02'處理同步數(shù)據(jù)
在同步過程中,如果您需要對數(shù)據(jù)進行一定的處理,例如 GROUP BY、JOIN 等,則可以修改 SQL 文件 flink-create.all.sql。本示例可以通過執(zhí)行 count(*) 和 GROUP BY 計算出產(chǎn)品銷量的實時排名。
$ ./bin/sql-client.sh -f flink-create.all.sql No default environment specified. Searching for '/home/disk1/flink-1.13.6/conf/sql-client-defaults.yaml'...not found. [INFO] Executing SQL from file. Flink SQL> CREATE DATABASE IF NOT EXISTS `default_catalog`.`demo`; [INFO] Execute statement succeed. -- 根據(jù) MySQL 的訂單表創(chuàng)建動態(tài)表 source table Flink SQL> CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_src` ( `order_id` BIGINT NOT NULL, `product_id` INT NULL, `order_date` TIMESTAMP NOT NULL, `customer_name` STRING NOT NULL, `product_name` STRING NOT NULL, `price` DECIMAL(10, 5) NULL, PRIMARY KEY(`order_id`) NOT ENFORCED ) with ( 'connector' = 'mysql-cdc', 'hostname' = 'xxx.xx.xxx.xxx', 'port' = '3306', 'username' = 'root', 'password' = '', 'database-name' = 'demo', 'table-name' = 'orders' ); [INFO] Execute statement succeed. -- 創(chuàng)建動態(tài)表 sink table Flink SQL> CREATE TABLE IF NOT EXISTS `default_catalog`.`demo`.`orders_sink` ( `product_id` INT NOT NULL, `product_name` STRING NOT NULL, `sales_cnt` BIGINT NOT NULL, PRIMARY KEY(`product_id`) NOT ENFORCED ) with ( 'sink.max-retries' = '10', 'jdbc-url' = 'jdbc:mysql://<fe_host>:<fe_query_port>', 'password' = '', 'sink.properties.strip_outer_array' = 'true', 'sink.properties.format' = 'json', 'load-url' = '<fe_host>:<fe_http_port>', 'username' = 'root', 'sink.buffer-flush.interval-ms' = '15000', 'connector' = 'starrocks', 'database-name' = 'demo', 'table-name' = 'orders' ); [INFO] Execute statement succeed. -- 執(zhí)行查詢,實現(xiàn)產(chǎn)品實時排行榜功能,查詢不斷更新 sink table,以反映 source table 上的更改 Flink SQL> INSERT INTO `default_catalog`.`demo`.`orders_sink` select product_id,product_name, count(*) as cnt from `default_catalog`.`demo`.`orders_src` group by product_id,product_name; [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 5ae005c4b3425d8bb13fe660260a35da如果您只需要同步部分數(shù)據(jù),例如支付時間在 2021 年 01 月 01 日之后的數(shù)據(jù),則可以在 INSERT INTO SELECT 語句中使用 WHERE order_date >'2021-01-01' 設置過濾條件。不滿足該條件的數(shù)據(jù),即支付時間在 2021 年 01 月 01 日或者之前的數(shù)據(jù)不會同步至 StarRocks。
INSERT INTO `default_catalog`.`demo`.`orders_sink` SELECT product_id,product_name, COUNT(*) AS cnt FROM `default_catalog`.`demo`.`orders_src` WHERE order_date >'2021-01-01 00:00:01' GROUP BY product_id,product_name;如果返回如下結果,則表示 Flink job 已經(jīng)提交,開始同步全量和增量數(shù)據(jù)。
[INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: 5ae005c4b3425d8bb13fe660260a35da - 需要確保 Flink 集群已經(jīng)啟動。可通過命令
-
可以通過 Flink WebUI 或者在 Flink 命令行執(zhí)行命令
bin/flink list -running,查看 Flink 集群中正在運行的 Flink job,以及 Flink job ID。-
Flink WebUI 界面

-
在 Flink 命令行執(zhí)行命令
bin/flink list -running$ bin/flink list -running Waiting for response... ------------------ Running/Restarting Jobs ------------------- 13.10.2022 15:03:54 : 040a846f8b58e82eb99c8663424294d5 : insert-into_default_catalog.lily.example_tbl1_sink (RUNNING) --------------------------------------------------------------說明
如果任務出現(xiàn)異常,可以通過 Flink WebUI 或者 flink-1.14.5/log 目錄的日志文件進行排查。
-
常見問題
如何為不同的表設置不同的 flink-connector-starrocks 配置
例如數(shù)據(jù)源某些表更新頻繁,需要提高 flink connector sr 的導入速度等,則需要在 SMT 配置文件 config_prod.conf 中為這些表設置單獨的 flink-connector-starrocks 配置。
[table-rule.1]
# pattern to match databases for setting properties
database = ^order.*$
# pattern to match tables for setting properties
table = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
flink.starrocks.load-url= <fe_host>:<fe_http_port>
flink.starrocks.username=user2
flink.starrocks.password=xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=15000
[table-rule.2]
# pattern to match databases for setting properties
database = ^order2.*$
# pattern to match tables for setting properties
table = ^.*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url=jdbc:mysql://<fe_host>:<fe_query_port>
flink.starrocks.load-url= <fe_host>:<fe_http_port>
flink.starrocks.username=user2
flink.starrocks.password=xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator=\x01
flink.starrocks.sink.properties.row_delimiter=\x02
flink.starrocks.sink.buffer-flush.interval-ms=10000
同步 MySQL 分庫分表后的多張表至 StarRocks 的一張表
如果數(shù)據(jù)源 MySQL 進行分庫分表,數(shù)據(jù)拆分成多張表甚至分布在多個庫中,并且所有表的結構都是相同的,則您可以設置[table-rule],將這些表同步至 StarRocks 的一張表中。比如 MySQL 有兩個數(shù)據(jù)庫 edu_db_1,edu_db_2,每個數(shù)據(jù)庫下面分別有兩張表 course_1,course_2,并且所有表的結構都是相同的,則通過設置如下[table-rule]可以將其同步至 StarRocks的一張表中。
說明
數(shù)據(jù)源多張表同步至 StarRocks的一張表,表名默認為 course__auto_shard。如果需要修改,則可以在 result 目錄的 SQL 文件 starrocks-create.all.sql、 flink-create.all.sql 中修改。
[table-rule.1]
# pattern to match databases for setting properties
database = ^edu_db_[0-9]*$
# pattern to match tables for setting properties
table = ^course_[0-9]*$
############################################
### flink sink configurations
### DO NOT set `connector`, `table-name`, `database-name`, they are auto-generated
############################################
flink.starrocks.jdbc-url = jdbc: mysql://xxx.xxx.x.x:xxxx
flink.starrocks.load-url = xxx.xxx.x.x:xxxx
flink.starrocks.username = user2
flink.starrocks.password = xxxxxx
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator =\x01
flink.starrocks.sink.properties.row_delimiter =\x02
flink.starrocks.sink.buffer-flush.interval-ms = 5000
數(shù)據(jù)以 JSON 格式導入
以上示例數(shù)據(jù)以 CSV 格式導入,如果數(shù)據(jù)無法選出合適的分隔符,則您需要替換 [table-rule] 中flink.starrocks.*的如下參數(shù)。
flink.starrocks.sink.properties.format=csv
flink.starrocks.sink.properties.column_separator =\x01
flink.starrocks.sink.properties.row_delimiter =\x02
傳入如下參數(shù),數(shù)據(jù)以 JSON 格式導入。
flink.starrocks.sink.properties.format=json
flink.starrocks.sink.properties.strip_outer_array=true
注意
該方式會對導入速度有一定的影響。
多個的 INSERT INTO 語句合并為一個 Flink job
在 flink-create.all.sql 文件使用 STATEMENT SET 語句,將多個的 INSERT INTO 語句合并為一個 Flink job,避免占用過多的 Flink job 資源。
說明
Flink 自 1.13 起 支持 STATEMENT SET 語法。
-
打開 result/flink-create.all.sql 文件。
-
修改文件中的 SQL 語句,將所有的 INSERT INTO 語句調整位置到文件末尾。然后在第一條 INSERT語句的前面加上
EXECUTE STATEMENT SET BEGIN在最后一 INSERT 語句后面加上一行END;。注意
CREATE DATABASE、CREATE TABLE 的位置保持不變。
CREATE DATABASE IF NOT EXISTS db; CREATE TABLE IF NOT EXISTS db.a1; CREATE TABLE IF NOT EXISTS db.b1; CREATE TABLE IF NOT EXISTS db.a2; CREATE TABLE IF NOT EXISTS db.b2; EXECUTE STATEMENT SET BEGIN -- 1個或者多個 INSERT INTO statements INSERT INTO db.a1 SELECT * FROM db.b1; INSERT INTO db.a2 SELECT * FROM db.b2; END;
更多常見問題,請參見 MySQL 實時同步至 StarRocks 常見問題。
遇到的問題
MySQL表缺少主鍵
Could not execute SQL statement. Reason: org.apache.flink.table.api.ValidationException: The primary key is necessary when enable 'Key: 'scan.incremental.snapshot.enabled' , default: true (fallback keys: [])' to 'true
這是因為FlinkCDC進行增量快照機制的原因
- 增量快照機制依賴主鍵:
當啟用了scan.incremental.snapshot.enabled(默認值為true)這個配置選項時,對于像mysql-cdc這類連接器,它期望源表具有明確的主鍵定義。這是因為增量快照機制需要依靠主鍵來準確追蹤數(shù)據(jù)的變化情況,以便能夠增量地讀取和處理來自數(shù)據(jù)源(MySQL 數(shù)據(jù)庫中的對應表)的數(shù)據(jù),比如準確識別哪些行是新增的、哪些行是被更新的等操作。如果沒有主鍵,系統(tǒng)就沒辦法可靠地執(zhí)行這種基于增量的操作邏輯,所以 Flink 會拋出這個ValidationException異常來提示你需要定義主鍵。 - MySQL CDC 工作原理關聯(lián):
在使用mysql-cdc連接器從 MySQL 數(shù)據(jù)庫捕獲變更數(shù)據(jù)時,它會基于特定的機制去讀取 binlog(MySQL 的二進制日志,記錄了數(shù)據(jù)庫的所有變更操作)。主鍵在這里起到了一個關鍵的標識作用,使得連接器能夠準確地將 binlog 里的變更對應到具體的行記錄上,從而把這些變更正確地同步到 Flink 中進行后續(xù)處理。沒有主鍵的話,這個映射和追蹤過程就無法準確進行。
-
解決措施:要么給Mysql加主鍵,要么FlinkSQL中建表時加下面的參數(shù)
- 在建表語句中加入
'scan.incremental.snapshot.enabled' = 'false'即可。
?這種方法很可能會導致端到端語句不一致,出現(xiàn)重復數(shù)據(jù)。
- varchar數(shù)據(jù)類型,會自動設為2.X版本最大值,需要手動更改
- 在建表語句中加入

浙公網(wǎng)安備 33010602011771號