A Simple Implementation of a Custom Flink ClickHouse Table Connector
This implementation is based on Flink 1.18. For an overview of how the classes involved relate to each other, see the diagram in the official documentation (linked below).
Let's first use the Socket example from the official documentation to walk through the implementation process:
- First, write a SocketDynamicTableFactory that implements the DynamicTableSourceFactory interface.
- SocketDynamicTableFactory returns a SocketDynamicTableSource, which itself implements the ScanTableSource interface.
- SocketDynamicTableSource returns a SocketSourceFunction, which is where the actual reading logic lives. SocketSourceFunction extends the RichSourceFunction<RowData> class and implements the ResultTypeQueryable<RowData> interface; its run method implements the main logic and emits the results downstream.
- For the deserialization part, write a ChangelogCsvFormatFactory that implements the DeserializationFormatFactory interface and returns a ChangelogCsvFormat.
- ChangelogCsvFormat implements the DecodingFormat<DeserializationSchema<RowData>> interface and returns a ChangelogCsvDeserializer.
- ChangelogCsvDeserializer in turn implements the DeserializationSchema<RowData> interface; its central method deserialize performs the binary deserialization, i.e. the conversion into RowData.
The official example can be found at: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sourcessinks/
Based on the same pattern, we'll now implement a simple ClickHouse Table Source. We won't do any elaborate field mapping; we'll just read the data of a given table and walk through the whole process end to end.
Note that the connector project and the client (job) project must be two separate projects: the connector has to be loaded by Flink's ClassLoader, i.e. it must be placed in Flink's lib directory and the cluster restarted. If everything lived in one project, Flink would fail with an error saying it cannot find the connector.
Let's first create the connector project, managed with Maven. Its pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-1.18-table-source-example</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <name>Flink Table Source Example</name>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.18.0</flink.version>
        <log4j.version>2.17.1</log4j.version>
    </properties>
    <repositories>
        <repository>
            <id>aliyun-maven</id>
            <name>Aliyun mirror aggregating the central and jcenter repositories</name>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>
    <dependencies>
        <!-- Table API development -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!-- ClickHouse Client -->
        <dependency>
            <groupId>com.clickhouse</groupId>
            <!-- or clickhouse-grpc-client if you prefer gRPC -->
            <artifactId>clickhouse-http-client</artifactId>
            <version>0.5.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents.client5</groupId>
            <artifactId>httpclient5</artifactId>
            <version>5.2.3</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id> <!-- this is used for inheritance merges -->
                        <phase>package</phase> <!-- bind to the packaging phase -->
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
The Flink Table API dependencies are all marked as provided because Flink already ships them. The ClickHouse packages, however, must be bundled into the jar; otherwise the standalone jars would have to be dropped into Flink's lib directory. Since the connector depends on the ClickHouse client, the two must be loaded together; including the dependency only at job submission time does not work, and a missing dependency shows up as a NoClassDefFoundError when the job is submitted. To bundle the dependencies into the jar, the Maven plugin above is used.
Suppose we have the following table in ClickHouse:
CREATE TABLE user_score
(
    `name` String,
    `score` Int32,
    `user_id` FixedString(16)
)
ENGINE = MergeTree
ORDER BY user_id
The ClickHouse table we plan to define in Flink looks like this (String maps to STRING, Int32 to INT, and FixedString(16) to BYTES):
CREATE TABLE user_score (name STRING, score INT, user_id BYTES)
WITH (
    'connector' = 'clickhouse',
    'hostname' = 'localhost',
    'port' = '8123',
    'username' = 'default',
    'password' = '',
    'database' = 'default',
    'table' = 'user_score',
    'format' = 'clickhouse-row'
);
First we create a container class for the ClickHouse connection settings:
package org.example.source.clickhouse;

import java.io.Serializable;

public class ClickHouseConnection implements Serializable {

    private final String hostname;
    private final int port;
    private final String username;
    private final String password;
    private final String database;
    private final String table;

    public ClickHouseConnection(String hostname, int port, String username, String password, String database, String table) {
        this.hostname = hostname;
        this.port = port;
        this.username = username;
        this.password = password;
        this.database = database;
        this.table = table;
    }

    public String getEndpoint() {
        StringBuilder builder = new StringBuilder();
        builder.append("http://")
                .append(this.hostname)
                .append(":")
                .append(this.port)
                .append("/")
                .append(this.database)
                .append("?user=")
                .append(username);
        if (!"".equals(password)) {
            builder.append("&password=")
                    .append(password);
        }
        return builder.toString();
    }

    public String getTable() {
        return table;
    }
}
This class must implement the Serializable interface; otherwise Flink will fail with a serialization error.
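As a quick illustration (hypothetical usage, not part of the connector), with the values from the WITH clause shown above the class behaves like this:
ClickHouseConnection conn = new ClickHouseConnection(
        "localhost", 8123, "default", "", "default", "user_score");
conn.getEndpoint(); // "http://localhost:8123/default?user=default" (empty password is omitted)
conn.getTable();    // "user_score"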
Next we create ClickHouseDynamicTableFactory with the following content:
package org.example.source.clickhouse;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

import java.util.HashSet;
import java.util.Set;

public class ClickHouseDynamicTableFactory implements DynamicTableSourceFactory {

    public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
            .intType()
            .defaultValue(8123);

    public static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
            .stringType()
            .defaultValue("default");

    public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
            .stringType()
            .defaultValue("");

    public static final ConfigOption<String> DATABASE = ConfigOptions.key("database")
            .stringType()
            .defaultValue("default");

    public static final ConfigOption<String> TABLE = ConfigOptions.key("table")
            .stringType()
            .noDefaultValue();

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // built-in validation helper
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class,
                FactoryUtil.FORMAT);
        helper.validate();

        // get the validated options
        final ReadableConfig options = helper.getOptions();
        final String hostname = options.get(HOSTNAME);
        final int port = options.get(PORT);
        final String username = options.get(USERNAME);
        final String password = options.get(PASSWORD);
        final String database = options.get(DATABASE);
        final String table = options.get(TABLE);
        ClickHouseConnection clickHouseConnection = new ClickHouseConnection(hostname, port, username, password, database, table);

        final DataType producedDataType =
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

        // return the DynamicTableSource
        return new ClickHouseDynamicTableSource(clickHouseConnection, decodingFormat, producedDataType);
    }

    @Override
    public String factoryIdentifier() {
        return "clickhouse";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(HOSTNAME);
        options.add(TABLE);
        options.add(FactoryUtil.FORMAT); // use the pre-defined option for format
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(PORT);
        options.add(USERNAME);
        options.add(PASSWORD);
        options.add(DATABASE);
        return options;
    }
}
This factory defines the various options, i.e. the parameters passed in from the Flink SQL WITH clause, and performs the initial validation (helper.validate() rejects missing required options and unknown keys); factoryIdentifier returns the identifier used as the connector option.
It then returns a ClickHouseDynamicTableSource carrying the connection parameters and related information. Let's create that class next:
package org.example.source.clickhouse;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

public class ClickHouseDynamicTableSource implements ScanTableSource {

    private final ClickHouseConnection clickHouseConnection;
    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    private final DataType producedDataType;

    public ClickHouseDynamicTableSource(
            ClickHouseConnection clickHouseConnection,
            DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
            DataType producedDataType) {
        this.clickHouseConnection = clickHouseConnection;
        this.decodingFormat = decodingFormat;
        this.producedDataType = producedDataType;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return decodingFormat.getChangelogMode();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
        // runtime context that is shipped to the cluster
        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
                runtimeProviderContext,
                producedDataType);
        final DynamicTableSource.DataStructureConverter converter =
                runtimeProviderContext.createDataStructureConverter(producedDataType);

        // create the SourceFunction<RowData>
        final SourceFunction<RowData> sourceFunction = new ClickHouseSourceFunction(
                clickHouseConnection,
                deserializer,
                converter);

        // the second argument indicates whether the stream is bounded
        return SourceFunctionProvider.of(sourceFunction, true);
    }

    @Override
    public DynamicTableSource copy() {
        return new ClickHouseDynamicTableSource(clickHouseConnection, decodingFormat, producedDataType);
    }

    @Override
    public String asSummaryString() {
        return "ClickHouse Table Source";
    }
}
This class mainly wires up the runtime context information shipped to the cluster, including the deserializer and the data structure converter, and then returns the SourceFunction instance via SourceFunctionProvider. The second argument indicates whether the stream is bounded; for an unbounded stream it must be set to false.
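For instance, if this were an unbounded source, the only difference here would be that flag (and run() would then be expected to keep emitting records instead of exiting):
// Unbounded variant: Flink treats the source as a continuous stream.
return SourceFunctionProvider.of(sourceFunction, false);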
Finally, let's create the ClickHouseSourceFunction:
package org.example.source.clickhouse;

import com.clickhouse.client.ClickHouseClient;
import com.clickhouse.client.ClickHouseNode;
import com.clickhouse.client.ClickHouseResponse;
import com.clickhouse.data.ClickHouseFormat;
import com.clickhouse.data.ClickHouseRecord;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;

public class ClickHouseSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {

    private final ClickHouseConnection clickHouseConnection;
    private final DeserializationSchema<RowData> deserializer;
    private final DynamicTableSource.DataStructureConverter converter;
    private volatile boolean isRunning = true;

    public ClickHouseSourceFunction(ClickHouseConnection clickHouseConnection, DeserializationSchema<RowData> deserializer, DynamicTableSource.DataStructureConverter converter) {
        this.clickHouseConnection = clickHouseConnection;
        this.deserializer = deserializer;
        this.converter = converter;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return deserializer.getProducedType();
    }

    @Override
    public void run(SourceContext<RowData> ctx) throws Exception {
        String endpoint = clickHouseConnection.getEndpoint();
        String table = clickHouseConnection.getTable();
        ClickHouseNode clickHouseNode = ClickHouseNode.of(endpoint);
        while (isRunning) {
            try (ClickHouseClient client = ClickHouseClient.newInstance(clickHouseNode.getProtocol())) {
                ClickHouseResponse response = client.read(clickHouseNode)
                        .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
                        .query("select name, score, user_id from " + table)
                        .executeAndWait();
                for (ClickHouseRecord record : response.records()) {
                    // build a position-based Row and convert it into the internal RowData
                    Row row = new Row(RowKind.INSERT, record.size());
                    row.setField(0, record.getValue("name").asString());
                    row.setField(1, record.getValue("score").asInteger());
                    row.setField(2, record.getValue("user_id").asBinary());
                    ctx.collect((RowData) converter.toInternal(row));
                }
                response.close();
                // bounded source: stop after one full read
                cancel();
            } catch (Throwable t) {
                t.printStackTrace(); // print and continue
            }
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
The main business logic lives in the run method, which reads the data and emits it downstream; downstream operators then compute over it in batch fashion, even though the underlying data still flows as a stream. After one round of querying we break out of the loop directly, and the downstream treats this batch of data as a whole.
Since the query already yields final values, we can build a Row right here, convert it to RowData, and emit it downstream, without going through another deserialization pass: a deserializer only accepts byte[] input, and the round trip would be cumbersome. We still have to define a set of deserialization classes, though, because the format option is mandatory in Flink SQL. We could pass any existing format such as the common csv, but that would be misleading, so we define a dedicated format for this connector whose only job is to satisfy option validation. I haven't found a way to make the format option optional, so this is the workaround for now.
Also note that the first argument of row.setField must be a position index, not a field name; otherwise it throws:
Accessing a field by name is not supported in position-based field mode.
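In other words (illustrative only):
Row row = new Row(RowKind.INSERT, 3); // created in position-based mode
row.setField("name", "Alice");        // throws the error above
row.setField(0, "Alice");             // correct: access by position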
Next, let's create the deserialization-related classes, starting with ClickHouseFormatFactory:
package org.example.source.clickhouse;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableFactory;

import java.util.Collections;
import java.util.Set;

public class ClickHouseFormatFactory implements DeserializationFormatFactory {

    @Override
    public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) {
        // return the DecodingFormat<DeserializationSchema<RowData>> implementation
        return new ClickHouseFormat();
    }

    @Override
    public String factoryIdentifier() {
        return "clickhouse-row";
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        return Collections.emptySet();
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        return Collections.emptySet();
    }
}
The identifier returned here is clickhouse-row, which all of our ClickHouse connectors can reference. It returns a ClickHouseFormat, which we create next:
package org.example.source.clickhouse;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.RowKind;

public class ClickHouseFormat implements DecodingFormat<DeserializationSchema<RowData>> {

    public ClickHouseFormat() {
    }

    @Override
    public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context, DataType producedDataType) {
        final TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(
                producedDataType);
        final DynamicTableSource.DataStructureConverter converter = context.createDataStructureConverter(producedDataType);

        // return the DeserializationSchema<RowData> implementation
        return new ClickHouseDeserializer(converter, producedTypeInfo);
    }

    @Override
    public ChangelogMode getChangelogMode() {
        return ChangelogMode.newBuilder()
                .addContainedKind(RowKind.INSERT)
                // batch mode cannot contain anything other than INSERT
                // .addContainedKind(RowKind.DELETE)
                .build();
    }
}
One thing to note here: getChangelogMode declares which change types the format supports. In batch mode only INSERT is supported; declaring anything else fails with:
Querying a table in batch mode is currently only possible for INSERT-only table sources. But the source for table 'default_catalog.default_database.user_score' produces other changelog messages than just INSERT.
因?yàn)榕幚砭褪且慌鷶?shù)據(jù),相當(dāng)于只有插入操作,而流處理可以支持各類操作。
The ClickHouseDeserializer returned above is implemented next:
package org.example.source.clickhouse;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.connector.RuntimeConverter;
import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
import org.apache.flink.table.data.RowData;

import java.io.IOException;

public class ClickHouseDeserializer implements DeserializationSchema<RowData> {

    private final DataStructureConverter converter;
    private final TypeInformation<RowData> producedTypeInfo;

    public ClickHouseDeserializer(
            DataStructureConverter converter,
            TypeInformation<RowData> producedTypeInfo) {
        this.converter = converter;
        this.producedTypeInfo = producedTypeInfo;
    }

    @Override
    public void open(InitializationContext context) throws Exception {
        converter.open(RuntimeConverter.Context.create(ClickHouseDeserializer.class.getClassLoader()));
    }

    @Override
    public RowData deserialize(byte[] message) throws IOException {
        // never called: the source function converts records itself
        return null;
    }

    @Override
    public boolean isEndOfStream(RowData nextElement) {
        return false;
    }

    @Override
    public TypeInformation<RowData> getProducedType() {
        return producedTypeInfo;
    }
}
The implementation is trivial: since the data is already converted inside the SourceFunction, deserialize can simply return null; it is never invoked.
With that, the ClickHouse connector is complete. Install it into the local Maven repository so it can be referenced during development:
mvn install
Once installed, we can create a project that uses it. The project's pom.xml is defined as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>org.example</groupId>
    <artifactId>flink-1.18-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>Flink Table Example</name>
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.18.0</flink.version>
        <log4j.version>2.17.1</log4j.version>
    </properties>
    <repositories>
        <repository>
            <id>aliyun-maven</id>
            <name>Aliyun mirror aggregating the central and jcenter repositories</name>
            <url>https://maven.aliyun.com/repository/public</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.12</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.example</groupId>
            <artifactId>flink-1.18-table-source-example</artifactId>
            <version>1.0.0-SNAPSHOT</version>
            <scope>provided</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <configuration>
                    <createDependencyReducedPom>false</createDependencyReducedPom>
                </configuration>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                            </transformers>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <shadedArtifactAttached>true</shadedArtifactAttached>
                            <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
Here we bring in the Flink dependencies, all marked as provided, and our own connector, also marked as provided: since it will live in Flink's lib directory at runtime, it must not be bundled into the job jar. We also don't need the ClickHouse dependencies, because our connector jar already contains them.
We use maven-shade-plugin for packaging, mainly to merge the service files under resources; more on that shortly. Now let's create a test program:
package org.example;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class ClickHouseTableExample {

    public static void main(String[] args) {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE TABLE user_score (\n" +
                " `name` STRING,\n" +
                " `score` INTEGER,\n" +
                " `user_id` BYTES\n" +
                ") WITH (\n" +
                " 'connector' = 'clickhouse',\n" +
                " 'hostname' = 'localhost',\n" +
                " 'port' = '8123',\n" +
                " 'username' = 'default',\n" +
                " 'password' = 'wSqDxDAt',\n" +
                " 'database' = 'default',\n" +
                " 'table' = 'user_score',\n" +
                " 'format' = 'clickhouse-row'\n" +
                ");\n").print();
        tableEnv.executeSql("SELECT sum(score), name from user_score group by name;").print();
    }
}
This is a very simple program that does a small aggregation, with the runtime mode set to BATCH. If the source connector is defined as bounded, the runtime mode can be either streaming or batch: in streaming mode every intermediate aggregation result is emitted as data arrives, whereas in batch mode there is only one final result, emitted after the SourceFunction exits. If the SourceFunction looped forever, the batch job would never produce a final result, while streaming mode would keep emitting the current result. If the source connector is defined as unbounded, the runtime mode can only be streaming, and all aggregation results are emitted continuously as records arrive.
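For example, to observe those incremental updates with the same bounded source, only the runtime mode in the code above needs to change:
// STREAMING instead of BATCH: the aggregation now emits
// incremental updates rather than one final result.
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);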
At this point the program cannot run yet; it fails with:
Could not find any factory for identifier 'clickhouse' that implements 'org.apache.flink.table.factories.Factory' in the classpath.
That is because concrete DynamicTableSource and DynamicTableSink implementations are discovered through Java's SPI mechanism. In short: in the project's resources directory (src/main/resources for Maven), create the subdirectory META-INF/services, and inside it a file named org.apache.flink.table.factories.Factory with the following content:
org.example.source.clickhouse.ClickHouseDynamicTableFactory
org.example.source.clickhouse.ClickHouseFormatFactory
With this in place, the program can locate the corresponding classes at runtime and load them.
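Under the hood this is plain Java SPI; conceptually, Flink's factory discovery boils down to something like the following simplified sketch (not Flink's actual code, which does more validation in FactoryUtil):
import java.util.ServiceLoader;
import org.apache.flink.table.factories.Factory;

public class FactoryDiscoverySketch {
    public static Factory find(String identifier) {
        // reads every META-INF/services/org.apache.flink.table.factories.Factory
        // file on the classpath and instantiates the classes listed there
        for (Factory factory : ServiceLoader.load(Factory.class)) {
            if (identifier.equals(factory.factoryIdentifier())) {
                return factory; // e.g. ClickHouseDynamicTableFactory for "clickhouse"
            }
        }
        throw new IllegalStateException("No factory for identifier '" + identifier + "'");
    }
}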
When running locally, just comment out the relevant provided scopes and run. When packaging for the cluster, keep in mind that if other dependency jars also ship SPI files, they would by default overwrite the current project's, which is why the Maven configuration above includes:
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
This tells the shade plugin to merge the current project's service files with those already present in the packaged dependencies, so the file we wrote above ends up in the published jar.
Finally, the filters in the pom.xml above are also required; otherwise some leftover files are not filtered out and cause a runtime error:
Invalid signature file digest for Manifest main
which indicates an invalid signature; only after removing these files does the job run normally.
In the end: place the connector jar in the lib directory of every node in the Flink cluster, restart the cluster, then package the current project with mvn package and submit the resulting jar to the cluster.