2. 什么?你想跨數據庫關聯查詢?
1. 簡介
我們平時開發中可能會遇到這樣的問題,現在分布式環境下每個服務對應的數據庫都是獨立的,每個應用使用的都是自己的數據庫,或者項目現場我們的服務需要使用第三方的提供的數據,但是第三方直接把數據庫信息扔給我們,讓我們自己去查詢,像這種情況我們一般就兩種做法
- 在我們的服務中添加一個數據源然后添加持久層進行操作
- 另起一個服務,然后這個服務去連接第三方數據庫最后提供服務
其實這兩種方法本質是一樣的,就是添加對應的數據源,然后添加一堆持久層的對象,最后在service中對多個數據源的結果各種組裝,實現起來頂多就是麻煩點,難度到不高。這個過程中其實最麻煩的點是數據的組裝過程,如果業務復雜,組裝起來感覺寫一堆毫無意義的代碼,也沒什么重復利用的價值。那么有沒有一種辦法能直接跨庫查詢,比如將A庫和B庫的表直接進行連表查詢,最好的是A庫和B庫即使不是同一種數據庫,也能進行關聯查詢。
ok,calcite它來了。
2. 實現思路
我們在上一篇文章中講了calcite如何建立元數據,在測試代碼中其實已經實現了跨庫的關聯查詢,本章就利用calcite的特性簡單封裝一個小demo,讓其能提供跨庫查詢能力。
-
聲明兩個類
DataSourceProperty: 配置jdbc連接信息DataSourceManager: 管理jdbc的DataSource對象
ok,有了以上兩個類,就可以做基本的jdbc的數據源管理了,其實只要能拿到DataSource,我們就可以做基本的數據庫操作了,但是現在還不具備跨庫查詢的能力。
-
聲明
SuperDataSourceManager,將DataSource對象注冊給calcite。
其實到這里,我們就可以做跨庫查詢了,但是不好用,因為直接使用Statement,不管是參數封裝還是執行結果的解析,都太原生了(不好用) -
聲明
SuperDataSourceTemplate,類似于spring的JdbcTemplate,用來簡化參數替換和結果解析(這里只是添加一個示例的模板代碼)
3. Maven
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>super-query</artifactId>
<version>1.0-SNAPSHOT</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3</version>
<relativePath/>
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>31.1-jre</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.24</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.12.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.4</version>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.36.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.23</version>
</dependency>
</dependencies>
</project>
4. 核心代碼

4.1 數據源管理器
DataSourceProperty:配置jdbc的連接信息
package com.ldx.superquery.datasource;
import com.zaxxer.hikari.HikariDataSource;
import lombok.Data;
import org.apache.commons.lang3.StringUtils;
import org.springframework.boot.jdbc.DatabaseDriver;
import javax.sql.DataSource;
/**
* 數據源連接信息
*/
@Data
public class DataSourceProperty {
/**
* URL
*/
private String url;
/**
* 用戶名
*/
private String username;
/**
* 密碼
*/
private String password;
/**
* 數據源key
*/
private String key;
/**
* 最多返回條數
*/
private int maxRows = -1;
/**
* 驅動類
*/
private String driverClassName;
/**
* 連接池類型
*/
private String type;
public String getDriverClassName() {
if (StringUtils.isNotBlank(driverClassName)) {
return driverClassName;
}
return DatabaseDriver.fromJdbcUrl(url).getDriverClassName();
}
public Class<? extends DataSource> getTypeClass() {
try {
//noinspection unchecked
return (Class<? extends DataSource>) Class.forName(type);
}
catch (Exception e) {
return HikariDataSource.class;
}
}
}
DataSourceManager:管理jdbc的連接信息,并且最后注冊給calcite
package com.ldx.superquery.datasource;
import com.google.common.collect.Maps;
import lombok.Data;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import javax.sql.DataSource;
import java.util.Map;
/**
* 數據源管理器
*/
public class DataSourceManager {
private final Map<String, DataSourceNode> DATA_SOURCE_MAP = Maps.newConcurrentMap();
public void register(String dataSourceKey, DataSource dataSource) {
this.register(new DataSourceNode(dataSourceKey, dataSource));
}
public void register(DataSourceProperty dataSourceProperty) {
final DataSourceNode dataSourceNode = new DataSourceNode(dataSourceProperty);
this.register(dataSourceNode);
}
public void register(DataSourceNode dataSourceNode) {
final String dataSourceKey = dataSourceNode.getDataSourceKey();
DATA_SOURCE_MAP.put(dataSourceKey, dataSourceNode);
SuperDataSourceManager.register(dataSourceKey, dataSourceNode.getDataSource());
}
public DataSourceNode getDataSource(String dataSourceKey) {
return DATA_SOURCE_MAP.get(dataSourceKey);
}
public void unregister(String dataSourceKey) {
DATA_SOURCE_MAP.remove(dataSourceKey);
}
/**
* 用來二次封裝 datasource
*/
@Data
public static class DataSourceNode {
private String dataSourceKey;
private DataSource dataSource;
// spring內置的named template
private NamedParameterJdbcTemplate jdbcTemplate;
// 事務管理器
private PlatformTransactionManager platformTransactionManager;
public DataSourceNode(String dataSourceKey, DataSource dataSource) {
this.dataSourceKey = dataSourceKey;
this.dataSource = dataSource;
this.jdbcTemplate = new NamedParameterJdbcTemplate(dataSource);
this.platformTransactionManager = new DataSourceTransactionManager(dataSource);
}
public DataSourceNode(DataSourceProperty dataSourceProperty) {
this(dataSourceProperty.getKey(), DataSourceBuilder
.create()
.url(dataSourceProperty.getUrl())
.username(dataSourceProperty.getUsername())
.password(dataSourceProperty.getPassword())
.driverClassName(dataSourceProperty.getDriverClassName())
.type(dataSourceProperty.getTypeClass())
.build());
}
}
}
4.2 超級數據源管理器
SuperDataSourceManager: 用來管理calcite相關的連接信息
package com.ldx.superquery.datasource;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;
/**
* 超級數據源管理器
*/
public class SuperDataSourceManager {
private static final SchemaPlus ROOT_SCHEMA;
private static final CalciteConnection CALCITE_CONNECTION;
static {
// see CalciteConnectionProperty
Properties info = new Properties();
info.setProperty("lex", "JAVA");
// 不區分大小寫
info.setProperty("caseSensitive", "false");
Connection connection = null;
try {
connection = DriverManager.getConnection("jdbc:calcite:", info);
CALCITE_CONNECTION = connection.unwrap(CalciteConnection.class);
}
catch (SQLException e) {
throw new RuntimeException("create calcite connection failed", e);
}
ROOT_SCHEMA = CALCITE_CONNECTION.getRootSchema();
}
public static void register(String dataSourceKey, DataSource dataSource) {
Schema schema = JdbcSchema.create(ROOT_SCHEMA, dataSourceKey, dataSource, null, null);
ROOT_SCHEMA.add(dataSourceKey, schema);
}
public static Statement getStatement() {
try {
return CALCITE_CONNECTION.createStatement();
}
catch (SQLException e) {
throw new RuntimeException("create calcite statement failed", e);
}
}
}
4.3 JdbcTemplate
SuperJdbcTemplate : 一個簡單的門面來提供一些常用的jdbc相關方法
package com.ldx.superquery.datasource;
import org.springframework.jdbc.core.ColumnMapRowMapper;
import org.springframework.jdbc.core.ResultSetExtractor;
import org.springframework.jdbc.core.RowMapperResultSetExtractor;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
/**
* 超級數據源template
*/
public class SuperJdbcTemplate {
private final Statement statement;
public SuperJdbcTemplate(Statement statement) {
this.statement = statement;
}
public List<Map<String, Object>> queryForList(String sql) throws SQLException {
final ColumnMapRowMapper columnMapRowMapper = new ColumnMapRowMapper();
return this.query(sql, new RowMapperResultSetExtractor<>(columnMapRowMapper));
}
public <T> T query(String sql, ResultSetExtractor<T> rse) throws SQLException {
try (ResultSet resultSet = statement.executeQuery(sql)) {
return rse.extractData(resultSet);
}
}
}
5. 測試用例
5.1 數據庫
數據庫用到的還是我們的老演員:

5.2 測試用例代碼
這里分別測試了單庫的查詢,也測試了跨庫的連表查詢
package com.ldx.superquery.datasource;
import com.google.common.collect.Maps;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
import java.util.List;
import java.util.Map;
/**
* 測試超級查詢
*/
@Slf4j
public class SuperQueryTest {
private static final String MYSQL_KEY = "mysql_test";
private static final String POSTGRES_KEY = "postgres_test";
private static DataSourceManager dsm;
@BeforeAll
public static void given_datasource_manager() {
final DataSourceManager dataSourceManager = new DataSourceManager();
final DataSourceProperty mysqlDataSourceProperty = buildMysqlDataSourceProperty();
final DataSourceProperty postgresDataSourceProperty = buildPostgresDataSourceProperty();
dataSourceManager.register(mysqlDataSourceProperty);
dataSourceManager.register(postgresDataSourceProperty);
dsm = dataSourceManager;
}
@Test
public void should_return_records_when_use_spring_jdbc_for_mysql() {
final DataSourceManager.DataSourceNode ds = dsm.getDataSource(MYSQL_KEY);
final NamedParameterJdbcTemplate jdbcTemplate = ds.getJdbcTemplate();
final Map<String, Object> params = Maps.newHashMap();
params.put("id", 1);
final List<Map<String, Object>> result = jdbcTemplate.queryForList("select * from `user` where id = :id", params);
log.info("execute query for mysql datasource results: {}", result);
}
@Test
public void should_return_records_when_use_spring_jdbc_for_postgres() {
final DataSourceManager.DataSourceNode ds = dsm.getDataSource(POSTGRES_KEY);
final NamedParameterJdbcTemplate jdbcTemplate = ds.getJdbcTemplate();
final Map<String, Object> params = Maps.newHashMap();
params.put("role_key", 1);
final List<Map<String, Object>> result = jdbcTemplate.queryForList("select * from role where role_key = :role_key", params);
log.info("execute query for postgres datasource results: {}", result);
}
@Test
@SneakyThrows
public void should_return_records_when_use_super_jdbc_for_postgres() {
final SuperJdbcTemplate SuperJdbcTemplate = new SuperJdbcTemplate(SuperDataSourceManager.getStatement());
final List<Map<String, Object>> result = SuperJdbcTemplate.queryForList("select * from "+ POSTGRES_KEY +".role");
log.info("execute super query for postgres datasource results: {}", result);
}
@Test
@SneakyThrows
public void should_return_records_when_use_super_jdbc() {
final SuperJdbcTemplate SuperJdbcTemplate = new SuperJdbcTemplate(SuperDataSourceManager.getStatement());
final List<Map<String, Object>> result = SuperJdbcTemplate.queryForList("select * from "+ POSTGRES_KEY +".role r right join "+ MYSQL_KEY + ".`user` u on r.role_key = u.role_key");
log.info("execute super query for postgres datasource results: ");
result.forEach(item -> log.info(item.toString()));
}
private static DataSourceProperty buildMysqlDataSourceProperty() {
final DataSourceProperty dataSourceProperty = new DataSourceProperty();
dataSourceProperty.setDriverClassName("com.mysql.cj.jdbc.Driver");
dataSourceProperty.setKey(MYSQL_KEY);
dataSourceProperty.setUrl("jdbc:mysql://localhost:3306/test");
dataSourceProperty.setUsername("root");
dataSourceProperty.setPassword("123456");
return dataSourceProperty;
}
private static DataSourceProperty buildPostgresDataSourceProperty() {
final DataSourceProperty dataSourceProperty = new DataSourceProperty();
dataSourceProperty.setDriverClassName("org.postgresql.Driver");
dataSourceProperty.setKey(POSTGRES_KEY);
dataSourceProperty.setUrl("jdbc:postgresql://localhost:5432/test");
dataSourceProperty.setUsername("root");
dataSourceProperty.setPassword("123456");
return dataSourceProperty;
}
}
5.3 測試結果展示
這里只展示一下
should_return_records_when_use_super_jdbc用例的執行


浙公網安備 33010602011771號