
      ChatGPT usage log

      URL: http://chat.yangxiwl.cn/chatbotai/1002?channel=bdtoufangChatztHD3

      Write a Java program that consumes data from Kafka and batch-inserts it into a ClickHouse database.

      You can use the Kafka Java client library together with the ClickHouse JDBC driver to consume data from Kafka and batch-insert it into a ClickHouse database. Here is a simple example program:

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import ru.yandex.clickhouse.BalancedClickhouseDataSource;
      import ru.yandex.clickhouse.ClickHouseConnection;
      import ru.yandex.clickhouse.settings.ClickHouseProperties;
      
      import java.sql.PreparedStatement;
      import java.sql.SQLException;
      import java.time.Duration;
      import java.util.*;
      
      public class KafkaToClickHouse {
      
          public static void main(String[] args) {
              // Kafka consumer configuration
              Properties kafkaProps = new Properties();
              kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "clickhouse-consumer");
              kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      
              // ClickHouse configuration (host and port belong in the JDBC URL below)
              ClickHouseProperties clickHouseProps = new ClickHouseProperties();
              clickHouseProps.setDatabase("your_database_name");
              clickHouseProps.setUser("your_username");
              clickHouseProps.setPassword("your_password");
      
              // Create the Kafka consumer and subscribe to the topic
              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
              consumer.subscribe(Collections.singletonList("your_topic_name"));
      
              // ClickHouse data source; the URL must list at least one host:port
              BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://localhost:8123", clickHouseProps);
      
              // Consume records and batch-insert them into ClickHouse
              try (ClickHouseConnection connection = dataSource.getConnection();
                   PreparedStatement stmt = connection.prepareStatement("INSERT INTO your_table_name (col1, col2) VALUES (?, ?)")) {
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                      for (ConsumerRecord<String, String> record : records) {
                          String[] data = record.value().split(","); // assumes each record is two comma-separated columns
                          stmt.setString(1, data[0]);
                          stmt.setString(2, data[1]);
                          stmt.addBatch();
                      }
                      stmt.executeBatch();
                  }
              } catch (SQLException e) {
                  e.printStackTrace();
              } finally {
                  consumer.close();
              }
          }
      }
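
      The example assumes your_table_name already exists. As a minimal sketch, it can be created over the same JDBC driver; the database, table, and column names below are the placeholders from the example, and MergeTree with ORDER BY col1 is just an assumed engine choice, not something the original specifies:

      import java.sql.Connection;
      import java.sql.SQLException;
      import java.sql.Statement;
      
      import ru.yandex.clickhouse.BalancedClickhouseDataSource;
      import ru.yandex.clickhouse.settings.ClickHouseProperties;
      
      public class CreateTargetTable {
      
          public static void main(String[] args) throws SQLException {
              ClickHouseProperties props = new ClickHouseProperties();
              props.setDatabase("your_database_name");
              props.setUser("your_username");
              props.setPassword("your_password");
      
              BalancedClickhouseDataSource dataSource =
                      new BalancedClickhouseDataSource("jdbc:clickhouse://localhost:8123", props);
      
              try (Connection connection = dataSource.getConnection();
                   Statement stmt = connection.createStatement()) {
                  // MergeTree ORDER BY col1 is an assumption here; pick an engine
                  // and sorting key that match your real query patterns
                  stmt.execute("CREATE TABLE IF NOT EXISTS your_table_name ("
                          + "col1 String, col2 String"
                          + ") ENGINE = MergeTree ORDER BY col1");
              }
          }
      }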


      Below is an example program that accumulates Kafka records and inserts them into ClickHouse in batches of 100, issuing one INSERT statement per full batch:

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import ru.yandex.clickhouse.BalancedClickhouseDataSource;
      import ru.yandex.clickhouse.ClickHouseConnection;
      import ru.yandex.clickhouse.settings.ClickHouseProperties;
      
      import java.sql.PreparedStatement;
      import java.sql.SQLException;
      import java.time.Duration;
      import java.util.ArrayList;
      import java.util.Collections;
      import java.util.List;
      import java.util.Properties;
      
      public class KafkaToClickHouse {
      
          public static void main(String[] args) {
              // Kafka consumer configuration
              Properties kafkaProps = new Properties();
              kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "clickhouse-consumer");
              kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // pull at most 100 records per poll
      
              // ClickHouse configuration (host and port belong in the JDBC URL below)
              ClickHouseProperties clickHouseProps = new ClickHouseProperties();
              clickHouseProps.setDatabase("your_database_name");
              clickHouseProps.setUser("your_username");
              clickHouseProps.setPassword("your_password");
      
              // Create the Kafka consumer and subscribe to the topic
              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
              consumer.subscribe(Collections.singletonList("your_topic_name"));
      
              // ClickHouse data source; the URL must list at least one host:port
              BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://localhost:8123", clickHouseProps);
      
              // Consume records and insert them in batches of 100
              try (ClickHouseConnection connection = dataSource.getConnection()) {
                  List<String> batch = new ArrayList<>();
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                      for (ConsumerRecord<String, String> record : records) {
                          String[] data = record.value().split(","); // assumes each record is two comma-separated columns
                          String row = String.format("('%s', '%s')", data[0], data[1]);
                          batch.add(row);
                      }
                      if (batch.size() >= 100) {
                          insertBatch(connection, batch);
                          batch.clear();
                      }
                  }
              } catch (SQLException e) {
                  e.printStackTrace();
              } finally {
                  consumer.close();
              }
          }
      
          private static void insertBatch(ClickHouseConnection connection, List<String> batch) throws SQLException {
              // NOTE: row values are spliced directly into the SQL text, so they must be trusted or pre-escaped
              try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO your_table_name (col1, col2) VALUES " + String.join(",", batch))) {
                  stmt.execute();
              }
          }
      }
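
      Two caveats with the insertBatch above: the row values are concatenated straight into the SQL text (so a stray quote in a Kafka message breaks the statement), and a final batch smaller than 100 rows is never flushed. As a sketch under the same placeholder table and column names, the insert can instead go through JDBC placeholders, letting the driver handle escaping:

      import java.sql.PreparedStatement;
      import java.sql.SQLException;
      import java.util.List;
      
      import ru.yandex.clickhouse.ClickHouseConnection;
      
      public class SafeBatchInsert {
      
          // rows holds String[2] pairs {col1, col2}; the driver escapes every value
          static void insertBatch(ClickHouseConnection connection, List<String[]> rows) throws SQLException {
              try (PreparedStatement stmt =
                           connection.prepareStatement("INSERT INTO your_table_name (col1, col2) VALUES (?, ?)")) {
                  for (String[] row : rows) {
                      stmt.setString(1, row[0]);
                      stmt.setString(2, row[1]);
                      stmt.addBatch();
                  }
                  stmt.executeBatch(); // one round trip per batch
              }
          }
      }

      The consuming loop would then also call insertBatch for whatever is left in the batch on shutdown, not only when it reaches 100 rows.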

      How can I do batched inserts while avoiding duplicate rows?

      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      import ru.yandex.clickhouse.BalancedClickhouseDataSource;
      import ru.yandex.clickhouse.ClickHouseConnection;
      import ru.yandex.clickhouse.settings.ClickHouseProperties;
      
      import java.sql.PreparedStatement;
      import java.sql.SQLException;
      import java.time.Duration;
      import java.util.*;
      
      public class KafkaToClickHouse {
      
          public static void main(String[] args) {
              // Kafka consumer configuration
              Properties kafkaProps = new Properties();
              kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              kafkaProps.put(ConsumerConfig.GROUP_ID_CONFIG, "clickhouse-consumer");
              kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              kafkaProps.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // pull at most 100 records per poll
      
              // ClickHouse configuration (host and port belong in the JDBC URL below)
              ClickHouseProperties clickHouseProps = new ClickHouseProperties();
              clickHouseProps.setDatabase("your_database_name");
              clickHouseProps.setUser("your_username");
              clickHouseProps.setPassword("your_password");
      
              // Create the Kafka consumer and subscribe to the topic
              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(kafkaProps);
              consumer.subscribe(Collections.singletonList("your_topic_name"));
      
              // ClickHouse data source; the URL must list at least one host:port
              BalancedClickhouseDataSource dataSource = new BalancedClickhouseDataSource("jdbc:clickhouse://localhost:8123", clickHouseProps);
      
              // Consume records and insert them in batches of 100
              try (ClickHouseConnection connection = dataSource.getConnection()) {
                  List<String> batch = new ArrayList<>();
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                      for (ConsumerRecord<String, String> record : records) {
                          String[] data = record.value().split(","); // assumes each record is two comma-separated columns
                          String row = String.format("('%s', '%s')", data[0], data[1]);
                          batch.add(row);
                      }
                      if (batch.size() >= 100) {
                          insertBatch(connection, batch);
                          batch.clear();
                      }
                  }
              } catch (SQLException e) {
                  e.printStackTrace();
              } finally {
                  consumer.close();
              }
          }
      
          private static void insertBatch(ClickHouseConnection connection, List<String> batch) throws SQLException {
              try (PreparedStatement stmt = connection.prepareStatement("INSERT INTO your_table_name (col1, col2) VALUES " + String.join(",", batch))) {
                  stmt.execute();
              } catch (SQLException e) {
                  // Handle a duplicate-insert error
                  if (e.getMessage().contains("Code: 409")) {
                      // Duplicate data: ignore it or handle it another way
                  } else {
                      throw e;
                  }
              }
          }
      }
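
      Matching on an error-message string like "Code: 409" is fragile, and in practice duplicates here mostly come from Kafka redelivering records after a crash between the insert and the offset commit. A common way to narrow that window is to disable auto-commit and commit offsets manually only after a batch has been inserted. The sketch below shows just the consumer side; insertBatch and toRows are hypothetical hooks standing in for the insert logic above. On the ClickHouse side, a ReplacingMergeTree table (or the replicated engines' block-level insert deduplication) can absorb the duplicates that still get through:

      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;
      
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      
      public class ManualCommitSketch {
      
          public static void main(String[] args) {
              Properties props = new Properties();
              props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
              props.put(ConsumerConfig.GROUP_ID_CONFIG, "clickhouse-consumer");
              props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
              // Manage offsets ourselves: a crash before commitSync() replays the
              // batch (at-least-once) instead of silently dropping it
              props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
              props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
      
              try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                  consumer.subscribe(Collections.singletonList("your_topic_name"));
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                      if (records.isEmpty()) {
                          continue;
                      }
                      // insertBatch(connection, toRows(records)); // hypothetical: insert first...
                      consumer.commitSync();                       // ...then mark the records consumed
                  }
              }
          }
      }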


      posted on 2023-10-10 22:45 by 劍道第一仙
