1. Creating Metadata in Calcite
1. Introduction
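Apache Calcite is an open-source dynamic data management framework. Its core job is to provide the building blocks of SQL parsing, optimization, and execution so that many different data sources can be supported flexibly, and it is widely used in data-processing systems. Its main features are summarized below:
- Features
  - SQL parsing: supports multiple SQL dialects, including standard SQL and database-specific extensions, and parses an input statement into an abstract syntax tree (AST) for later processing.
  - Semantic analysis: validates the parsed SQL, for example checking that the referenced tables and columns exist and that data types match, so that the statement is semantically correct.
  - Query optimization: combines rule-based (RBO) and cost-based (CBO) optimization. RBO rewrites the query with predefined rules such as predicate pushdown; CBO uses statistics to estimate the cost of alternative execution plans and picks the cheapest one.
  - Execution plan generation: turns the optimized result into an executable physical plan that fixes the concrete order and method of each operation.
  - Data source adapters: connects to many kinds of data sources, such as relational databases (MySQL, Oracle, etc.), files (CSV, JSON), and NoSQL stores, supports custom adapters, and generates a suitable data-access strategy for each source.
  - Cross-source queries: adapters expose different kinds of data sources through one unified abstraction. For a join across sources, Calcite decomposes the query into sub-queries that each source can execute, then merges and further processes the results.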
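To make the RBO description concrete, here is a toy sketch in plain JDK Java (hypothetical class names, not Calcite's API) of why predicate pushdown is a safe rewrite: for a LEFT JOIN, a filter that only references columns of the left, row-preserving side gives the same result whether it runs before or after the join, but filtering first means fewer rows reach the join. The ages and `role_key` values mirror the sample `user` rows used later in this article; role names are ASCII placeholders.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy illustration of predicate pushdown (hypothetical classes, not Calcite's API).
 * A filter on columns of the preserved (left) side of a LEFT JOIN can run before the join.
 */
final class PushdownDemo {
    /** A `user` row with only the columns this demo needs. */
    static final class User {
        final int id, age, roleKey;
        User(int id, int age, int roleKey) { this.id = id; this.age = age; this.roleKey = roleKey; }
    }

    /** A `role` row. */
    static final class Role {
        final int roleKey;
        final String name;
        Role(int roleKey, String name) { this.roleKey = roleKey; this.name = name; }
    }

    /** One join output row: the user plus the matched role name (null when no role matches). */
    static final class Row {
        final User user;
        final String roleName;
        Row(User user, String roleName) { this.user = user; this.roleName = roleName; }
        @Override public String toString() { return "id=" + user.id + " role=" + roleName; }
    }

    /** Nested-loop LEFT JOIN on role_key; unmatched users are kept with a null role. */
    static List<Row> leftJoin(List<User> users, List<Role> roles) {
        List<Row> out = new ArrayList<>();
        for (User u : users) {
            boolean matched = false;
            for (Role r : roles) {
                if (u.roleKey == r.roleKey) {
                    out.add(new Row(u, r.name));
                    matched = true;
                }
            }
            if (!matched) {
                out.add(new Row(u, null));
            }
        }
        return out;
    }

    /** Original plan: join first, then apply WHERE age > minAge to the joined rows. */
    static List<Row> filterAfterJoin(List<User> users, List<Role> roles, int minAge) {
        List<Row> out = new ArrayList<>();
        for (Row row : leftJoin(users, roles)) {
            if (row.user.age > minAge) {
                out.add(row);
            }
        }
        return out;
    }

    /** Rewritten plan: the filter is pushed below the join, so fewer rows are joined. */
    static List<Row> filterBeforeJoin(List<User> users, List<Role> roles, int minAge) {
        List<User> filtered = new ArrayList<>();
        for (User u : users) {
            if (u.age > minAge) {
                filtered.add(u);
            }
        }
        return leftJoin(filtered, roles);
    }

    public static void main(String[] args) {
        List<User> users = List.of(new User(1, 23, 1), new User(2, 18, 2),
                new User(3, 26, 1), new User(4, 30, 3));
        List<Role> roles = List.of(new Role(1, "admin"), new Role(2, "teacher"), new Role(3, "student"));
        // Both plans print the same rows; the second joins only three users instead of four
        System.out.println(filterAfterJoin(users, roles, 20));
        System.out.println(filterBeforeJoin(users, roles, 20));
    }
}
```

The same rewrite would not be valid for a predicate on the right side of a LEFT JOIN, because it could turn null-extended rows into missing rows; real optimizers check this condition before applying the rule.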
2. Preparing the Metadata
Prepare two databases, MySQL and PostgreSQL.
MySQL contains a table named `user`; PostgreSQL contains a table named `role`.

Table definitions (MySQL `user` first, then PostgreSQL `role`):
CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT 'primary key',
`username` varchar(255) DEFAULT NULL COMMENT 'user name',
`age` int(11) DEFAULT NULL COMMENT 'age',
`sex` varchar(255) DEFAULT NULL COMMENT 'gender',
`role_key` int(11) DEFAULT NULL COMMENT 'role',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=32 DEFAULT CHARSET=utf8mb4 COMMENT='user information table';
CREATE TABLE "public"."role" (
"name" varchar(255) COLLATE "pg_catalog"."default",
"role_key" int4
);
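3. Maven Dependencies
The Maven dependencies are as follows (the test code in section 6 also uses Lombok and JUnit 5, which need to be added as well):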
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.37.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.29</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.23</version>
</dependency>
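4. Defining Metadata
Calcite supports several ways to define metadata. The most common is a JSON model; another is to create schemas in code through the SchemaFactory approach.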
4.1 Json Model
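Structure:
|- model # data model
| |- schema # schema
| | |- tables # tables/views
| | |- functions # functions
| | |- type # schema type: map (default), jdbc, custom
| | |- factory # SchemaFactory class used by a custom schema
| | |- operand # extra parameters passed to the factory
Example:
The model below declares two data sources, MySQL and PostgreSQL, using two different declaration styles: the built-in `jdbc` type for MySQL and a `custom` schema backed by JdbcSchema$Factory for PostgreSQL.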
{
"version": "1.0",
"defaultSchema": "my_mysql",
"schemas": [
{
"type": "jdbc",
"name": "my_mysql",
"jdbcUser": "root",
"jdbcPassword": "123456",
"jdbcUrl": "jdbc:mysql://localhost:3306/test",
"jdbcCatalog": "test",
"jdbcSchema": null
},
{
"name": "my_postgres",
"type": "custom",
"factory": "org.apache.calcite.adapter.jdbc.JdbcSchema$Factory",
"operand": {
"jdbcDriver": "org.postgresql.Driver",
"jdbcUrl": "jdbc:postgresql://localhost:5432/test",
"jdbcUser": "root",
"jdbcPassword": "123456"
}
}
]
}
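Model loading flows from org.apache.calcite.jdbc.Driver to org.apache.calcite.model.ModelHandler, which parses the JSON and builds the schemas.
Calcite model documentation: https://calcite.apache.org/docs/model.html
Loading the model:
Put the JSON file under resources, then point to it when creating the connection: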
Properties info = new Properties();
// Case-insensitive identifier matching
info.setProperty("caseSensitive", "false");
// Use backticks as the identifier quote character
info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name());
// Path of the JSON model (resourcePath is the helper defined in the complete test code in section 6)
info.setProperty("model", resourcePath("model/model.json"));
// Create the Calcite connection
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// Get the root schema. In Calcite the root schema is the parent of every data source schema; schemas from different data sources can be mounted under the same root.
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// Create a Statement for running queries
Statement statement = calciteConnection.createStatement();
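Besides a file path, Calcite's model handler also accepts the model JSON itself when the value is prefixed with `inline:`, which can be handy in tests. The sketch below only builds the connection `Properties` (plain JDK, no Calcite classes); the class name is hypothetical, and the `"quoting"`/`"BACK_TICK"` strings are assumed to equal `CalciteConnectionProperty.QUOTING.camelName()` and `Quoting.BACK_TICK.name()` used above.

```java
import java.util.Properties;

/**
 * Sketch: Calcite connection properties that carry the model JSON inline
 * (assumes the "inline:" model prefix; hypothetical helper class).
 */
final class InlineModelProperties {
    /** Model with only the MySQL JDBC schema from the model.json example (sample credentials). */
    static String mysqlModelJson() {
        return "{"
                + "\"version\": \"1.0\","
                + "\"defaultSchema\": \"my_mysql\","
                + "\"schemas\": [{"
                + "\"type\": \"jdbc\","
                + "\"name\": \"my_mysql\","
                + "\"jdbcUser\": \"root\","
                + "\"jdbcPassword\": \"123456\","
                + "\"jdbcUrl\": \"jdbc:mysql://localhost:3306/test\","
                + "\"jdbcCatalog\": \"test\""
                + "}]}";
    }

    /** Same settings as the file-based example, but with "inline:" + JSON instead of a path. */
    static Properties build(String modelJson) {
        Properties info = new Properties();
        // Case-insensitive identifier matching
        info.setProperty("caseSensitive", "false");
        // Assumed string values of QUOTING.camelName() and Quoting.BACK_TICK.name()
        info.setProperty("quoting", "BACK_TICK");
        // Model JSON passed directly instead of a file path
        info.setProperty("model", "inline:" + modelJson);
        return info;
    }

    public static void main(String[] args) {
        System.out.println(build(mysqlModelJson()).getProperty("model"));
    }
}
```

With calcite-core and the MySQL driver on the classpath, these properties would be passed as `DriverManager.getConnection("jdbc:calcite:", InlineModelProperties.build(InlineModelProperties.mysqlModelJson()))`. A model file remains easier to maintain for larger models.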
4.2 SchemaFactory
[Figure: schema UML diagram]
First, create a DataSource object for each data source:
private static DataSource getMysqlDataSource() {
MysqlDataSource dataSource = new MysqlDataSource();
dataSource.setUrl("jdbc:mysql://localhost:3306/test");
dataSource.setUser("root");
dataSource.setPassword("123456");
return dataSource;
}
private static DataSource getPostgresDataSource() {
final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
pgSimpleDataSource.setUrl("jdbc:postgresql://localhost:5432/test");
pgSimpleDataSource.setUser("root");
pgSimpleDataSource.setPassword("123456");
return pgSimpleDataSource;
}
Then wrap each DataSource in a JdbcSchema and register it with the rootSchema:
Properties info = new Properties();
// Case-insensitive identifier matching
info.setProperty("caseSensitive", "false");
// Use backticks as the identifier quote character
info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name());
// Create the Calcite connection
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// Get the root schema. In Calcite the root schema is the parent of every data source schema; schemas from different data sources can be mounted under the same root.
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// Set the default schema; without it, table names must be prefixed with the schema name (e.g. my_mysql.`user`)
calciteConnection.setSchema("my_mysql");
final DataSource mysqlDataSource = getMysqlDataSource();
// Arguments: parent schema, schema name, DataSource, JDBC catalog, JDBC schema
final JdbcSchema schemaWithMysql = JdbcSchema.create(rootSchema, "my_mysql", mysqlDataSource, "test", null);
final DataSource postgresDataSource = getPostgresDataSource();
final JdbcSchema schemaWithPostgres = JdbcSchema.create(rootSchema, "my_postgres", postgresDataSource, "test", "public");
rootSchema.add("my_mysql", schemaWithMysql);
rootSchema.add("my_postgres", schemaWithPostgres);
// Create a Statement for running queries
Statement statement = calciteConnection.createStatement();
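The `setSchema("my_mysql")` call decides how unqualified table names are resolved. The toy resolver below is a hypothetical plain-JDK simplification, not Calcite's validator or catalog-reader API: an unqualified name is looked up only in the default schema, a `schema.table` name is looked up in the named sub-schema, and matching ignores case, as with `caseSensitive=false`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

/**
 * Toy name resolution against a default schema (hypothetical; a simplification of
 * what Calcite does, not its actual API). Matching is case-insensitive.
 */
final class ToySchemaResolver {
    private final Map<String, Set<String>> tablesBySchema = new HashMap<>();
    private final String defaultSchema; // null models a connection without setSchema(...)

    ToySchemaResolver(String defaultSchema) {
        this.defaultSchema = defaultSchema == null ? null : normalize(defaultSchema);
    }

    private static String normalize(String name) {
        return name.toLowerCase(Locale.ROOT);
    }

    /** Registers a sub-schema and its tables, loosely like rootSchema.add(name, schema). */
    ToySchemaResolver add(String schema, String... tables) {
        Set<String> names = new HashSet<>();
        for (String table : tables) {
            names.add(normalize(table));
        }
        tablesBySchema.put(normalize(schema), names);
        return this;
    }

    /** Resolves "table" or "schema.table"; returns "schema.table", or null if not found. */
    String resolve(String identifier) {
        String[] parts = normalize(identifier).split("\\.");
        String schema;
        String table;
        if (parts.length == 2) {
            schema = parts[0];
            table = parts[1];
        } else if (parts.length == 1 && defaultSchema != null) {
            schema = defaultSchema; // unqualified name: look only in the default schema
            table = parts[0];
        } else {
            return null;
        }
        Set<String> tables = tablesBySchema.get(schema);
        return tables != null && tables.contains(table) ? schema + "." + table : null;
    }

    public static void main(String[] args) {
        ToySchemaResolver resolver = new ToySchemaResolver("my_mysql")
                .add("my_mysql", "user")
                .add("my_postgres", "role");
        System.out.println(resolver.resolve("user"));             // my_mysql.user
        System.out.println(resolver.resolve("my_postgres.role")); // my_postgres.role
        System.out.println(resolver.resolve("role"));             // null: not in the default schema
    }
}
```

This is why the cross-source query in section 5.2 writes `my_postgres.`role`` with a prefix while `user` can stay unqualified.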
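The rootSchema can also be created in either of the following ways. These create a standalone root schema that is not attached to the connection above, which is typically used with the Frameworks/planner API rather than a `jdbc:calcite:` connection:
// Option 1: via CalciteSchema (addMetadataSchema = true, cache = true)
CalciteSchema calciteSchema = CalciteSchema.createRootSchema(true, true);
SchemaPlus rootSchema = calciteSchema.plus();
// Option 2: via Frameworks (addMetadataSchema = true)
SchemaPlus rootSchema = Frameworks.createRootSchema(true);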
5. Test Queries
5.1 Querying a single data source
@Test
@SneakyThrows
public void test_connection() {
// Both configurations above set the default schema to my_mysql, so the schema prefix can be omitted
final ResultSet resultSet = statement.executeQuery("SELECT * FROM `user`");
// Equivalent fully qualified form:
// final ResultSet resultSet = statement.executeQuery("SELECT * FROM my_mysql.`user`");
printResultSet(resultSet);
}
Output:
Number of columns: 5
{sex=1, role_key=1, id=1, age=23, username=張三}
{sex=2, role_key=2, id=2, age=18, username=李四}
{sex=2, role_key=1, id=3, age=26, username=張鐵牛}
{sex=2, role_key=3, id=4, age=30, username=王麻子}
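5.2 Joining across data sources
Calcite pushes down to each data source the part of the query that source can execute (here, the scans of `user` and `role`), then performs the join and any remaining filtering in memory.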
@Test
@SneakyThrows
public void test_cross_db_query() {
final ResultSet resultSet = statement.executeQuery("SELECT u.*,r.name FROM `user` u left join my_postgres.`role` r on u.role_key = r.role_key");
printResultSet(resultSet);
}
Output:
Number of columns: 6
{sex=1, role_key=1, name=管理員, id=1, age=23, username=張三}
{sex=2, role_key=1, name=管理員, id=3, age=26, username=張鐵牛}
{sex=2, role_key=2, name=老師, id=2, age=18, username=李四}
{sex=2, role_key=3, name=學生, id=4, age=30, username=王麻子}
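The in-memory step of this cross-source join can be pictured as a hash join over the rows each source returns. The sketch below is a hypothetical plain-JDK helper, not Calcite's `EnumerableHashJoin`: it builds a hash table on the right input's join key, probes it with each left row, keeps unmatched left rows with `null` right-side columns (LEFT JOIN semantics), and copies only the requested right-side columns, mirroring `SELECT u.*, r.name`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Sketch of the in-memory LEFT JOIN step of a cross-source query (hypothetical helper,
 * not Calcite's EnumerableHashJoin). Rows are column-name -> value maps.
 */
final class CrossSourceHashJoin {
    /** Builds a row from alternating column names and values, e.g. row("id", 1, "role_key", 1). */
    static Map<String, Object> row(Object... keyValues) {
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i + 1 < keyValues.length; i += 2) {
            row.put((String) keyValues[i], keyValues[i + 1]);
        }
        return row;
    }

    /** LEFT JOIN on leftKey = rightKey; result rows hold all left columns plus the given right columns. */
    static List<Map<String, Object>> leftHashJoin(List<Map<String, Object>> left, String leftKey,
                                                  List<Map<String, Object>> right, String rightKey,
                                                  List<String> rightColumns) {
        // Build phase: index the right input by its join key (NULL keys never match in SQL)
        Map<Object, List<Map<String, Object>>> index = new HashMap<>();
        for (Map<String, Object> r : right) {
            Object key = r.get(rightKey);
            if (key != null) {
                index.computeIfAbsent(key, k -> new ArrayList<>()).add(r);
            }
        }
        // Probe phase: look up each left row's key in the index
        List<Map<String, Object>> out = new ArrayList<>();
        for (Map<String, Object> l : left) {
            Object key = l.get(leftKey);
            List<Map<String, Object>> matches = key == null ? null : index.get(key);
            if (matches == null) {
                // No match: keep the left row and fill the right columns with null
                Map<String, Object> joined = new LinkedHashMap<>(l);
                for (String column : rightColumns) {
                    joined.put(column, null);
                }
                out.add(joined);
            } else {
                // One output row per matching right row
                for (Map<String, Object> r : matches) {
                    Map<String, Object> joined = new LinkedHashMap<>(l);
                    for (String column : rightColumns) {
                        joined.put(column, r.get(column));
                    }
                    out.add(joined);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Role keys mirror the sample data; role names are ASCII placeholders
        List<Map<String, Object>> users = List.of(
                row("id", 1, "role_key", 1), row("id", 2, "role_key", 2),
                row("id", 3, "role_key", 1), row("id", 4, "role_key", 3));
        List<Map<String, Object>> roles = List.of(
                row("name", "admin", "role_key", 1), row("name", "teacher", "role_key", 2),
                row("name", "student", "role_key", 3));
        leftHashJoin(users, "role_key", roles, "role_key", List.of("name")).forEach(System.out::println);
    }
}
```

Building the hash table on the smaller input (here `role`) keeps memory low; choosing that side from row-count statistics is the kind of decision a cost-based optimizer makes.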
6. Complete Test Code
6.1 Json Model
package com.ldx.calcite;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.util.Sources;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Slf4j
public class CalciteModelTest {
private static Statement statement;
@BeforeAll
@SneakyThrows
public static void beforeAll() {
Properties info = new Properties();
// Case-insensitive identifier matching
info.setProperty("caseSensitive", "false");
// Use backticks as the identifier quote character
info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name());
// Path of the JSON model
info.setProperty("model", resourcePath("model/model.json"));
// Create the Calcite connection
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// Get the root schema. In Calcite the root schema is the parent of every data source schema; schemas from different data sources can be mounted under the same root.
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// Create a Statement for running queries
statement = calciteConnection.createStatement();
}
@Test
@SneakyThrows
public void test_connection() {
final ResultSet resultSet = statement.executeQuery("SELECT * FROM `user`");
printResultSet(resultSet);
}
@Test
@SneakyThrows
public void test_cross_db_query() {
final ResultSet resultSet = statement.executeQuery("SELECT u.*,r.name FROM `user` u left join my_postgres.`role` r on u.role_key = r.role_key");
printResultSet(resultSet);
}
public static void printResultSet(ResultSet resultSet) throws SQLException {
// Result set metadata
ResultSetMetaData metaData = resultSet.getMetaData();
// Number of columns
int columnCount = metaData.getColumnCount();
log.info("Number of columns: {}",columnCount);
// Iterate over the rows and log each one
while (resultSet.next()) {
final Map<String, String> item = new HashMap<>();
// Collect column name -> value for the current row
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
String columnValue = resultSet.getString(i);
item.put(columnName, columnValue);
}
log.info(item.toString());
}
}
// Resolves a classpath resource to an absolute file path
private static String resourcePath(String path) {
final URL url = CalciteModelTest.class.getResource("/" + path);
return Sources
.of(url).file().getAbsolutePath();
}
}
6.2 SchemaFactory
package com.ldx.calcite;
import com.mysql.cj.jdbc.MysqlDataSource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.config.CalciteConnectionProperty;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.util.Sources;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.postgresql.ds.PGSimpleDataSource;
import javax.sql.DataSource;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@Slf4j
public class CalciteCreateMataDataTest {
private static Statement statement;
@BeforeAll
@SneakyThrows
public static void beforeAll() {
Properties info = new Properties();
// Case-insensitive identifier matching
info.setProperty("caseSensitive", "false");
// Use backticks as the identifier quote character
info.setProperty(CalciteConnectionProperty.QUOTING.camelName(), Quoting.BACK_TICK.name());
// Create the Calcite connection
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// Get the root schema. In Calcite the root schema is the parent of every data source schema; schemas from different data sources can be mounted under the same root.
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// Set the default schema; without it, table names must be prefixed with the schema name (e.g. my_mysql.`user`)
calciteConnection.setSchema("my_mysql");
final DataSource mysqlDataSource = getMysqlDataSource();
final JdbcSchema schemaWithMysql = JdbcSchema.create(rootSchema, "my_mysql", mysqlDataSource, "test", null);
final DataSource postgresDataSource = getPostgresDataSource();
final JdbcSchema schemaWithPostgres = JdbcSchema.create(rootSchema, "my_postgres", postgresDataSource, "test", "public");
rootSchema.add("my_mysql", schemaWithMysql);
rootSchema.add("my_postgres", schemaWithPostgres);
// Create a Statement for running queries
statement = calciteConnection.createStatement();
}
private static DataSource getMysqlDataSource() {
MysqlDataSource dataSource = new MysqlDataSource();
dataSource.setUrl("jdbc:mysql://localhost:3306/test");
dataSource.setUser("root");
dataSource.setPassword("123456");
return dataSource;
}
private static DataSource getPostgresDataSource() {
final PGSimpleDataSource pgSimpleDataSource = new PGSimpleDataSource();
pgSimpleDataSource.setUrl("jdbc:postgresql://localhost:5432/test");
pgSimpleDataSource.setUser("root");
pgSimpleDataSource.setPassword("123456");
return pgSimpleDataSource;
}
@Test
@SneakyThrows
public void test_connection() {
final ResultSet resultSet = statement.executeQuery("SELECT * FROM `user`");
printResultSet(resultSet);
}
@Test
@SneakyThrows
public void test_cross_db_query() {
final ResultSet resultSet = statement.executeQuery("SELECT u.*,r.name FROM `user` u left join my_postgres.`role` r on u.role_key = r.role_key");
printResultSet(resultSet);
}
public static void printResultSet(ResultSet resultSet) throws SQLException {
// Result set metadata
ResultSetMetaData metaData = resultSet.getMetaData();
// Number of columns
int columnCount = metaData.getColumnCount();
log.info("Number of columns: {}",columnCount);
// Iterate over the rows and log each one
while (resultSet.next()) {
final Map<String, String> item = new HashMap<>();
// Collect column name -> value for the current row
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
String columnValue = resultSet.getString(i);
item.put(columnName, columnValue);
}
log.info(item.toString());
}
}
private static String resourcePath(String path) {
final URL url = CalciteCreateMataDataTest.class.getResource("/" + path);
return Sources
.of(url).file().getAbsolutePath();
}
}
