
      Learning Flink from Scratch: The Ultimate Guide to Data Output (Sink)

      In a complete real-time processing pipeline, data output (the Sink) is the final critical stage: it delivers processed results to external systems for downstream use. Flink ships with a rich set of sink connectors that can write data to Kafka, Elasticsearch, file systems, databases, and many other target systems. This article takes a close look at the core concepts, configuration options, and best practices of Flink data output, and builds a complete data-output example on Flink 1.20.1.

      I. Sink Overview

      1. What Is a Sink

      A Sink is the terminal stage of a Flink data processing pipeline; it is responsible for writing the computed results to external storage or downstream systems. In Flink's programming model, a sink is attached through the DataStream API: it takes a DataStream and writes its elements to the specified external system.

      2. Types of Sinks

      Flink's sink connectors fall into the following categories:

      • Built-in sinks: debugging outputs such as print() and printToErr()
      • File system sinks: write to the local file system, HDFS, and more
      • Message queue sinks: Kafka, RabbitMQ, and similar systems
      • Database sinks: JDBC, Elasticsearch, and similar systems
      • Custom sinks: implement your own output logic via the Sink/SinkFunction interfaces

      3. Output Semantics

      Flink offers three levels of output semantics for sinks:

      • At-most-once: data may be lost, but is never duplicated
      • At-least-once: data is never lost, but may be duplicated
      • Exactly-once: data is neither lost nor duplicated

      These guarantees are closely tied to Flink's checkpoint mechanism, which we discuss in detail later in this article.

      II. Environment Setup and Dependencies

      1. Versions

      • Flink: 1.20.1
      • JDK: 17+
      • Gradle: 8.3+
      • External systems: Kafka 3.4.0, Elasticsearch 7.17.0, MySQL 8.0

      2. Core Dependencies

      dependencies {
          // Flink core dependencies
          implementation 'org.apache.flink:flink-core:1.20.1'
          implementation 'org.apache.flink:flink-streaming-java:1.20.1'
          implementation 'org.apache.flink:flink-clients:1.20.1'

          // Kafka connector
          implementation 'org.apache.flink:flink-connector-kafka:3.4.0-1.20'

          // Elasticsearch connector
          implementation 'org.apache.flink:flink-connector-elasticsearch7:3.1.0-1.20'

          // JDBC connector
          implementation 'org.apache.flink:flink-connector-jdbc:3.3.0-1.20'
          implementation 'mysql:mysql-connector-java:8.0.33'

          // FileSystem connector
          implementation 'org.apache.flink:flink-connector-files:1.20.1'
      }
      

      III. Basic Sink Operations

      1. Built-in Debug Sinks

      Flink ships with a few built-in sinks for development and debugging:

      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      
      public class BasicSinkDemo {
          public static void main(String[] args) throws Exception {
              // Create the execution environment
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              
              // Create a data source
              DataStream<String> stream = env.fromElements("Hello", "Flink", "Sink");
              
              // Print to standard output
              stream.print("StandardOutput");
              
              // Print to standard error
              stream.printToErr("ErrorOutput");
              
              // Execute the job
              env.execute("Basic Sink Demo");
          }
      }
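      For reference, the print sinks prefix every record with the name passed to print()/printToErr(), plus the subtask index when the parallelism is greater than 1. With parallelism 1, the job above prints something along these lines to stdout (the ErrorOutput lines go to stderr in the same format):

      StandardOutput> Hello
      StandardOutput> Flink
      StandardOutput> Sink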
      

      2. File System Sink

      Flink can write data to the local file system, HDFS, and other file systems. The following example writes to the local file system:

      package com.cn.daimajiangxin.flink.sink;
      
      import org.apache.flink.api.common.serialization.SimpleStringEncoder;
      import org.apache.flink.configuration.MemorySize;
      import org.apache.flink.connector.file.sink.FileSink;
      import org.apache.flink.core.fs.Path;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
      import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
      
      import java.time.Duration;
      
      public class FileSystemSinkDemo {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              DataStream<String> stream = env.fromData("Hello", "Flink", "FileSystem", "Sink");
              RollingPolicy<String, String> rollingPolicy = DefaultRollingPolicy.<String, String>builder()
                      .withRolloverInterval(Duration.ofMinutes(15))
                      .withInactivityInterval(Duration.ofMinutes(5))
                      .withMaxPartSize(MemorySize.ofMebiBytes(64))
                      .build();
      
              // Create the file system sink (row format: one record per line)
              FileSink<String> sink = FileSink
                      .forRowFormat(new Path("file:///tmp/flink-output"), new SimpleStringEncoder<String>())
                      .withRollingPolicy(rollingPolicy)
                      .build();
              // Attach the sink
              stream.sinkTo(sink);
              env.execute("File System Sink Demo");
          }
      }
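      A practical note: with row-encoded formats, the FileSink finalizes part files only when a checkpoint completes, so checkpointing should be enabled for output files to reach the finished state. The fragment below extends the example above; the checkpoint interval and the OutputFileConfig values are illustrative assumptions, and it needs one extra import, org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig:

      env.enableCheckpointing(10_000);  // pending part files are committed when a checkpoint completes

      FileSink<String> namedSink = FileSink
              .forRowFormat(new Path("file:///tmp/flink-output"), new SimpleStringEncoder<String>())
              .withRollingPolicy(rollingPolicy)
              .withOutputFileConfig(OutputFileConfig.builder()
                      .withPartPrefix("events")   // part files named events-<subtask>-<counter>...
                      .withPartSuffix(".txt")
                      .build())
              .build();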
      

      IV. Advanced Sink Connectors

      1. Kafka Sink

      Kafka is one of the most widely used message queues in real-time data processing, and Flink provides solid Kafka sink support:

      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
      import org.apache.flink.connector.kafka.sink.KafkaSink;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      
      import java.util.Properties;
      
      public class KafkaSinkDemo {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              
              // Enable checkpointing; exactly-once delivery also requires setting the sink's DeliveryGuarantee (see the note after this example)
              env.enableCheckpointing(5000);
              
              DataStream<String> stream = env.fromElements("Hello Kafka", "Flink to Kafka", "Data Pipeline");
              
              // Kafka producer configuration
              Properties props = new Properties();
              props.setProperty("bootstrap.servers", "localhost:9092");
              
              // Create the Kafka sink
              KafkaSink<String> sink = KafkaSink.<String>builder()
                      .setKafkaProducerConfig(props)
                      .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                              .setTopic("flink-output-topic")
                              .setValueSerializationSchema(new SimpleStringSchema())
                              .build())
                      .build();
              
              // Attach the sink
              stream.sinkTo(sink);
              
              env.execute("Kafka Sink Demo");
          }
      }
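      Enabling checkpointing alone does not make the Kafka sink exactly-once; the delivery guarantee has to be selected explicitly on the builder, together with a transactional id prefix, and downstream consumers should read with isolation.level=read_committed. A sketch of the extra builder calls (topic, prefix, and timeout values are illustrative; it needs org.apache.flink.connector.base.DeliveryGuarantee):

      Properties txProps = new Properties();
      txProps.setProperty("bootstrap.servers", "localhost:9092");
      // The producer transaction timeout must not exceed the broker's transaction.max.timeout.ms
      // and should comfortably cover the checkpoint interval.
      txProps.setProperty("transaction.timeout.ms", "600000");

      KafkaSink<String> exactlyOnceSink = KafkaSink.<String>builder()
              .setKafkaProducerConfig(txProps)
              .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                      .setTopic("flink-output-topic")
                      .setValueSerializationSchema(new SimpleStringSchema())
                      .build())
              .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
              .setTransactionalIdPrefix("flink-output")  // must be unique per Flink application
              .build();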
      

      Messages produced to the Kafka output topic (screenshot omitted).

      2. Elasticsearch Sink

      Elasticsearch is a distributed, near-real-time search and analytics engine, well suited for storing and querying the real-time data processed by Flink:

      package com.cn.daimajiangxin.flink.sink;
      
      import com.fasterxml.jackson.databind.ObjectMapper;
      import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
      import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.http.HttpHost;
      import org.elasticsearch.action.index.IndexRequest;
      import org.elasticsearch.client.Requests;
      
      import java.util.Map;
      
      public class ElasticsearchSinkDemo {
          private static final ObjectMapper objectMapper = new ObjectMapper();
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(5000);
      
      
              DataStream<String> stream = env.fromData(
                      "{\"id\":\"1\",\"name\":\"Flink\",\"category\":\"framework\"}",
                      "{\"id\":\"2\",\"name\":\"Elasticsearch\",\"category\":\"database\"}");
      
              // Configure the Elasticsearch node
              HttpHost httpHost = new HttpHost("localhost", 9200, "http");
      
              // Create the Elasticsearch sink
              ElasticsearchSink<String> sink = new Elasticsearch7SinkBuilder<String>()
                      .setBulkFlushMaxActions(10)          // max buffered actions per bulk request
                      .setBulkFlushInterval(5000)          // bulk flush interval (ms)
                      .setHosts(httpHost)
                      .setConnectionRequestTimeout(60000)  // connection request timeout (ms)
                      .setConnectionTimeout(60000)         // connection timeout (ms)
                      .setSocketTimeout(60000)             // socket timeout (ms)
                      .setEmitter((element, context, indexer) -> {
                          try {
                              Map<String, Object> json = objectMapper.readValue(element, Map.class);
                              IndexRequest request = Requests.indexRequest()
                                      .index("flink_documents")
                                      .id((String) json.get("id"))
                                      .source(json);
                              indexer.add(request);
                          } catch (Exception e) {
                              // Handle records that fail to parse
                              System.err.println("Failed to parse JSON: " + element);
                          }
                      })
                      .build();
      
              // Attach the sink
              stream.sinkTo(sink);
      
              env.execute("Elasticsearch Sink Demo");
          }
      }
      

      Use a REST client such as Postman or curl to inspect the indexed documents (screenshot omitted).

      3. JDBC Sink

      The JDBC sink writes data to any relational database reachable via JDBC:

      package com.cn.daimajiangxin.flink.sink;
      
      import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
      import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
      import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
      import org.apache.flink.connector.jdbc.core.datastream.Jdbc;
      import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
      import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      
      import java.util.Arrays;
      import java.util.List;
      
      public class JdbcSinkDemo {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(5000);
              // Sample user data
              List<User> userList = Arrays.asList(
                      new User(1, "Alice", 25, "alice"),
                      new User(2, "Bob", 30, "bob"),
                      new User(3, "Charlie", 35, "charlie"));
              DataStream<User> userStream = env.fromData(userList);
      
              JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
                      .withBatchSize(1000)
                      .withBatchIntervalMs(200)
                      .withMaxRetries(5)
                      .build();
              JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                      .withUrl("jdbc:mysql://localhost:3306/test")
                      .withDriverName("com.mysql.cj.jdbc.Driver")
                      .withUsername("username")
                      .withPassword("password")
                      .build();
              String insertSql = "INSERT INTO user (id, name, age, user_name) VALUES (?, ?, ?, ?)";
              JdbcStatementBuilder<User> statementBuilder = (statement, user) -> {
                  statement.setInt(1, user.getId());
                  statement.setString(2, user.getName());
                  statement.setInt(3, user.getAge());
                  statement.setString(4, user.getUserName());
              };
              // Create the JDBC sink
      
              JdbcSink<User> jdbcSink = new Jdbc().<User>sinkBuilder()
                      .withQueryStatement( new SimpleJdbcQueryStatement<User>(insertSql,statementBuilder))
                      .withExecutionOptions(jdbcExecutionOptions)
                      .buildAtLeastOnce(connectionOptions);
              // Attach the sink
              userStream.sinkTo(jdbcSink);
              env.execute("JDBC Sink Demo");
          }
      
          // User entity class (POJO)
          public static class User {
              private int id;
              private String name;
              private String userName;
              private int age;
      
              public User(int id, String name, int age,String userName) {
                  this.id = id;
                  this.name = name;
                  this.age = age;
                  this.userName=userName;
              }
      
              public int getId() {
                  return id;
              }
      
              public String getName() {
                  return name;
              }
      
              public int getAge() {
                  return age;
              }
      
              public String getUserName() {
                  return userName;
              }
          }
      }
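      The demo assumes a user table already exists in the test database. A plausible schema, inferred from the INSERT statement above (the column types are assumptions):

        CREATE TABLE `user` (
          id INT PRIMARY KEY,
          name VARCHAR(100),
          age INT,
          user_name VARCHAR(100)
        );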
      

      Log in to the MySQL client to verify the inserted rows (screenshot omitted).

      V. Reliability Guarantees for Sinks

      1. Checkpoints and Savepoints

      Flink's checkpoint mechanism is the foundation for exactly-once semantics. With checkpointing enabled, Flink periodically persists the job's state to durable storage; if the job fails, it can be restored from the latest checkpoint so that no data is lost.

      // Checkpoint configuration
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

      // Enable checkpointing with a 5000 ms interval
      env.enableCheckpointing(5000);

      // Use EXACTLY_ONCE checkpointing mode (the default)
      env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

      // Checkpoint timeout
      env.getCheckpointConfig().setCheckpointTimeout(60000);

      // Maximum number of concurrent checkpoints
      env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

      // Externalize checkpoints and retain them when the job is cancelled, so it can be restored later
      env.getCheckpointConfig().enableExternalizedCheckpoints(
          CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
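      For externalized checkpoints to be usable for recovery they must live in durable storage. The checkpoint location is set on the same CheckpointConfig; the local path below is only an illustration, and a shared file system such as HDFS or S3 should be used in production:

      env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");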
      

      2. Transactions and Two-Phase Commit

      For external systems that support transactions, Flink uses a two-phase commit (2PC) protocol to achieve exactly-once semantics:

      • Phase 1 (pre-commit): Flink writes the data into the external system's pre-commit area (an open transaction) without committing it
      • Phase 2 (commit): once every operator has completed its pre-commit, Flink tells the external system to commit the data

      This mechanism ensures that data is neither written twice nor lost, even when the job fails and recovers.

      3. Semantics Supported by Different Sinks

      Different sink connectors support different levels of semantics:

      • Exactly-once: Kafka, Elasticsearch (with supported versions), file systems (write-ahead-log mode)
      • At-least-once: JDBC, Redis, RabbitMQ
      • At-most-once: simple stateless outputs

      In practice, an at-least-once sink combined with idempotent writes (for example, upserts keyed on a primary key, as in the JDBC example later in this article) gives results that look exactly-once to downstream readers.

      VI. Implementing a Custom Sink

      When Flink's built-in sink connectors do not meet your needs, you can implement a custom sink. The example below uses the newer Sink V2 interfaces (Sink and SinkWriter) rather than the legacy SinkFunction interface:

      package com.cn.daimajiangxin.flink.sink;
      
      import org.apache.flink.api.connector.sink2.Sink;
      import org.apache.flink.api.connector.sink2.SinkWriter;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      
      import java.io.IOException;
      
      public class CustomSinkDemo {
          public static void main(String[] args) throws Exception {
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
              DataStream<String> stream = env.fromElements("Custom", "Sink", "Example");
      
              // Attach the custom sink
              stream.sinkTo(new CustomSink());
      
              env.execute("Custom Sink Demo");
          }
      
          // Custom sink implementation using the new Sink V2 API
          public static class CustomSink implements Sink<String> {
      
              @Override
              public SinkWriter<String> createWriter(InitContext context) {
                  return new CustomSinkWriter();
              }
      
              // The SinkWriter performs the actual writes
              private static class CustomSinkWriter implements SinkWriter<String> {

                  // Initialize resources in the constructor
                  public CustomSinkWriter() {
                      // Open connections, clients, and other resources here
                      System.out.println("CustomSink initialized");
                  }
      
                  // Called once for every incoming element
                  @Override
                  public void write(String value, Context context) throws IOException, InterruptedException {
                      // The actual write logic goes here
                      System.out.println("Writing to custom sink: " + value);
                  }
      
                  // Flush buffered data (called on checkpoints and at end of input)
                  @Override
                  public void flush(boolean endOfInput) {
                      // Flush logic, if this writer buffers records
                  }
      
                  // Release resources
                  @Override
                  public void close() throws Exception {
                      // Close connections, clients, and other resources here
                      System.out.println("CustomSink closed");
                  }
              }
          }
      
      }
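      SinkWriter.flush(endOfInput) is called on every checkpoint and at the end of input, which makes it the natural place to drain an internal buffer and is what gives a simple custom sink at-least-once behavior. Below is a minimal buffered variant of the writer above (purely illustrative: the "external system" is still stdout, the batch size is arbitrary, and it additionally needs java.util.ArrayList and java.util.List):

          // A buffering SinkWriter: collect records in memory and write them out in batches.
          private static class BufferedSinkWriter implements SinkWriter<String> {
              private static final int MAX_BUFFERED = 100;
              private final List<String> buffer = new ArrayList<>();

              @Override
              public void write(String value, Context context) throws IOException, InterruptedException {
                  buffer.add(value);
                  if (buffer.size() >= MAX_BUFFERED) {
                      flush(false);  // size-based flush
                  }
              }

              @Override
              public void flush(boolean endOfInput) {
                  // Called on checkpoints and at end of input: push the whole batch to the external system.
                  for (String value : buffer) {
                      System.out.println("Batched write: " + value);
                  }
                  buffer.clear();
              }

              @Override
              public void close() {
                  flush(true);  // do not drop buffered records on shutdown
              }
          }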
      


      VII. Hands-On Example: A Real-Time Data Processing Pipeline

      Next we build a complete real-time pipeline: it reads data from Kafka, transforms it, and writes the results to several target systems:

      1. System Architecture

      Kafka Source -> Flink Processing -> Multiple Sinks
                                     |-> Kafka Sink
                                     |-> Elasticsearch Sink
                                     |-> JDBC Sink
      

      2. Data Model

      We will use a log data model. The LogEntry class represents a single log record:

      package com.cn.daimajiangxin.flink.sink;
      
      public class LogEntry {
          private String timestamp;
          private String logLevel;
          private String source;
          private String message;

          public LogEntry() {
          }

          public LogEntry(String timestamp, String logLevel, String source, String message) {
              this.timestamp = timestamp;
              this.logLevel = logLevel;
              this.source = source;
              this.message = message;
          }

          public String getTimestamp() {
              return timestamp;
          }
      
          public void setTimestamp(String timestamp) {
              this.timestamp = timestamp;
          }
      
          public String getLogLevel() {
              return logLevel;
          }
      
          public void setLogLevel(String logLevel) {
              this.logLevel = logLevel;
          }
      
          public String getSource() {
              return source;
          }
      
          public void setSource(String source) {
              this.source = source;
          }
      
          public String getMessage() {
              return message;
          }
      
          public void setMessage(String message) {
              this.message = message;
          }
      
          @Override
          public String toString() {
              return String.format("LogEntry{timestamp='%s', logLevel='%s', source='%s', message='%s'}",
                      timestamp, logLevel, source, message);
          }
      }
      

      We also define a LogStats class to hold per-source log statistics:

      package com.cn.daimajiangxin.flink.sink;
      
      public class LogStats {
          private String source;
          private long count;
      
          public LogStats() {
          }
      
          public LogStats(String source, long count) {
              this.source = source;
              this.count = count;
          }
      
          public String getSource() {
              return source;
          }
      
          public void setSource(String source) {
              this.source = source;
          }
      
          public long getCount() {
              return count;
          }
      
          public void setCount(long count) {
              this.count = count;
          }
      
          @Override
          public String toString() {
              return String.format("LogStats{source='%s', count=%d}", source, count);
          }
      }
      

      3. Full Implementation

      package com.cn.daimajiangxin.flink.sink;
      
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.serialization.SimpleStringSchema;
      import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
      import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
      import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
      import org.apache.flink.connector.jdbc.core.datastream.Jdbc;
      import org.apache.flink.connector.jdbc.core.datastream.sink.JdbcSink;
      import org.apache.flink.connector.jdbc.datasource.statements.SimpleJdbcQueryStatement;
      import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
      import org.apache.flink.connector.kafka.sink.KafkaSink;
      import org.apache.flink.connector.kafka.source.KafkaSource;
      import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
      import org.apache.flink.connector.elasticsearch.sink.ElasticsearchSink;
      import org.apache.http.HttpHost;
      import org.elasticsearch.action.index.IndexRequest;
      import org.elasticsearch.client.Requests;
      
      import java.sql.PreparedStatement;
      import java.time.LocalDateTime;
      import java.util.ArrayList;
      import java.util.HashMap;
      import java.util.List;
      import java.util.Map;
      import java.util.Properties;
      
      public class MultiSinkPipeline {
          public static void main(String[] args) throws Exception {
              // 1. Create the execution environment and enable checkpointing
              StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
              env.enableCheckpointing(5000);
      
              // 2. Create the Kafka source
              KafkaSource<String> source = KafkaSource.<String>builder()
                      .setBootstrapServers("localhost:9092")
                      .setTopics("logs-input-topic")
                      .setGroupId("flink-group")
                      .setStartingOffsets(OffsetsInitializer.earliest())
                      .setValueOnlyDeserializer(new SimpleStringSchema())
                      .build();
      
              // 3. Read the data and parse it
              DataStream<String> kafkaStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
      
              // Parse raw log lines (format: timestamp|logLevel|source|message)
              DataStream<LogEntry> logStream = kafkaStream
                      .map(line -> {
                          String[] parts = line.split("\\|");
                          return new LogEntry(parts[0], parts[1], parts[2], parts[3]);
                      })
                      .name("Log Parser");
      
              // 4. Keep only ERROR-level logs
              DataStream<LogEntry> errorLogStream = logStream
                      .filter(log -> "ERROR".equals(log.getLogLevel()))
                      .name("Error Log Filter");
      
              // 5. Kafka sink: forward error logs to a dedicated topic
              // Kafka producer configuration
              Properties props = new Properties();
              props.setProperty("bootstrap.servers", "localhost:9092");

              // Create the Kafka sink
              KafkaSink<LogEntry> kafkaSink = KafkaSink.<LogEntry>builder()
                      .setKafkaProducerConfig(props)
                      .setRecordSerializer(KafkaRecordSerializationSchema.<LogEntry>builder()
                              .setTopic("error-logs-topic")
                              .setValueSerializationSchema(element -> element.toString().getBytes())
                              .build())
                      .build();
      
              errorLogStream.sinkTo(kafkaSink).name("Error Logs Kafka Sink");
      
              // 6. Elasticsearch sink: store all logs
              // Configure the Elasticsearch node
              HttpHost httpHost = new HttpHost("localhost", 9200, "http");
      
              ElasticsearchSink<LogEntry> esSink = new Elasticsearch7SinkBuilder<LogEntry>()
                      .setBulkFlushMaxActions(10)          // max buffered actions per bulk request
                      .setBulkFlushInterval(5000)          // bulk flush interval (ms)
                      .setHosts(httpHost)
                      .setConnectionRequestTimeout(60000)  // connection request timeout (ms)
                      .setConnectionTimeout(60000)         // connection timeout (ms)
                      .setSocketTimeout(60000)             // socket timeout (ms)
                      .setEmitter((element, context, indexer) -> {
                          Map<String, Object> json = new HashMap<>();
                          json.put("timestamp", element.getTimestamp());
                          json.put("logLevel", element.getLogLevel());
                          json.put("source", element.getSource());
                          json.put("message", element.getMessage());
                          IndexRequest request = Requests.indexRequest()
                                  .index("logs_index")
                                  .source(json);
                          indexer.add(request);
                      })
                      .build();
      
              logStream.sinkTo(esSink).name("Elasticsearch Sink");
      
              // 7. JDBC sink: store per-source error-log statistics
              // Aggregate first
              DataStream<LogStats> statsStream = errorLogStream
                      .map(log -> new LogStats(log.getSource(), 1))
                      .keyBy(LogStats::getSource)
                      .sum("count")
                      .name("Error Log Stats");
              JdbcExecutionOptions jdbcExecutionOptions = JdbcExecutionOptions.builder()
                      .withBatchSize(1000)
                      .withBatchIntervalMs(200)
                      .withMaxRetries(5)
                      .build();
              JdbcConnectionOptions connectionOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                      .withUrl("jdbc:mysql://localhost:3306/test")
                      .withDriverName("com.mysql.cj.jdbc.Driver")
                      .withUsername("mysql用戶名")
                      .withPassword("mysql密碼")
                      .build();
              // statsStream already carries a running total per source, so the upsert overwrites the stored count
              String insertSql = "INSERT INTO error_log_stats (source, count, last_updated) VALUES (?, ?, ?) " +
                     "ON DUPLICATE KEY UPDATE count = VALUES(count), last_updated = VALUES(last_updated)";
              JdbcStatementBuilder<LogStats> statementBuilder = (statement, stats) -> {
                  statement.setString(1, stats.getSource());
                  statement.setLong(2, stats.getCount());
                  statement.setTimestamp(3,  java.sql.Timestamp.valueOf(LocalDateTime.now()));
              };
              // Create the JDBC sink
              JdbcSink<LogStats> jdbcSink = new Jdbc().<LogStats>sinkBuilder()
                      .withQueryStatement( new SimpleJdbcQueryStatement<LogStats>(insertSql,statementBuilder))
                      .withExecutionOptions(jdbcExecutionOptions)
                      .buildAtLeastOnce(connectionOptions);
              statsStream.sinkTo(jdbcSink).name("JDBC Sink");
              // 8. Execute the job
              env.execute("Multi-Sink Data Pipeline");
          }
      
      }
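      One caveat about the Elasticsearch sink above: its IndexRequest carries no explicit document id, so under at-least-once delivery a restart can index the same log record twice. A common remedy is a deterministic id derived from the record; the id scheme below is a hypothetical illustration of how the emitter could be adjusted:

              IndexRequest request = Requests.indexRequest()
                      .index("logs_index")
                      .id(element.getSource() + "_" + element.getTimestamp())  // hypothetical stable id
                      .source(json);
              indexer.add(request);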
      

      4. Testing and Verification

      To test the complete pipeline, we need to:

      1. Start Kafka and create the required topics:

        # Create the input topic
        kafka-topics.sh --create --topic logs-input-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
        
        # Create the error-log output topic
        kafka-topics.sh --create --topic error-logs-topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
        
      2. Start Elasticsearch and make sure it is running

      3. Create the required table in MySQL:

        CREATE DATABASE test;
        USE test;
        
        CREATE TABLE error_log_stats (
          source VARCHAR(100) PRIMARY KEY,
          count BIGINT NOT NULL,
          last_updated TIMESTAMP NOT NULL
        );
        
      4. Send test data to Kafka:

        kafka-console-producer.sh --topic logs-input-topic --bootstrap-server localhost:9092
        
        # Enter the following test records
        2025-09-29 12:00:00|INFO|application-service|Application started successfully
        2025-09-29 12:01:30|ERROR|database-service|Failed to connect to database
        2025-09-29 12:02:15|WARN|cache-service|Cache eviction threshold reached
        2025-09-29 12:03:00|ERROR|authentication-service|Invalid credentials detected
        
      5. Run the Flink job and watch the data flow into each target system

      Data in the Kafka sink topic (screenshot omitted).

      Data in MySQL (screenshot omitted).

      Data in Elasticsearch (screenshot omitted).

      VIII. Performance Tuning and Best Practices

      1. Parallelism Configuration

      Setting an appropriate parallelism for the sink can significantly improve throughput:

      // Set the parallelism for a specific sink
      stream.sinkTo(sink).setParallelism(4);

      // Or set the default parallelism for the whole job
      env.setParallelism(4);
      

      2. Batch Configuration

      For sinks that support batching, tuning the batch parameters reduces network overhead:

      // JDBC batching example
      JdbcExecutionOptions.builder()
          .withBatchSize(1000)       // records per batch
          .withBatchIntervalMs(200)  // maximum wait before a partial batch is flushed (ms)
          .withMaxRetries(3)         // maximum number of retries
          .build();
      

      3. Backpressure Handling

      When a sink cannot keep up with upstream data, backpressure builds up. Flink provides mechanisms for monitoring and dealing with it:

      • Use the Flink Web UI to monitor backpressure
      • Consider buffering or adjusting operator parallelism
      • For critical paths, implement custom backpressure handling logic

      4. Resource Management

      Managing connections and other resources carefully is key to keeping a sink stable:

      • Use a connection pool to manage database connections
      • Initialize resources in RichSinkFunction's open() method (or in the SinkWriter constructor when using the Sink V2 API)
      • Release resources properly in close() (see the sketch after this list)
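      A minimal sketch of this pattern with the legacy RichSinkFunction API (the table name, URL, and credentials are placeholders; with the Sink V2 API the same logic lives in the SinkWriter constructor and close(), as in the custom sink shown earlier):

      import org.apache.flink.configuration.Configuration;
      import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

      import java.sql.Connection;
      import java.sql.DriverManager;
      import java.sql.PreparedStatement;

      // Sketch: acquire a resource once per parallel instance, reuse it per record, release it in close().
      public class PooledJdbcSink extends RichSinkFunction<String> {
          private transient Connection connection;

          @Override
          public void open(Configuration parameters) throws Exception {
              // In practice, borrow connections from a pool instead of opening one directly.
              connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "username", "password");
          }

          @Override
          public void invoke(String value, Context context) throws Exception {
              // "messages" is a hypothetical table used only for illustration.
              try (PreparedStatement ps = connection.prepareStatement("INSERT INTO messages (payload) VALUES (?)")) {
                  ps.setString(1, value);
                  ps.executeUpdate();
              }
          }

          @Override
          public void close() throws Exception {
              if (connection != null) {
                  connection.close();
              }
          }
      }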

      5. Error Handling Strategy

      Configure an appropriate restart strategy so that sink failures are retried:

      // Restart strategy configuration
      env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
          3,  // number of restart attempts
          Time.of(10, TimeUnit.SECONDS)  // delay between attempts
      ));
      

      IX. Summary and Outlook

      This article covered the core concepts of Flink data output (sinks), how to use the various connectors, and the reliability mechanisms behind them. We learned how to configure and use the built-in sinks, the file system sink, the Kafka sink, the Elasticsearch sink, and the JDBC sink, and extended Flink's output capabilities with a custom sink. Finally, we built a complete real-time pipeline that writes processed data to multiple target systems.

      In the Flink ecosystem, the sink is the bridge between computed results and the outside world. By choosing the right sink connector and configuring it properly, we can build efficient and reliable data processing systems.


      Original article: http://blog.daimajiangxin.com.cn

      Source code: https://gitee.com/daimajiangxin/flink-learning
