6. Calcite添加自定義函數(shù)
1. 簡(jiǎn)介
在上篇博文中介紹了如何使用calcite進(jìn)行sql驗(yàn)證, 但是真正在實(shí)際生產(chǎn)環(huán)境中我們可能需要使用到
- 用戶自定義函數(shù)(UDF): 通過代碼實(shí)現(xiàn)對(duì)應(yīng)的函數(shù)邏輯并注冊(cè)給calcite
- sql驗(yàn)證: 將UDF信息注冊(cè)給calcite,
SqlValidator.validator驗(yàn)證階段即可通過驗(yàn)證 - sql執(zhí)行: calcite通過調(diào)用UDF邏輯實(shí)現(xiàn)函數(shù)邏輯
- sql驗(yàn)證: 將UDF信息注冊(cè)給calcite,
- 自定義db函數(shù): 數(shù)據(jù)庫中創(chuàng)建的自定義函數(shù)
- sql驗(yàn)證: 將自定義的db函數(shù)信息注冊(cè)給calcite,
SqlValidator.validator驗(yàn)證階段即可通過驗(yàn)證 - sql執(zhí)行: 下推到db執(zhí)行對(duì)應(yīng)的db函數(shù)
- sql驗(yàn)證: 將自定義的db函數(shù)信息注冊(cè)給calcite,
此時(shí)我們需要將自定義的函數(shù)注冊(cè)到calcite中, 用于sql驗(yàn)證和執(zhí)行. 例如注冊(cè)一個(gè)簡(jiǎn)單的函數(shù) 如: 將數(shù)據(jù)庫中的性別字段值做字典轉(zhuǎn)換.
2. Maven
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.37.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.33</version>
</dependency>
2. UDF
如上述所說, UDF是將用戶自定義的方法注冊(cè)為函數(shù)使用的, 首先看一下calcite是如何注冊(cè)UDF的
SchemaPlus#add(String name, Function function);
其Function的實(shí)現(xiàn)類如下:

-
定義UDF實(shí)現(xiàn)
public class Udf { public static String dictSex(String code) { if (StringUtils.isBlank(code)) { return code; } if (StringUtils.equals(code, "1")) { return "男"; } else if (StringUtils.equals(code, "2")) { return "女"; } else { return "未知"; } } } -
把
dictSex方法注冊(cè)到calcite中, 因?yàn)樯鲜龅姆椒ㄝ斎敕祷氐亩际菃我恢? 所以直接注冊(cè)為標(biāo)量函數(shù)即可(如果是聚合函數(shù)可以使用AggregateFunction)// 指定函數(shù)名稱 和 對(duì)應(yīng)函數(shù)的class & method name rootSchema.add("dict_sex", ScalarFunctionImpl.create(Udf.class, "dictSex")); -
測(cè)試執(zhí)行
final ResultSet resultSet = statement.executeQuery("SELECT username, dict_sex(sex) sex_name FROM `user`"); printResultSet(resultSet);表數(shù)據(jù)如下

輸出結(jié)果
c.l.c.CalciteFuncTest - [printResultSet,86] - Number of columns: 2 c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=男, username=張三} c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=李四} c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=張鐵牛}
3. 自定義db函數(shù)
首先 我們定義一個(gè)db 函數(shù)實(shí)現(xiàn)字典值的轉(zhuǎn)換
DELIMITER //
CREATE FUNCTION dict_sex(code VARCHAR(10))
RETURNS VARCHAR(10)
DETERMINISTIC
BEGIN
-- 如果code為空或只包含空白字符,則直接返回code
IF code IS NULL OR TRIM(code) = '' THEN
RETURN code;
END IF;
-- 如果code為'1'則返回'男'
IF code = '1' THEN
RETURN '男';
-- 如果code為'2'則返回'女'
ELSEIF code = '2' THEN
RETURN '女';
ELSE
RETURN '未知';
END IF;
END //
DELIMITER ;
驗(yàn)證函數(shù)功能

ok, 函數(shù)創(chuàng)建完成, 我們將函數(shù)注冊(cè)到calcite中
calcite中sqlfunction有很多其已經(jīng)實(shí)現(xiàn)的類, 我們這里使用SqlBasicFunction來創(chuàng)建我們的函數(shù)

-
定義SqlFunction
/* * SqlBasicFunction create(String name, SqlReturnTypeInference returnTypeInference, SqlOperandTypeChecker operandTypeChecker) * name: 函數(shù)名稱 * returnTypeInference: 返回值類型 * operandTypeChecker: 函數(shù)入?yún)⒌男r?yàn)器 */ SqlFunction DICT_SEX = SqlBasicFunction.create("dict_sex", ReturnTypes.VARCHAR, OperandTypes.family(SqlTypeFamily.CHARACTER)); -
注冊(cè)SqlFunction
從上篇博文中我們知道, calcite的sql函數(shù)都注冊(cè)到了
SqlStdOperatorTable類中, 所以我們只需要將自定義的函數(shù)注冊(cè)進(jìn)即可final SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance(); sqlStdOperatorTable.register(DICT_SEX);對(duì), 就這么簡(jiǎn)單. 因?yàn)?code>SqlStdOperatorTable類是單例模式, 所以我們可以隨時(shí)隨地的進(jìn)行注冊(cè), 其驗(yàn)證邏輯就可以直接調(diào)用了
當(dāng)然, 看了其他博客大多數(shù)都是繼承
SqlStdOperatorTable類實(shí)現(xiàn)自定義SqlStdOperatorTable的 如下, 最后使用自己的SqlStdOperatorTable即可public static class SqlCustomOperatorTable extends SqlStdOperatorTable { private static SqlCustomOperatorTable instance; // 只需要申明為成員變量即可, instance.init() 的時(shí)候會(huì)反射取變量進(jìn)行注冊(cè) public static final SqlFunction DICT_SEX = SqlBasicFunction.create("dict_sex", ReturnTypes.VARCHAR, OperandTypes.family(SqlTypeFamily.CHARACTER)); public static synchronized SqlCustomOperatorTable instance() { if (instance == null) { instance = new SqlCustomOperatorTable(); instance.init(); } return instance; } /** * 如果想修改獲取函數(shù)的過程, 可以重寫此方法 */ @Override protected void lookUpOperators(String name, boolean caseSensitive, Consumer<SqlOperator> consumer) { super.lookUpOperators(name, caseSensitive, consumer); } } -
測(cè)試執(zhí)行
final ResultSet resultSet = statement.executeQuery("SELECT username, dict_sex(sex) sex_name FROM `user`"); printResultSet(resultSet);輸出結(jié)果
c.l.c.CalciteFuncTest - [printResultSet,86] - Number of columns: 2 c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=男, username=張三} c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=李四} c.l.c.CalciteFuncTest - [printResultSet,98] - {sex_name=女, username=張鐵牛}經(jīng)測(cè)試: 如果udf 和 sqlfunction 同時(shí)存在的時(shí)候 優(yōu)先使用udf
4. 完整代碼
4.1 udf
package com.ldx.calcite;
import com.google.common.collect.Maps;
import com.mysql.cj.jdbc.MysqlDataSource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.impl.ScalarFunctionImpl;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;
import static org.apache.calcite.config.CalciteConnectionProperty.LEX;
@Slf4j
public class CalciteFuncWithUdfTest {
private static Statement statement;
@BeforeAll
@SneakyThrows
public static void beforeAll() {
Properties info = new Properties();
// 不區(qū)分sql大小寫
info.setProperty("caseSensitive", "false");
info.setProperty(LEX.camelName(), Lex.MYSQL.name());
// 創(chuàng)建Calcite連接
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// 構(gòu)建RootSchema,在Calcite中,RootSchema是所有數(shù)據(jù)源schema的parent,多個(gè)不同數(shù)據(jù)源schema可以掛在同一個(gè)RootSchema下
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// 設(shè)置默認(rèn)的schema, 如果不設(shè)置sql中需要加上對(duì)應(yīng)數(shù)據(jù)源的名稱
calciteConnection.setSchema("my_mysql");
final DataSource mysqlDataSource = getMysqlDataSource();
final JdbcSchema schemaWithMysql = JdbcSchema.create(rootSchema, "my_mysql", mysqlDataSource, "test", null);
final SchemaPlus myMysqlSchema = rootSchema.add("my_mysql", schemaWithMysql);
// 全局注冊(cè)
rootSchema.add("dict_sex", ScalarFunctionImpl.create(Udf.class, "dictSex"));
statement = calciteConnection.createStatement();
// 只注冊(cè)到mysql schema中
// myMysqlSchema.add("dict_sex", ScalarFunctionImpl.create(Udf.class, "dictSex"));
// 創(chuàng)建SQL語句執(zhí)行查詢
statement = calciteConnection.createStatement();
}
@Test
@SneakyThrows
public void test_udf_func() {
final ResultSet resultSet = statement.executeQuery("SELECT username, dict_sex(sex) sex_name FROM `user`");
printResultSet(resultSet);
}
private static DataSource getMysqlDataSource() {
MysqlDataSource dataSource = new MysqlDataSource();
dataSource.setUrl("jdbc:mysql://localhost:3306/test");
dataSource.setUser("root");
dataSource.setPassword("123456");
return dataSource;
}
public static void printResultSet(ResultSet resultSet) throws SQLException {
// 獲取 ResultSet 元數(shù)據(jù)
ResultSetMetaData metaData = resultSet.getMetaData();
// 獲取列數(shù)
int columnCount = metaData.getColumnCount();
log.info("Number of columns: {}",columnCount);
// 遍歷 ResultSet 并打印結(jié)果
while (resultSet.next()) {
final Map<String, String> item = Maps.newHashMap();
// 遍歷每一列并打印
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
String columnValue = resultSet.getString(i);
item.put(columnName, columnValue);
}
log.info(item.toString());
}
}
}
4.2 db func
package com.ldx.calcite;
import com.google.common.collect.Maps;
import com.mysql.cj.jdbc.MysqlDataSource;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.adapter.jdbc.JdbcSchema;
import org.apache.calcite.config.Lex;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.sql.SqlBasicFunction;
import org.apache.calcite.sql.SqlFunction;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.fun.SqlStdOperatorTable;
import org.apache.calcite.sql.type.OperandTypes;
import org.apache.calcite.sql.type.ReturnTypes;
import org.apache.calcite.sql.type.SqlTypeFamily;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;
import java.util.function.Consumer;
import static org.apache.calcite.config.CalciteConnectionProperty.LEX;
@Slf4j
public class CalciteFuncWithDbTest {
private static Statement statement;
public static final SqlFunction DICT_SEX = SqlBasicFunction.create("dict_sex", ReturnTypes.VARCHAR, OperandTypes.family(SqlTypeFamily.CHARACTER));
@BeforeAll
@SneakyThrows
public static void beforeAll() {
Properties info = new Properties();
// 不區(qū)分sql大小寫
info.setProperty("caseSensitive", "false");
info.setProperty(LEX.camelName(), Lex.MYSQL.name());
// 創(chuàng)建Calcite連接
Connection connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// 構(gòu)建RootSchema,在Calcite中,RootSchema是所有數(shù)據(jù)源schema的parent,多個(gè)不同數(shù)據(jù)源schema可以掛在同一個(gè)RootSchema下
SchemaPlus rootSchema = calciteConnection.getRootSchema();
// 設(shè)置默認(rèn)的schema, 如果不設(shè)置sql中需要加上對(duì)應(yīng)數(shù)據(jù)源的名稱
calciteConnection.setSchema("my_mysql");
final DataSource mysqlDataSource = getMysqlDataSource();
final JdbcSchema schemaWithMysql = JdbcSchema.create(rootSchema, "my_mysql", mysqlDataSource, "test", null);
rootSchema.add("my_mysql", schemaWithMysql);
final SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance();
sqlStdOperatorTable.register(DICT_SEX);
statement = calciteConnection.createStatement();
}
@Test
@SneakyThrows
public void test_db_func() {
final ResultSet resultSet = statement.executeQuery("SELECT dict_sex(sex) sex_name FROM `user`");
printResultSet(resultSet);
}
private static DataSource getMysqlDataSource() {
MysqlDataSource dataSource = new MysqlDataSource();
dataSource.setUrl("jdbc:mysql://localhost:3306/test");
dataSource.setUser("root");
dataSource.setPassword("123456");
return dataSource;
}
public static void printResultSet(ResultSet resultSet) throws SQLException {
// 獲取 ResultSet 元數(shù)據(jù)
ResultSetMetaData metaData = resultSet.getMetaData();
// 獲取列數(shù)
int columnCount = metaData.getColumnCount();
log.info("Number of columns: {}",columnCount);
while (resultSet.next()) {
final Map<String, String> item = Maps.newHashMap();
// 遍歷每一列并打印
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
String columnValue = resultSet.getString(i);
item.put(columnName, columnValue);
}
log.info(item.toString());
}
}
public static class SqlCustomOperatorTable extends SqlStdOperatorTable {
private static SqlCustomOperatorTable instance;
// 只需要申明為成員變量即可, instance.init() 的時(shí)候會(huì)反射取變量進(jìn)行注冊(cè)
public static final SqlFunction DICT_SEX = SqlBasicFunction.create("dict_sex", ReturnTypes.VARCHAR, OperandTypes.family(SqlTypeFamily.CHARACTER));
public static synchronized SqlCustomOperatorTable instance() {
if (instance == null) {
instance = new SqlCustomOperatorTable();
instance.init();
}
return instance;
}
/**
* 如果想修改獲取函數(shù)的過程, 可以重寫此方法
*/
@Override
protected void lookUpOperators(String name, boolean caseSensitive, Consumer<SqlOperator> consumer) {
super.lookUpOperators(name, caseSensitive, consumer);
}
}
}

浙公網(wǎng)安備 33010602011771號(hào)