
      A Simple Implementation of a Custom ClickHouse Table Connector for Flink

      This implementation is based on Flink 1.18. For an overview of how the classes involved relate to each other, see the diagram in the official documentation (linked below).

      Let's first walk through the implementation process using the Socket example from the official documentation:

      1. First, write a SocketDynamicTableFactory that implements the DynamicTableSourceFactory interface.
      2. SocketDynamicTableFactory returns a SocketDynamicTableSource, which implements the ScanTableSource interface.
      3. SocketDynamicTableSource in turn returns a SocketSourceFunction, which is where the actual reading logic lives.
      4. SocketSourceFunction extends the RichSourceFunction<RowData> class and implements the ResultTypeQueryable<RowData> interface; its run method contains the main logic and emits the results downstream.
      5. For the deserialization part, write a ChangelogCsvFormatFactory that implements the DeserializationFormatFactory interface and returns a ChangelogCsvFormat.
      6. ChangelogCsvFormat implements the DecodingFormat<DeserializationSchema<RowData>> interface and returns a ChangelogCsvDeserializer.
      7. ChangelogCsvDeserializer in turn implements the DeserializationSchema<RowData> interface; its main method, deserialize, performs the binary deserialization, i.e. the conversion into RowData.

      The official example is documented at: https://nightlies.apache.org/flink/flink-docs-release-1.18/docs/dev/table/sourcessinks/

      Based on this pattern, we will now implement a simple ClickHouse Table Source. We will not do any sophisticated field mapping; the goal is simply to read the data of a given table and walk through the whole process once.

      Note that the Connector project and the client (job) project must be two separate projects: the Connector has to be loaded by Flink's ClassLoader, i.e. placed in Flink's lib directory, followed by a cluster restart. If everything lived in a single project, Flink would report that it cannot find the Connector.

      Let's first create the Connector project, managed with Maven. The pom.xml file is as follows:

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
      
          <groupId>org.example</groupId>
          <artifactId>flink-1.18-table-source-example</artifactId>
          <version>1.0.0-SNAPSHOT</version>
      
          <name>Flink Table Source Example</name>
      
          <properties>
              <maven.compiler.source>11</maven.compiler.source>
              <maven.compiler.target>11</maven.compiler.target>
              <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
              <flink.version>1.18.0</flink.version>
              <log4j.version>2.17.1</log4j.version>
          </properties>
      
          <repositories>
              <repository>
                  <id>aliyun-maven</id>
                  <name>Aliyun mirror aggregating the Central and JCenter repositories</name>
                  <url>https://maven.aliyun.com/repository/public</url>
              </repository>
          </repositories>
      
          <dependencies>
              <!-- Table API development -->
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-api-java-bridge</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-common</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
      
              <!-- ClickHouse Client -->
              <dependency>
                  <groupId>com.clickhouse</groupId>
                  <!-- or clickhouse-grpc-client if you prefer gRPC -->
                  <artifactId>clickhouse-http-client</artifactId>
                  <version>0.5.0</version>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.httpcomponents.client5</groupId>
                  <artifactId>httpclient5</artifactId>
                  <version>5.2.3</version>
              </dependency>
          </dependencies>
      
          <build>
              <plugins>
                  <plugin>
                      <artifactId>maven-assembly-plugin</artifactId>
                      <configuration>
                          <descriptorRefs>
                              <descriptorRef>jar-with-dependencies</descriptorRef>
                          </descriptorRefs>
                      </configuration>
                      <executions>
                          <execution>
                              <id>make-assembly</id> <!-- this is used for inheritance merges -->
                              <phase>package</phase> <!-- bind to the packaging phase -->
                              <goals>
                                  <goal>single</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      
      </project>
      

      The Flink Table API dependencies above are all marked as provided, because Flink itself already ships them. The ClickHouse dependencies, however, need to be bundled into the Connector jar; if they are not bundled, the standalone jars would have to be placed in Flink's lib directory as well. Since the Connector depends on the ClickHouse client, the two must be loaded together; including the dependency only at job-submission time does not work, and a missing dependency surfaces as a NoClassDefFoundError when the job is submitted. To bundle the dependencies, the Maven assembly plugin is configured in the build section above.
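
      To produce the connector jar with its dependencies bundled, package it as usual (the jar name below follows from the artifactId, version, and the jar-with-dependencies descriptor configured above):

      mvn package
      # -> target/flink-1.18-table-source-example-1.0.0-SNAPSHOT-jar-with-dependencies.jar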

      Suppose we have the following table in ClickHouse:

      CREATE TABLE user_score
      (
          `name` String,
          `score` Int32,
          `user_id` FixedString(16)
      )
      ENGINE = MergeTree
      ORDER BY user_id
      
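      To have something to read while testing, a couple of sample rows can be inserted; the values below are made up for illustration (FixedString(16) pads shorter values with null bytes):

      INSERT INTO user_score (name, score, user_id) VALUES
          ('alice', 100, 'user000000000001'),
          ('bob', 90, 'user000000000002');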

      The corresponding table we plan to define in Flink looks like this:

      CREATE TABLE user_score (name STRING, score INT, user_id BYTES)
      WITH (
        'connector' = 'clickhouse',
        'hostname' = 'localhost',
        'port' = '8123',
        'username' = 'default',
        'password' = '',
        'database' = 'default',
        'table' = 'user_score',
        'format' = 'clickhouse-row'
      );
      

      First we create a container class for the ClickHouse connection settings:

      package org.example.source.clickhouse;
      
      import java.io.Serializable;
      
      public class ClickHouseConnection implements Serializable {
          private final String hostname;
          private final int port;
          private final String username;
          private final String password;
          private final String database;
          private final String table;
      
          public ClickHouseConnection(String hostname, int port, String username, String password, String database, String table) {
              this.hostname = hostname;
              this.port = port;
              this.username = username;
              this.password = password;
              this.database = database;
              this.table = table;
          }
      
          public String getEndpoint() {
              StringBuilder builder = new StringBuilder();
              builder.append("http://")
                      .append(this.hostname)
                      .append(":")
                      .append(this.port)
                      .append("/")
                      .append(this.database)
                      .append("?user=")
                      .append(username);
              if(!"".equals(password)) {
                  builder.append("&password=")
                          .append(password);
              }
              return builder.toString();
          }
      
          public String getTable() {
              return table;
          }
      }
      

      This class must implement the Serializable interface; otherwise Flink will fail with a serialization error.
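
      A quick sanity check of the endpoint format this class produces (the values here are hypothetical):

      ClickHouseConnection conn = new ClickHouseConnection(
              "localhost", 8123, "default", "", "default", "user_score");
      // prints "http://localhost:8123/default?user=default" (the password part is omitted when empty)
      System.out.println(conn.getEndpoint());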

      Next we create ClickHouseDynamicTableFactory with the following content:

      package org.example.source.clickhouse;
      
      import org.apache.flink.api.common.serialization.DeserializationSchema;
      import org.apache.flink.configuration.ConfigOption;
      import org.apache.flink.configuration.ConfigOptions;
      import org.apache.flink.configuration.ReadableConfig;
      import org.apache.flink.table.connector.format.DecodingFormat;
      import org.apache.flink.table.connector.source.DynamicTableSource;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.factories.DeserializationFormatFactory;
      import org.apache.flink.table.factories.DynamicTableSourceFactory;
      import org.apache.flink.table.factories.FactoryUtil;
      import org.apache.flink.table.types.DataType;
      
      import java.util.HashSet;
      import java.util.Set;
      
      public class ClickHouseDynamicTableFactory implements DynamicTableSourceFactory {
          public static final ConfigOption<String> HOSTNAME = ConfigOptions.key("hostname")
                  .stringType()
                  .noDefaultValue();
      
          public static final ConfigOption<Integer> PORT = ConfigOptions.key("port")
                  .intType()
                  .defaultValue(8123);
      
          public static final ConfigOption<String> USERNAME = ConfigOptions.key("username")
                  .stringType()
                  .defaultValue("default");
      
          public static final ConfigOption<String> PASSWORD = ConfigOptions.key("password")
                  .stringType()
                  .defaultValue("");
      
          public static final ConfigOption<String> DATABASE = ConfigOptions.key("database")
                  .stringType()
                  .defaultValue("default");
          public static final ConfigOption<String> TABLE = ConfigOptions.key("table")
                  .stringType()
                  .noDefaultValue();
      
          @Override
          public DynamicTableSource createDynamicTableSource(Context context) {
              // built-in validation helper
              final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);
              final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                      DeserializationFormatFactory.class,
                      FactoryUtil.FORMAT);
      
              helper.validate();
      
              // read the validated options
              final ReadableConfig options = helper.getOptions();
              final String hostname = options.get(HOSTNAME);
              final int port = options.get(PORT);
              final String username = options.get(USERNAME);
              final String password = options.get(PASSWORD);
              final String database = options.get(DATABASE);
              final String table = options.get(TABLE);
      
              ClickHouseConnection clickHouseConnection = new ClickHouseConnection(hostname, port, username, password, database, table);
      
              final DataType producedDataType =
                      context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();
      
      
              // return the DynamicTableSource
              return new ClickHouseDynamicTableSource(clickHouseConnection, decodingFormat, producedDataType);
          }
      
          @Override
          public String factoryIdentifier() {
              return "clickhouse";
          }
      
          @Override
          public Set<ConfigOption<?>> requiredOptions() {
              final Set<ConfigOption<?>> options = new HashSet<>();
              options.add(HOSTNAME);
              options.add(TABLE);
              options.add(FactoryUtil.FORMAT); // use pre-defined option for format
              return options;
          }
      
          @Override
          public Set<ConfigOption<?>> optionalOptions() {
              final Set<ConfigOption<?>> options = new HashSet<>();
              options.add(PORT);
              options.add(USERNAME);
              options.add(PASSWORD);
              options.add(DATABASE);
              return options;
          }
      
      }
      
      

      This factory defines the various options, i.e. the parameters passed in from the Flink SQL WITH clause, and performs basic validation on them; factoryIdentifier returns the identifier that the connector option refers to.

      It then returns a ClickHouseDynamicTableSource carrying the connection parameters and other information. Next we create the ClickHouseDynamicTableSource class:

      package org.example.source.clickhouse;
      
      import org.apache.flink.api.common.serialization.DeserializationSchema;
      import org.apache.flink.streaming.api.functions.source.SourceFunction;
      import org.apache.flink.table.connector.ChangelogMode;
      import org.apache.flink.table.connector.format.DecodingFormat;
      import org.apache.flink.table.connector.source.DynamicTableSource;
      import org.apache.flink.table.connector.source.ScanTableSource;
      import org.apache.flink.table.connector.source.SourceFunctionProvider;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.types.DataType;
      
      public class ClickHouseDynamicTableSource implements ScanTableSource {
      
          private final ClickHouseConnection clickHouseConnection;
          private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
          private final DataType producedDataType;
      
          public ClickHouseDynamicTableSource(
                  ClickHouseConnection clickHouseConnection,
                  DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
                  DataType producedDataType) {
              this.clickHouseConnection = clickHouseConnection;
              this.decodingFormat = decodingFormat;
              this.producedDataType = producedDataType;
          }
      
          @Override
          public ChangelogMode getChangelogMode() {
              return decodingFormat.getChangelogMode();
          }
      
          @Override
          public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
              // runtime context that is shipped to the cluster
              final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
                      runtimeProviderContext,
                      producedDataType);
              DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(producedDataType);
      
              // create the SourceFunction<RowData>
              final SourceFunction<RowData> sourceFunction = new ClickHouseSourceFunction(
                      clickHouseConnection,
                      deserializer,
                      converter);
      
              // the second argument marks whether the source is bounded
              return SourceFunctionProvider.of(sourceFunction, true);
          }
      
          @Override
          public DynamicTableSource copy() {
              // create a copy
              return new ClickHouseDynamicTableSource(clickHouseConnection, decodingFormat, producedDataType);
          }
      
          @Override
          public String asSummaryString() {
              return "ClickHouse Table Source";
          }
      }
      
      

      This class mainly wires up the runtime pieces, including the deserializer and the data structure converter, and then returns the SourceFunction instance via SourceFunctionProvider. The second argument indicates whether the source is bounded; for an unbounded stream it must be set to false.
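
      If the source were instead meant to poll ClickHouse continuously, the provider would simply be created with the bounded flag set to false (a sketch; the rest of the class stays unchanged):

      // unbounded: Flink treats the source as a continuous stream
      return SourceFunctionProvider.of(sourceFunction, false);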

      Finally, we create ClickHouseSourceFunction:

      package org.example.source.clickhouse;
      
      
      import com.clickhouse.client.ClickHouseClient;
      import com.clickhouse.client.ClickHouseNode;
      import com.clickhouse.client.ClickHouseResponse;
      import com.clickhouse.data.ClickHouseFormat;
      import com.clickhouse.data.ClickHouseRecord;
      import org.apache.flink.api.common.serialization.DeserializationSchema;
      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
      import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
      import org.apache.flink.table.connector.source.DynamicTableSource;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.types.Row;
      import org.apache.flink.types.RowKind;
      
      public class ClickHouseSourceFunction extends RichSourceFunction<RowData> implements ResultTypeQueryable<RowData> {
      
          private final ClickHouseConnection clickHouseConnection;
          private final DeserializationSchema<RowData> deserializer;
          private final DynamicTableSource.DataStructureConverter converter;
      
          private volatile boolean isRunning = true;
      
          public ClickHouseSourceFunction(ClickHouseConnection clickHouseConnection, DeserializationSchema<RowData> deserializer, DynamicTableSource.DataStructureConverter converter) {
              this.clickHouseConnection = clickHouseConnection;
              this.deserializer = deserializer;
              this.converter = converter;
          }
      
          @Override
          public TypeInformation<RowData> getProducedType() {
              return deserializer.getProducedType();
          }
          
      
          @Override
          public void run(SourceContext<RowData> ctx) throws Exception {
              String endpoint = clickHouseConnection.getEndpoint();
              String table = clickHouseConnection.getTable();
              ClickHouseNode clickHouseNode = ClickHouseNode.of(endpoint);
              while (isRunning) {
                  // both the client and the response are AutoCloseable
                  try (ClickHouseClient client = ClickHouseClient.newInstance(clickHouseNode.getProtocol());
                       ClickHouseResponse response = client.read(clickHouseNode)
                               .format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
                               .query("select name, score, user_id from " + table)
                               .executeAndWait()) {
                      for (ClickHouseRecord record : response.records()) {
                          // build an external Row and convert it to the internal RowData
                          Row row = new Row(RowKind.INSERT, record.size());
                          row.setField(0, record.getValue("name").asString());
                          row.setField(1, record.getValue("score").asInteger());
                          row.setField(2, record.getValue("user_id").asBinary());
                          ctx.collect((RowData) converter.toInternal(row));
                      }
                      // read once, then stop the loop so this bounded source can finish
                      cancel();
                  } catch (Throwable t) {
                      t.printStackTrace(); // print and retry on the next iteration
                  }
                  Thread.sleep(1000);
              }
          }
      
          @Override
          public void cancel() {
              isRunning = false;
          }
      }
      
      

      The main business logic lives in the run method: it reads the data and emits it downstream, where it is processed as a batch even though the raw data still arrives as a stream. Here we exit the loop right after one round of querying, and downstream the whole batch is computed as one unit.

      Since we already query the final values, we can build the Row here, convert it to RowData, and send it downstream directly, without going through deserialization; deserialize can only take a byte[] parameter, and that round trip would be cumbersome, so we handle everything in place. However, we still have to define a set of deserialization classes, because the format parameter is mandatory in Flink SQL. We could pass an arbitrary one, such as the common csv, but that would be confusing, so instead we define a format dedicated to our Connector whose only purpose is to make parameter validation pass. There does not appear to be a way to make the format parameter optional, so we settle for this approach for now.

      Also, when calling row.setField, the first argument must be a positional index, not a string; otherwise it fails with:

      Accessing a field by name is not supported in position-based field mode.
      
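      The distinction comes from how the Row is created; a minimal illustration with hypothetical values:

      // position-based row: fields can only be addressed by index
      Row positional = new Row(RowKind.INSERT, 3);
      positional.setField(0, "alice");          // OK
      // positional.setField("name", "alice");  // would throw the error above

      // a name-based row, created via Row.withNames, accepts string keys instead
      Row named = Row.withNames(RowKind.INSERT);
      named.setField("name", "alice");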

      Now let's create the deserialization-related classes, starting with ClickHouseFormatFactory:

      package org.example.source.clickhouse;
      
      import org.apache.flink.api.common.serialization.DeserializationSchema;
      import org.apache.flink.configuration.ConfigOption;
      import org.apache.flink.configuration.ReadableConfig;
      import org.apache.flink.table.connector.format.DecodingFormat;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.factories.DeserializationFormatFactory;
      import org.apache.flink.table.factories.DynamicTableFactory;
      
      import java.util.Collections;
      import java.util.Set;
      
      public class ClickHouseFormatFactory implements DeserializationFormatFactory {
          @Override
          public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(DynamicTableFactory.Context context, ReadableConfig formatOptions) {
      
              // return the DecodingFormat<DeserializationSchema<RowData>> implementation
              return new ClickHouseFormat();
          }
      
          @Override
          public String factoryIdentifier() {
              return "clickhouse-row";
          }
      
          @Override
          public Set<ConfigOption<?>> requiredOptions() {
              return Collections.emptySet();
          }
      
          @Override
          public Set<ConfigOption<?>> optionalOptions() {
              return Collections.emptySet();
          }
      }
      
      
      

      The identifier returned here is clickhouse-row, and all of our ClickHouse Connectors can reference this same format. It returns a ClickHouseFormat, which we create next:

      package org.example.source.clickhouse;
      
      import org.apache.flink.api.common.serialization.DeserializationSchema;
      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.table.connector.ChangelogMode;
      import org.apache.flink.table.connector.format.DecodingFormat;
      import org.apache.flink.table.connector.source.DynamicTableSource;
      import org.apache.flink.table.data.RowData;
      import org.apache.flink.table.types.DataType;
      import org.apache.flink.types.RowKind;
      
      public class ClickHouseFormat implements DecodingFormat<DeserializationSchema<RowData>> {
      
          public ClickHouseFormat() {
          }
      
          @Override
          public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context, DataType producedDataType) {
              final TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(
                      producedDataType);
              final DynamicTableSource.DataStructureConverter converter = context.createDataStructureConverter(producedDataType);
              // return the DeserializationSchema<RowData> implementation
              return new ClickHouseDeserializer(converter, producedTypeInfo);
          }
      
          @Override
          public ChangelogMode getChangelogMode() {
              return ChangelogMode.newBuilder()
                      .addContainedKind(RowKind.INSERT)
                      // batch execution must not contain operations other than INSERT
      //                .addContainedKind(RowKind.DELETE)
                      .build();
          }
      }
      
      

      One thing to note here: getChangelogMode declares the supported operations. In batch mode only INSERT is supported; anything else fails with:

      Querying a table in batch mode is currently only possible for INSERT-only table sources. But the source for table 'default_catalog.default_database.user_score' produces other changelog messages than just INSERT.
      

      因?yàn)榕幚砭褪且慌鷶?shù)據(jù),相當(dāng)于只有插入操作,而流處理可以支持各類操作。

      This returns a ClickHouseDeserializer, so let's implement it:

      package org.example.source.clickhouse;
      
      import org.apache.flink.api.common.serialization.DeserializationSchema;
      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.table.connector.RuntimeConverter;
      import org.apache.flink.table.connector.source.DynamicTableSource.DataStructureConverter;
      import org.apache.flink.table.data.RowData;
      
      import java.io.IOException;
      
      public class ClickHouseDeserializer implements DeserializationSchema<RowData> {
          private final DataStructureConverter converter;
          private final TypeInformation<RowData> producedTypeInfo;
      
          public ClickHouseDeserializer(
                  DataStructureConverter converter,
                  TypeInformation<RowData> producedTypeInfo) {
              this.converter = converter;
              this.producedTypeInfo = producedTypeInfo;
          }
      
          @Override
          public void open(InitializationContext context) throws Exception {
              converter.open(RuntimeConverter.Context.create(ClickHouseDeserializer.class.getClassLoader()));
          }
      
          @Override
          public RowData deserialize(byte[] message) throws IOException {
              return null;
          }
      
          @Override
          public boolean isEndOfStream(RowData nextElement) {
              return false;
          }
      
          @Override
          public TypeInformation<RowData> getProducedType() {
              return producedTypeInfo;
          }
      }
      
      

      This class is also very simple: since the data is already fully processed in the SourceFunction, deserialize can just return null here; we never call it anyway.
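
      For reference, if the connector shipped raw bytes downstream instead, deserialize would do the actual parsing. A minimal sketch, assuming a comma-separated payload whose columns are all STRING (real column types would need per-field parsing, and java.nio.charset.StandardCharsets, Row, and RowKind would need to be imported):

      @Override
      public RowData deserialize(byte[] message) throws IOException {
          // hypothetical: split a CSV line and convert the external Row to internal RowData
          String[] columns = new String(message, StandardCharsets.UTF_8).split(",");
          Row row = new Row(RowKind.INSERT, columns.length);
          for (int i = 0; i < columns.length; i++) {
              row.setField(i, columns[i]);
          }
          return (RowData) converter.toInternal(row);
      }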

      With that, the ClickHouse Connector is complete. We install it into the local Maven repository so it can be referenced during development:

      mvn install
      

      Once installed, we can create a project that uses it.

      The project's pom.xml is defined as follows:

      <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
      
          <groupId>org.example</groupId>
          <artifactId>flink-1.18-example</artifactId>
          <version>1.0-SNAPSHOT</version>
      
          <name>Flink Table Example</name>
      
          <properties>
              <maven.compiler.source>11</maven.compiler.source>
              <maven.compiler.target>11</maven.compiler.target>
              <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
              <flink.version>1.18.0</flink.version>
              <log4j.version>2.17.1</log4j.version>
          </properties>
      
          <repositories>
              <repository>
                  <id>aliyun-maven</id>
                  <name>Aliyun mirror aggregating the Central and JCenter repositories</name>
                  <url>https://maven.aliyun.com/repository/public</url>
              </repository>
          </repositories>
      
          <dependencies>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-api-java-bridge</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-clients</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.flink</groupId>
                  <artifactId>flink-table-planner_2.12</artifactId>
                  <version>${flink.version}</version>
                  <scope>provided</scope>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.logging.log4j</groupId>
                  <artifactId>log4j-slf4j-impl</artifactId>
                  <version>${log4j.version}</version>
                  <scope>runtime</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.logging.log4j</groupId>
                  <artifactId>log4j-api</artifactId>
                  <version>${log4j.version}</version>
                  <scope>runtime</scope>
              </dependency>
              <dependency>
                  <groupId>org.apache.logging.log4j</groupId>
                  <artifactId>log4j-core</artifactId>
                  <version>${log4j.version}</version>
                  <scope>runtime</scope>
              </dependency>
      
              <dependency>
                  <groupId>org.example</groupId>
                  <artifactId>flink-1.18-table-source-example</artifactId>
                  <version>1.0.0-SNAPSHOT</version>
                  <scope>provided</scope>
              </dependency>
          </dependencies>
      
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-shade-plugin</artifactId>
                      <version>3.2.4</version>
                      <configuration>
                          <createDependencyReducedPom>false</createDependencyReducedPom>
                      </configuration>
                      <executions>
                          <execution>
                              <phase>package</phase>
                              <goals>
                                  <goal>shade</goal>
                              </goals>
                              <configuration>
                                  <transformers>
                                      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                                  </transformers>
                                  <filters>
                                      <filter>
                                          <artifact>*:*</artifact>
                                          <excludes>
                                              <exclude>META-INF/*.SF</exclude>
                                              <exclude>META-INF/*.DSA</exclude>
                                              <exclude>META-INF/*.RSA</exclude>
                                          </excludes>
                                      </filter>
                                  </filters>
                                  <shadedArtifactAttached>true</shadedArtifactAttached>
                                  <shadedClassifierName>jar-with-dependencies</shadedClassifierName>
                              </configuration>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      
      </project>
      

      Here we pull in the Flink dependencies, all marked as provided, and we also reference our own Connector, likewise marked as provided: at runtime it will already sit in Flink's lib directory, so it must not be bundled. We do not need the ClickHouse dependencies either, because our Connector already contains them.

      We use maven-shade-plugin for packaging, mainly to merge the services files under resources, which we will come back to shortly. Now let's create some test code:

      package org.example;
      
      import org.apache.flink.api.common.RuntimeExecutionMode;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
      
      public class ClickHouseTableExample {
          public static void main(String[] args) {
              final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.setRuntimeMode(RuntimeExecutionMode.BATCH);
              StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
      
              tableEnv.executeSql("CREATE TABLE user_score (\n" +
                      "    `name` STRING,\n" +
                      "    `score` INTEGER,\n" +
                      "    `user_id` BYTES\n" +
                      ") WITH (\n" +
                      "    'connector' = 'clickhouse',\n" +
                      "    'hostname' = 'localhost',\n" +
                      "    'port' = '8123',\n" +
                      "    'username' = 'default',\n" +
                      "    'password' = 'wSqDxDAt',\n" +
                      "    'database' = 'default',\n" +
                      "    'table' = 'user_score',\n" +
                      "    'format' = 'clickhouse-row'\n" +
                      ");\n").print();
      
              tableEnv.executeSql("SELECT sum(score), name from user_score group by name;").print();
          }
      }
      
      

      This is a very simple piece of code that performs a small aggregation, with the runtime mode set to BATCH. If the Source Connector is defined as bounded, the runtime mode can be either streaming or batch: in streaming mode the aggregation emits every intermediate update, whereas in batch mode there is only one final result, produced after the SourceFunction exits. If the SourceFunction loops forever, a batch job never produces its final result, while streaming mode keeps emitting the current result. If the Source Connector is defined as unbounded, only streaming mode is possible, and all aggregation results are emitted continuously as records arrive.
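
      To observe the streaming behavior with the same job, only the runtime mode needs to change:

      // streaming mode: the aggregation emits incremental changelog updates
      // instead of a single final result
      env.setRuntimeMode(RuntimeExecutionMode.STREAMING);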

      At this point the program still cannot run; it fails with:

      Could not find any factory for identifier 'clickhouse' that implements 'org.apache.flink.table.factories.Factory' in the classpath.
      

      This is because the concrete DynamicTableSource and DynamicTableSink implementations are discovered via Java's SPI mechanism. In short, in the project's resources directory (for Maven, that is src/main/resources) we create the subdirectory META-INF/services and, inside it, a file named org.apache.flink.table.factories.Factory with the following content:

      org.example.source.clickhouse.ClickHouseDynamicTableFactory
      org.example.source.clickhouse.ClickHouseFormatFactory
      

      With this in place, the program can locate the corresponding classes at runtime and load them.
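
      The resulting layout looks like this:

      src/main/resources/
      └── META-INF/
          └── services/
              └── org.apache.flink.table.factories.Factory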

      When running locally, comment out the relevant provided scopes and run directly. When packaging for cluster submission, note that by default SPI files from other dependencies can overwrite the ones in the current project, which is why the Maven configuration above includes the following:

      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
      

      This merges the current project's services resources with those already present in the packaged jars, so the file we wrote above ends up in the published artifact.

      Finally, the filters in the pom.xml above are also required; without them, leftover signature files cause a runtime error:

      Invalid signature file digest for Manifest main
      

      This means the signature is invalid; only after those files are stripped does the job run normally.

      To deploy: place the Connector jar in the lib directory of every node of the Flink cluster, restart the cluster, and finally submit the jar built from the current project with mvn package.
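
      Roughly, the deployment steps look like this (paths and jar names are illustrative, derived from the poms above; adjust them to your installation):

      # copy the connector (with the bundled ClickHouse client) into Flink's lib
      cp flink-1.18-table-source-example-1.0.0-SNAPSHOT-jar-with-dependencies.jar $FLINK_HOME/lib/
      # restart the cluster so the new jar is picked up
      $FLINK_HOME/bin/stop-cluster.sh && $FLINK_HOME/bin/start-cluster.sh
      # submit the job jar built with mvn package
      $FLINK_HOME/bin/flink run -c org.example.ClickHouseTableExample \
          flink-1.18-example-1.0-SNAPSHOT-jar-with-dependencies.jar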
