<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      從零開始學Flink:數據源

      在實時數據處理場景中,數據源(Source)是整個數據處理流程的起點。Flink作為流批一體的計算框架,提供了豐富的Source接口支持,其中通過Kafka獲取實時數據是最常見的場景之一。本文將以Flink DataStream API為核心,帶你從0到1實現“從Kafka消費數據并輸出到日志”的完整流程,掌握Flink Source的核心用法。

      一、為什么選擇Kafka作為Flink的數據源?

      Kafka作為分布式流處理平臺,具備高吞吐量、低延遲、持久化存儲等特性,是實時數據管道的首選。Flink與Kafka的集成方案經過多年優化,支持:

      • 高吞吐量:單集群可處理數十萬條/秒的消息,滿足大規模實時數據處理需求;
      • 持久化存儲:數據按時間順序寫入磁盤并保留一定周期,支持離線重放和故障恢復;
      • 精確一次(Exactly-Once)消費語義:通過Kafka偏移量(Offset)管理和Flink檢查點(Checkpoint)機制保證數據一致性;
      • 動態分區發現:自動感知Kafka主題的分區變化(如新增分區),無需重啟任務;
      • 靈活的消費模式:支持從指定偏移量、時間戳或最新位置開始消費。

      二、環境準備與依賴配置

      1. 版本說明

      本文基于以下版本實現(需保持版本兼容):

      • Flink:1.20.1(最新穩定版)
      • Kafka:3.4.0(Flink Kafka Connector兼容Kafka 2.8+)
      • JDK:17+
      • gradle 8.3+

      2. gradle依賴

      在gradle添加Flink核心依賴及Kafka Connector依賴,build.gradle配置可以是如下:

      plugins {
          id 'java' // Java項目插件
          id 'application' // 支持main方法運行
          }
      
          // 設置主類(可選,用于application插件)
          application {
          mainClass.set('com.cn.daimajiangxin.flink.source.KafkaSourceDemo') // 替換為你的主類全限定名
          }
      
          // 依賴倉庫(Maven中央倉庫)
          repositories {
          mavenCentral()
          }
      
          // 依賴配置
          dependencies {
          // Flink核心依賴(生產環境通常標記為provided,由Flink運行時提供)
          implementation 'org.apache.flink:flink-java:1.20.1'
          implementation 'org.apache.flink:flink-streaming-java_2.12:1.20.1'
      
          // Flink Kafka Connector(新版API,兼容Kafka 2.8+)
          implementation 'org.apache.flink:flink-connector-kafka_2.12:1.20.1'
      
          // SLF4J日志門面 + Log4j實現(避免日志警告)
          implementation 'org.apache.logging.log4j:log4j-api:2.17.1'
          implementation 'org.apache.logging.log4j:log4j-core:2.17.1'
          implementation 'org.apache.logging.log4j:log4j-slf4j-impl:2.17.1'
          }
      
          // 編譯配置(可選,根據需要調整)
          tasks.withType(JavaCompile) {
          options.encoding = 'UTF-8' // 指定編碼
          sourceCompatibility = JavaVersion.VERSION_17 // 兼容Java 8
          targetCompatibility = JavaVersion.VERSION_17
          }
      

      在深入代碼前,需理解Flink Kafka Source的核心組件:

      • KafkaSource:Flink提供的Kafka數據源連接器,負責與Kafka Broker建立連接、拉取消息;
      • 反序列化器(Deserializer):將Kafka消息的字節數組(byte[])轉換為Flink可處理的數據類型(如String、POJO、Row等);
      • 偏移量管理:記錄已消費的Kafka消息位置(Offset),確保故障恢復時能從斷點繼續消費;
      • 檢查點(Checkpoint):Flink的容錯機制,定期將狀態(包括偏移量)持久化到存儲系統(如HDFS),保證Exactly-Once語義。

      四、核心代碼實現:從Kafka讀取數據并輸出到日志

      1. 流程概述

      整個流程分為5步:

      1. 配置Kafka連接參數(如Broker地址、主題、消費者組);
      2. 創建Flink流執行環境(StreamExecutionEnvironment);
      3. 定義Kafka Source(使用新版KafkaSource);
      4. 將Source添加到執行環境,并處理數據(如打印到日志);
      5. 觸發任務執行。

      2.代碼詳解

      以下是完整的示例代碼,包含詳細注釋:

      package com.cn.daimajiangxin.flink.source;
      
      import org.apache.flink.api.common.eventtime.WatermarkStrategy;
      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.connector.kafka.source.KafkaSource;
      import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
      import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
      import org.apache.flink.streaming.api.datastream.DataStream;
      import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
      import org.apache.flink.util.Collector;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.slf4j.Logger;
      import org.slf4j.LoggerFactory;
      
      import java.io.IOException;
      import java.nio.charset.StandardCharsets;
      
      public class KafkaSourceDemo {
              private static final Logger LOG = LoggerFactory.getLogger(KafkaSourceDemo.class);
      
      public static void main(String[] args) throws Exception {
                  // 1. 創建Flink流執行環境
                  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      // 可選:啟用檢查點(生產環境必選,保證Exactly-Once語義)
                  env.enableCheckpointing(5000); // 每5秒做一次檢查點
                  // 啟用檢查點
                  env.enableCheckpointing(5000); // 每5秒做一次檢查點
      
      // 設置檢查點超時時間
                  env.getCheckpointConfig().setCheckpointTimeout(60000);
      
      // 2. 配置Kafka參數
                  String kafkaBootstrapServers = "172.30.244.152:9092"; // Kafka Broker地址
                  String topic = "test_topic"; // 目標主題
                  String consumerGroup = "flink-consumer-group"; // 消費者組ID
      
      LOG.info("Connecting to Kafka at " + kafkaBootstrapServers);
                  LOG.info("Consuming topic: " + topic);
                  LOG.info("Consumer group: " + consumerGroup);
      
      // 3. 定義Kafka Source(新版API)
                  KafkaSource`<String>` kafkaSourceDemo = KafkaSource.`<String>`builder()
                          .setBootstrapServers(kafkaBootstrapServers) // Kafka Broker地址
                          .setTopics(topic) // 訂閱的主題
                          .setGroupId(consumerGroup) // 消費者組
                          .setProperty("enable.auto.commit", "true")
                          .setProperty("auto.commit.interval.ms", "1000")
                          .setProperty("session.timeout.ms", "30000")
                          .setProperty("retry.backoff.ms", "1000")
                          .setProperty("reconnect.backoff.max.ms", "10000")
                          .setDeserializer(new KafkaRecordDeserializationSchema `<String>`() {
                              @Override
                              public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector `<String>` out) throws IOException {
                                  // 從ConsumerRecord中提取值(字節數組),并轉為字符串
                                  String value = new String(record.value(), StandardCharsets.UTF_8);
                                  LOG.info("Received message: " + value);
                                  out.collect(value); // 將反序列化后的數據收集到Flink流中
                              }
      
      @Override
                              public TypeInformation`<String>` getProducedType() {
                                  return TypeInformation.of(String.class);
                              }
                          })
                          // 從最早偏移量開始消費(這樣即使沒有新消息也會讀取歷史數據)
                          .setStartingOffsets(OffsetsInitializer.earliest())
                          .build();
      
      // 4. 將Kafka Source添加到Flink流環境,并處理數據
                  DataStream`<String>` kafkaStream = env.fromSource(
                          kafkaSourceDemo,
                          WatermarkStrategy.noWatermarks(), // 無水印(適用于無序數據場景)
                          "Kafka Source" // Source名稱(用于監控)
                  );
      
      LOG.info("Kafka source created successfully");
      
      // 5. 處理數據:將每條數據打印到日志(實際生產中可替換為寫入數據庫、消息隊列等)
                  kafkaStream.print("KafkaData");
                  LOG.info("Flink Kafka Source Demo started.");
                  // 6. 觸發任務執行
                  env.execute("Flink Kafka Source Demo");
      
      }
          }
      

      3. 關鍵配置說明

      • KafkaSource.Builder:新版Kafka Source的核心構建器,支持靈活配置;
      • setDeserializer:指定反序列化方式,deserialize 接收Kafka的ConsumerRecord(包含鍵、值、偏移量等信息),提取值(record.value())并反序列化為字符,getProducedType聲明輸出數據的類型(此處為String);
      • setStartingOffsets:控制消費起始位置(latest()從最新數據開始,earliest()從最早數據開始,生產環境常用OffsetsInitializer.committedOffsets()從上次提交的偏移量繼續);
      • WatermarkStrategy:用于事件時間(Event Time)處理,示例中無時間窗口需求,故使用noWatermarks();
      • PrintSinkFunction:Flink內置的日志打印Sink(true表示打印完整上下文,包含Subtask信息)。

      五、運行與測試

      在WSL2的Ubuntu 環境中安裝Kafka。

      1. 安裝Kafka服務

      • 下載Kafka二進制包
        訪問Apache Kafka官網,選擇最新穩定版(如3.9.0),使用wget下載:

        wget https://mirrors.aliyun.com/apache/kafka/3.9.0/kafka_2.12-3.9.0.tgz
        
      • 解壓并配置環境變量

        # 解壓到/opt/kafka(全局可訪問)
        sudo mkdir -p /opt/kafka
        tar -zxvf kafka_2.12-3.9.0.tgz -C /opt/kafka --strip-components=1
        
        # 永久生效(編輯~/.bashrc)
        echo 'export KAFKA_HOME=/opt/kafka' >> /etc/profile
        echo 'export PATH=$KAFKA_HOME/bin:$PATH' >> /etc/profile
        source /etc/profile
        

      2. 配置Kafka

      Kafka的核心配置文件位于$KAFKA_HOME/config目錄,需修改以下兩個文件:

      配置Kafka Broker(server.properties)

      修改以下關鍵參數以適配WSL2環境:

      # ==================== 核心角色與ID配置 ====================
          # 啟用KRaft模式(默認已啟用)
          # 單節點同時擔任Broker和控制器
          process.roles=broker,controller
          # 節點唯一ID(單節點必須設為0)
          node.id=0
          # 控制器ID(與node.id一致,單節點唯一)
          controller.id=0
      
          # ==================== 監聽端口配置 ====================
          # 全局監聽端口(客戶端讀寫請求)和控制器監聽端口
          # 多個監聽器使用逗號分隔,每個監聽器都需要指定安全協議
          listeners=PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9093
      
          # 對外暴露的地址(Windows主機通過localhost訪問)
          # 多個公布的監聽器使用逗號分隔
          advertised.listeners=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093
      
          # 指定CONTROLLER監聽器的安全協議
          listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
      
          # 定義控制器監聽器的名稱(KRaft模式必需)
          controller.listener.names=CONTROLLER
      
          # ==================== ZooKeeper兼容配置(可選) ====================
          # 若需兼容舊客戶端,可保留ZooKeeper配置(但KRaft模式無需ZooKeeper)
          # zookeeper.connect=localhost:2181
      
          # ==================== 日志與分區配置 ====================
          # 數據存儲目錄配置(Kafka的核心配置參數)
          # Kafka將主題數據、索引文件等存儲在該目錄下
          log.dirs=/opt/kafka/data
          num.partitions=1
          # 副本數(單節點設為1)
          default.replication.factor=1
          # 最小同步副本數(單節點設為1)
          min.insync.replicas=1
          # ==================== 日志存儲高級配置 ====================
          # 日志保留時間(默認7天,生產環境根據存儲容量和需求調整)
          # log.retention.hours=168
          # 或按大小限制保留(單位:字節)
          # log.retention.bytes=107374182400  # 100GB
      
          # 單個分區日志段大小(默認1GB,可根據實際需求調整)
          # log.segment.bytes=1073741824
      
          # 日志段檢查和清理的時間間隔(默認300000ms=5分鐘)
          # log.retention.check.interval.ms=300000
      
          # 控制是否自動創建主題(生產環境建議禁用,改為手動創建)
          # auto.create.topics.enable=false
      
          # ==================== 控制器引導配置 ====================
          # 控制器引導服務器(單節點指向自己,格式:host:port)
          # 與控制器監聽端口一致
          controller.quorum.bootstrap.servers=localhost:9093
      
          # 控制器投票者配置(單節點設為0@localhost:9093)
          controller.quorum.voters=0@localhost:9093
      

      3.啟動Kafka服務

      3.1初始化KRaft存儲目錄(首次啟動必需)

      在KRaft模式下,需要先初始化元數據存儲:

      
      # 生成集群ID并保存到變量
      CLUSTER_ID=$($KAFKA_HOME/bin/kafka-storage.sh random-uuid)
      echo "生成的集群ID: $CLUSTER_ID"
      
      # 使用生成的集群ID格式化存儲目錄$KAFKA_HOME/bin/kafka-storage.sh format -t $CLUSTER_ID -c $KAFKA_HOME/config/server.properties
      

      注意: 如果手動運行命令,請確保先執行生成集群ID的命令,然后使用實際生成的ID替換"$CLUSTER_ID"。

      3.2啟動Kafka Broker

      # 啟動Broker(日志輸出到$KAFKA_HOME/logs/server.log)     $KAFKA_HOME/bin/kafka-server-start.sh -daemon $KAFKA_HOME/config/server.properties
      

      3.3驗證服務狀態

      檢查Kafka Broker進程:
          ps -ef | grep kafka  # 應看到Kafka進程
      

      3.4創建測試主題

      確保Kafka服務已啟動,并創建測試主題 test_topic

      $KAFKA_HOME/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test_topic
      

      3.5發送測試數據

      使用Kafka內置的生產者工具發送測試消息到 test_topic

      # 啟動Kafka生產者控制臺
          $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test_topic
      
          # 輸入幾條測試消息(每行一條)
          > hello flink
          > flink kafka integration
          > real-time data processing
      

      3.6運行Flink程序

      在IDE中直接運行 KafkaSourceDemo類的 main方法,或通過Gradle構建并運行:

      # 構建項目
          ./gradlew clean build
      
          # 運行Flink作業
          ./gradlew run
      

      3.7驗證結果

      成功運行后,你應該能在控制臺看到類似如下輸出:

      20250918103316

      六、進階配置與優化

      1. 消費語義保證

      在生產環境中,為了確保數據一致性,需要配置Flink的檢查點機制和Kafka偏移量提交策略:

      // 1. 啟用檢查點
          env.enableCheckpointing(5000); // 每5秒做一次檢查點
      
          // 2. 獲取檢查點配置對象(Flink 1.20.1及以上版本推薦方式)
          CheckpointConfig checkpointConfig = env.getCheckpointConfig();
      
          // 3. 配置檢查點模式為EXACTLY_ONCE(精確一次語義)
          checkpointConfig.setMode(CheckpointingMode.EXACTLY_ONCE);
      
          // 4. 設置檢查點超時時間
          checkpointConfig.setCheckpointTimeout(Duration.ofSeconds(60));
      
          // 4. 配置從上次提交的偏移量繼續消費(生產環境推薦)
          .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
      

      2. 并行度與資源配置

      合理設置并行度可充分利用集群資源并提高吞吐量:

      // 設置Flink作業的全局并行度
          env.setParallelism(3); // 與Kafka主題分區數匹配
      
          // 或單獨設置Source的并行度
          KafkaSource`<String>` kafkaSource = KafkaSource.`<String>`builder()
              // ... 其他配置 ...
              .build();
      
          DataStream`<String>` stream = env.fromSource(
              kafkaSource,
              WatermarkStrategy.noWatermarks(),
              "Kafka Source")
              .setParallelism(3); // Source并行度
      

      3. 高級反序列化

      除了基礎的字符串反序列化,還可以使用更靈活的反序列化方式:

      3.1 使用預定義反序列化器

      // 使用Flink提供的String反序列化器     .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) 
      

      3.2 自定義POJO反序列化

      如果Kafka消息是JSON格式,可以使用Jackson等庫將其反序列化為POJO對象:

      public class User {
              private String id;
              private String name;
              private int age;
              // getters, setters, constructors...
          }
      
          // 自定義POJO反序列化器
          .setDeserializer(new KafkaRecordDeserializationSchema`<User>`() {
              private final ObjectMapper mapper = new ObjectMapper();
      
          @Override
              public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector`<User>` out) throws IOException {
                  User user = mapper.readValue(record.value(), User.class);
                  out.collect(user);
              }
      
          @Override
              public TypeInformation`<User>` getProducedType() {
                  return TypeInformation.of(User.class);
              }
          })
      

      七、常見問題與解決方案

      1. 連接超時問題

      問題現象:程序啟動后報 org.apache.kafka.common.errors.TimeoutException

      解決方案

      • 檢查Kafka服務是否正常運行:ps -ef | grep kafka
      • 確認 bootstrap.servers配置正確,特別是在WSL2環境中確保端口映射正確
      • 檢查防火墻設置,確保9092端口開放

      2. 數據消費不完整

      問題現象:部分Kafka消息未被Flink消費

      解決方案

      • 檢查Kafka主題的分區數與Flink Source并行度是否匹配
      • 確認 setStartingOffsets配置正確,生產環境建議使用 OffsetsInitializer.committedOffsets()
      • 檢查檢查點機制是否正常啟用,確保偏移量正確提交

      3. 性能優化

      對于高吞吐量場景,可以通過以下方式優化性能:

      • 增加Kafka主題分區數(與Flink并行度匹配)
      • 調大 fetch.max.bytesmax.partition.fetch.bytes參數,增加單次拉取的數據量
      • 啟用增量檢查點,減少檢查點開銷
      • 使用 setUnboundedUsePreviousEventTimeWatermark()優化水印生成

      八、總結與擴展

      本文詳細介紹了如何使用Flink從Kafka讀取數據,包括環境準備、代碼實現、運行測試以及進階配置。通過本文的學習,你應該能夠掌握Flink數據源的核心用法,為構建企業級實時數據處理應用打下堅實基礎。

      在實際應用中,Flink還支持多種其他數據源,如:

      • 文件系統(HDFS、本地文件)
      • 數據庫(MySQL、PostgreSQL、MongoDB等)
      • 消息隊列(RabbitMQ、Pulsar等)
      • 自定義數據源(通過實現 SourceFunction接口)

      后續文章將繼續深入探討Flink的數據轉換、窗口計算、狀態管理等核心概念,敬請關注!


      源文來自:http://blog.daimajiangxin.com.cn

      源碼地址:https://gitee.com/daimajiangxin/flink-learning

      posted @ 2025-09-18 15:54  代碼匠心  閱讀(236)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 97se亚洲综合在线天天| 国产伦一区二区三区久久| 污污网站18禁在线永久免费观看| 日本不卡三区| 成人午夜免费无码视频在线观看 | 玖玖在线精品免费视频| 无码 人妻 在线 视频| 一区二区三区四区精品黄| 国产精品美女久久久久久麻豆| 99RE8这里有精品热视频| 狠狠亚洲色一日本高清色| 制服丝袜美腿一区二区| 亚洲无线看天堂av| 亚洲色最新高清AV网站| 无码丰满人妻熟妇区| 欧洲熟妇色xxxx欧美老妇免费| 波多结野衣一区二区三区| 免费现黄频在线观看国产| 久久精品无码一区二区小草| 天天综合色一区二区三区| 免费超爽大片黄| 国产精品午夜福利免费看| 亚洲综合一区国产精品| 日韩有码av中文字幕| 久久国产乱子精品免费女| 欧美一级高清片久久99| 国产成人精品18| 欧美黑人又粗又大又爽免费| 国产亚洲精品VA片在线播放| 精品无码国产自产拍在线观看蜜| 国产黄色三级三级看三级| 国产精品亚洲а∨天堂2021| 亚洲国产成人极品综合| 日韩一区二区三在线观看| 99久久国产成人免费网站| 成人拍拍拍无遮挡免费视频| 亚洲综合无码久久精品综合| 国产成人一区二区三区在线| 亚洲爆乳成av人在线视菜奈实| 长顺县| 亚洲东京色一区二区三区|