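Apache Doris + Flink CDC + Iceberg: Building Real-Time Lakehouse Federated Queries
1 Overview
This article shows how to use Flink CDC, Iceberg, and Doris to build federated query analysis on a real-time lakehouse. Doris 1.1 adds support for Iceberg, and the focus here is on how to use Doris and Iceberg together. Following the steps in order takes you through the entire setup.
2 System Architecture
The overall architecture is as follows: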

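1. First, Flink captures data from MySQL in real time by reading the binlog.
2. Next, an Iceberg table is created in Flink; Iceberg's metadata is stored in Hive.
3. Then an Iceberg external table is created in Doris.
4. Finally, Doris serves as the unified query entry point for querying and analyzing the data in Iceberg on behalf of front-end applications. The data in the Iceberg external table can be joined with Doris internal data or with data from Doris's other external sources.
The federated query architecture of the Doris lakehouse is as follows: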

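1. Doris supports MySQL, PostgreSQL, Oracle, and SQL Server through ODBC.
2. It also supports Elasticsearch external tables.
3. Version 1.0 supports Hive external tables.
4. Version 1.1 supports Iceberg external tables.
5. Version 1.2 supports Hudi external tables.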
3 Create the MySQL Database Table and Initialize the Data
CREATE DATABASE demo;
USE demo;
CREATE TABLE userinfo (
id int NOT NULL AUTO_INCREMENT,
name VARCHAR(255) NOT NULL DEFAULT 'flink',
address VARCHAR(1024),
phone_number VARCHAR(512),
email VARCHAR(255),
PRIMARY KEY (`id`)
)ENGINE=InnoDB ;
INSERT INTO userinfo VALUES (10001,'user_110','Shanghai','13347420870', NULL);
INSERT INTO userinfo VALUES (10002,'user_111','xian','13347420870', NULL);
INSERT INTO userinfo VALUES (10003,'user_112','beijing','13347420870', NULL);
INSERT INTO userinfo VALUES (10004,'user_113','shenzheng','13347420870', NULL);
INSERT INTO userinfo VALUES (10005,'user_114','hangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10006,'user_115','guizhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10007,'user_116','chengdu','13347420870', NULL);
INSERT INTO userinfo VALUES (10008,'user_117','guangzhou','13347420870', NULL);
INSERT INTO userinfo VALUES (10009,'user_118','xian','13347420870', NULL);
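Because Flink CDC reads changes from the MySQL binlog, it can help to confirm the binlog settings before moving on. The following is a minimal sketch, assuming a MySQL server on which you have administrative rights; the account name `flink_cdc` and its password are hypothetical and not part of the original setup, which connects as root.

```sql
-- Run in the MySQL client.
-- The CDC connector needs binary logging enabled in row format.
SHOW VARIABLES LIKE 'log_bin';          -- the connector needs ON
SHOW VARIABLES LIKE 'binlog_format';    -- the connector needs ROW
SHOW VARIABLES LIKE 'binlog_row_image'; -- the connector needs FULL

-- Optional: a dedicated account instead of root.
-- The user name and password below are hypothetical placeholders.
CREATE USER 'flink_cdc'@'%' IDENTIFIED BY 'ChangeMe123!';
GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT
    ON *.* TO 'flink_cdc'@'%';
FLUSH PRIVILEGES;
```

If a dedicated account is used, its name and password would replace the 'username' and 'password' options in the CDC table definition.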
4 Create the Iceberg Catalog
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://localhost:8020/user/hive/warehouse'
);
5 Create the MySQL CDC Table
CREATE TABLE user_source (
database_name STRING METADATA VIRTUAL,
table_name STRING METADATA VIRTUAL,
`id` DECIMAL(20, 0) NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'port' = '3306',
'username' = 'root',
'password' = 'MyNewPass4!',
'database-name' = 'demo',
'table-name' = 'userinfo'
);
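The Iceberg sink in Flink commits data files when a checkpoint completes, so a streaming job without checkpointing may never make rows visible in Iceberg. The sketch below assumes the Flink SQL client with the quoted SET syntax; some older Flink versions accept only the unquoted form, and the 3s interval is an illustrative value rather than a tuned recommendation.

```sql
-- Run in the Flink SQL client, before submitting the INSERT job.
-- The 3s interval is an assumption for a local demo, not a tuned value.
SET 'execution.checkpointing.interval' = '3s';

-- Optional sanity check: preview the change stream read from MySQL.
-- This starts a streaming query; cancel it once the rows appear.
SELECT id, name, address FROM user_source;
```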
6 Create the Iceberg Table
-- List the catalogs
show catalogs;
-- Switch to the Iceberg catalog
use catalog hive_catalog;
-- Create the database
CREATE DATABASE iceberg_hive;
-- Switch to the database
use iceberg_hive;
7 Create the Table
CREATE TABLE all_users_info (
database_name STRING,
table_name STRING,
`id` DECIMAL(20, 0) NOT NULL,
name STRING,
address STRING,
phone_number STRING,
email STRING,
PRIMARY KEY (database_name, table_name, `id`) NOT ENFORCED
) WITH (
'catalog-type'='hive'
);
Insert data from the CDC table into the Iceberg table:
use catalog default_catalog;

insert into hive_catalog.iceberg_hive.all_users_info select * from user_source;
Now query the Iceberg table:
select * from hive_catalog.iceberg_hive.all_users_info;
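To see the real-time path end to end, one option is to add a row in MySQL and look for it in Iceberg. This is a sketch under two assumptions: the INSERT job from the CDC table is still running, and checkpointing is enabled so that the Iceberg sink commits. The row values are made up for illustration.

```sql
-- In the MySQL client: add one more row (values are hypothetical).
INSERT INTO demo.userinfo VALUES (10010, 'user_119', 'nanjing', '13347420870', NULL);

-- In the Flink SQL client, after at least one checkpoint has completed:
SELECT id, name, address
FROM hive_catalog.iceberg_hive.all_users_info
WHERE id = 10010;
```

Only an INSERT is used here because the table in this article is created without upsert-related properties, so how UPDATE and DELETE events are handled depends on the Iceberg version and table settings.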
8 Querying Iceberg from Doris
8.1 Create the Iceberg External Table
CREATE TABLE `all_users_info`
ENGINE = ICEBERG
PROPERTIES (
"iceberg.database" = "iceberg_hive",
"iceberg.table" = "all_users_info",
"iceberg.hive.metastore.uris" = "thrift://localhost:9083",
"iceberg.catalog.type" = "HIVE_CATALOG"
);
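Parameter description
- ENGINE must be set to ICEBERG.
- PROPERTIES:
  - iceberg.hive.metastore.uris: address of the Hive Metastore service.
  - iceberg.database: name of the Iceberg database to mount.
  - iceberg.table: name of the Iceberg table to mount; not needed when mounting an entire Iceberg database.
  - iceberg.catalog.type: the catalog type used by Iceberg. The default is HIVE_CATALOG, which is currently the only supported type; more Iceberg catalog types will be supported later.
mysql> CREATE TABLE `all_users_info`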
-> ENGINE = ICEBERG
-> PROPERTIES (
-> "iceberg.database" = "iceberg_hive",
-> "iceberg.table" = "all_users_info",
-> "iceberg.hive.metastore.uris" = "thrift://localhost:9083",
-> "iceberg.catalog.type" = "HIVE_CATALOG"
-> );
Query OK, 0 rows affected (0.23 sec)
mysql> select * from all_users_info;
+---------------+------------+-------+----------+-----------+--------------+-------+
| database_name | table_name | id | name | address | phone_number | email |
+---------------+------------+-------+----------+-----------+--------------+-------+
| demo | userinfo | 10004 | user_113 | shenzheng | 13347420870 | NULL |
| demo | userinfo | 10005 | user_114 | hangzhou | 13347420870 | NULL |
| demo | userinfo | 10002 | user_111 | xian | 13347420870 | NULL |
| demo | userinfo | 10003 | user_112 | beijing | 13347420870 | NULL |
| demo | userinfo | 10001 | user_110 | Shanghai | 13347420870 | NULL |
| demo | userinfo | 10008 | user_117 | guangzhou | 13347420870 | NULL |
| demo | userinfo | 10009 | user_118 | xian | 13347420870 | NULL |
| demo | userinfo | 10006 | user_115 | guizhou | 13347420870 | NULL |
| demo | userinfo | 10007 | user_116 | chengdu | 13347420870 | NULL |
+---------------+------------+-------+----------+-----------+--------------+-------+
9 rows in set (0.18 sec)
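The parameter description notes that iceberg.table is not needed when mounting a whole Iceberg database. A minimal sketch of that variant follows, assuming the same Hive Metastore address as above and a Doris version that supports mounting Iceberg databases; the Doris-side name `iceberg_hive_db` is hypothetical.

```sql
-- Run in Doris through the MySQL client.
-- `iceberg_hive_db` is a hypothetical Doris-side database name.
CREATE DATABASE `iceberg_hive_db`
PROPERTIES (
"iceberg.database" = "iceberg_hive",
"iceberg.hive.metastore.uris" = "thrift://localhost:9083",
"iceberg.catalog.type" = "HIVE_CATALOG"
);

-- The Iceberg tables in iceberg_hive should then be visible without
-- creating each external table individually.
SHOW TABLES FROM `iceberg_hive_db`;
SELECT COUNT(*) FROM `iceberg_hive_db`.`all_users_info`;
```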
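The Doris on Iceberg example above only demonstrates a query against a single Iceberg table. You can also join it with Doris internal tables or with other external tables (ODBC, Hive, Elasticsearch, and so on), with Doris providing a unified entry point for query and analysis.
This completes the walkthrough, from setting up Hadoop, Hive, Flink, MySQL, and Doris to using Doris on Iceberg. Doris is evolving toward an architecture that converges the data warehouse and data integration, and its support for lakehouse federated queries makes development more convenient and efficient by removing much of the tedious data synchronization work.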
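As an illustration of such a federated query, the sketch below joins the Iceberg external table with a Doris internal table. The `user_orders` table, its columns, and its sample rows are hypothetical and are not part of the original setup; "replication_num" = "1" assumes a single-BE test cluster.

```sql
-- Run in Doris through the MySQL client.
-- Hypothetical internal table used only to illustrate the join.
CREATE TABLE user_orders (
    user_id BIGINT,
    order_id BIGINT,
    amount DECIMAL(10, 2)
)
DUPLICATE KEY(user_id)
DISTRIBUTED BY HASH(user_id) BUCKETS 1
PROPERTIES ("replication_num" = "1");

-- Made-up sample rows.
INSERT INTO user_orders VALUES
    (10001, 1, 99.50),
    (10002, 2, 15.00),
    (10001, 3, 20.00);

-- Join the Iceberg external table with the Doris internal table.
SELECT u.id, u.name, u.address,
       COUNT(o.order_id) AS order_cnt,
       SUM(o.amount) AS total_amount
FROM all_users_info u
JOIN user_orders o ON u.id = o.user_id
GROUP BY u.id, u.name, u.address
ORDER BY total_amount DESC;
```

The query reads the user attributes from Iceberg at query time, so no copy of the Iceberg data has to be loaded into Doris first.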
Author: Wu Huabin (吳化斌), JD Retail
Source: JD Cloud Developer Community. Please credit the source when reposting.