3. 使用sql查詢csv/json文件內容,還能關聯查詢?
1. 簡介
我們在前面的文章提到了calcite可以支持文件系統的數據源適配, 其實官方已經提供了相應的能力, 其支持csv和json的查詢適配, 廢話不多說, 直接展示.
2. Maven
<!-- calcite文件系統支持 -->
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-file</artifactId>
<version>1.37.0</version>
</dependency>
<dependency>
<groupId>org.apache.calcite</groupId>
<artifactId>calcite-core</artifactId>
<version>1.37.0</version>
</dependency>
3. 數據文件準備
3.1 csv
user_info.csv
首行將來被解析成表的字段, 冒號后面是字段類型, 如果未指定類型 默認使用varchar
ID:long,姓名:string,GENDER:string,BIRTHDAY:date
100,"張三",,"2001-01-01"
110,"李四","M","2001-01-01"
120,"王五","M","2002-05-03"
130,"趙六","F","2005-09-07"
140,"張鐵牛","M","2007-01-01"
3.2 json
role_info.json
[
{
"id": 123,
"name": "管理員",
"key": "manager"
},
{
"id": 234,
"name": "老師",
"key": "teacher"
},
{
"id": 345,
"name": "學生",
"key": "student"
}
]
然后將文件放到resources/file目錄下
4. 核心代碼
package com.ldx.calcite;
import com.google.common.collect.ImmutableMap;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.calcite.adapter.file.FileSchemaFactory;
import org.apache.calcite.jdbc.CalciteConnection;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.util.Sources;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.testng.collections.Maps;
import java.net.URL;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;
import java.util.Properties;
@Slf4j
public class CalciteFileTest {
private static Connection connection;
private static SchemaPlus rootSchema;
private static Statement statement;
@BeforeAll
@SneakyThrows
public static void beforeAll() {
Properties info = new Properties();
// 不區分sql大小寫
info.setProperty("caseSensitive", "false");
// 創建Calcite連接
connection = DriverManager.getConnection("jdbc:calcite:", info);
CalciteConnection calciteConnection = connection.unwrap(CalciteConnection.class);
// 構建RootSchema,在Calcite中,RootSchema是所有數據源schema的parent,多個不同數據源schema可以掛在同一個RootSchema下
rootSchema = calciteConnection.getRootSchema();
final Schema schema = FileSchemaFactory.INSTANCE.create(rootSchema, "x",
ImmutableMap.of("directory", resourcePath("file"), "flavor", "scannable"));
rootSchema.add("test", schema);
// 創建SQL語句執行查詢
statement = calciteConnection.createStatement();
}
@Test
@SneakyThrows
public void execute_simple_query() {
ResultSet resultSet = statement.executeQuery("SELECT * FROM test.user_info");
printResultSet(resultSet);
}
@Test
@SneakyThrows
public void test_execute_join_query() {
ResultSet resultSet = statement.executeQuery("SELECT * FROM test.user_info ui inner join test.role_info ri on ui.role_id = ri.id");
printResultSet(resultSet);
}
@AfterAll
@SneakyThrows
public static void closeResource() {
statement.close();
connection.close();
}
private static String resourcePath(String path) {
final URL url = CalciteFileTest.class.getResource("/" + path);
return Sources.of(url).file().getAbsolutePath();
}
public static void printResultSet(ResultSet resultSet) throws SQLException {
// 獲取 ResultSet 元數據
ResultSetMetaData metaData = resultSet.getMetaData();
// 獲取列數
int columnCount = metaData.getColumnCount();
log.info("Number of columns: {}",columnCount);
// 遍歷 ResultSet 并打印結果
while (resultSet.next()) {
final Map<String, String> item = Maps.newHashMap();
// 遍歷每一列并打印
for (int i = 1; i <= columnCount; i++) {
String columnName = metaData.getColumnName(i);
String columnValue = resultSet.getString(i);
item.put(columnName, columnValue);
}
log.info(item.toString());
}
}
}
其實核心代碼就幾行, 如下:
通過FileSchemaFactory指定文件目錄和文件內容的讀取方式, 默認將指定目錄下的csv和json文件讀取成Table, 表名就是file的名稱
flavor:
-
SCANNABLE: 數據掃描。會更側重于快速地讀取和遍歷數據。這種方式適用于需要對大量數據進行全表掃描或者范圍掃描的情況,例如統計匯總操作 -
FILTERABLE: 數據過濾。會更側重于數據的條件篩選,比如在 SQL 查詢中的WHERE子句。 -
TRANSLATABLE: 數據轉換。會更側重于數據轉換,以滿足特定的查詢需求或者數據處理要求。這種轉換可能包括數據類型的轉換(如將字符串類型的數字轉換為實際的數值類型)、格式轉換(如日期格式的調整)等。
// 這里的第二個參數“x”沒什么意義, 源碼中沒用到, 可以隨便填
final Schema schema = FileSchemaFactory.INSTANCE.create(rootSchema, "x",
ImmutableMap.of("directory", resourcePath("file"), "flavor", "scannable"));
// 使用目錄名稱為schema名稱, 這里的test就是schema名稱
rootSchema.add("test", schema);
calcite也可以做對應表的關聯查詢, 測試中csv關聯json文件信息
"SELECT * FROM test.user_info ui inner join test.role_info ri on ui.role_id = ri.id"
5. 測試查詢
execute_simple_query方法執行如下

test_execute_join_query方法執行如下:


浙公網安備 33010602011771號