<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      從零開始學Flink:事件驅動

      在實時計算領域,很多業務邏輯天然適合“事件驅動”模式:當事件到達時觸發處理、在某個時間點觸發補償或匯總、根據狀態變化發出告警等。Apache Flink 為此提供了強大的 ProcessFunction 家族(KeyedProcessFunction、CoProcessFunction、BroadcastProcessFunction 等),它們在算子層面同時具備“事件處理 + 定時器 + 狀態”的能力,是構建復雜流式應用的核心基石。

      本文基于 Flink 1.20 的語義,帶你從零理解事件驅動的編程模型,并一步步實現一個“偽窗口 PseudoWindow”示例,體會 ProcessFunction 如何代替窗口完成時間分桶、累加和觸發輸出。

      一、為什么選擇事件驅動

      對于如下需求,事件驅動往往比簡單窗口更靈活:

      • 自定義觸發邏輯(不僅僅是固定窗口邊界)。
      • 精細的遲到事件處理策略(事件時間/處理時間混用、不同類型事件分別處理)。
      • 需要在算子級別維護復雜狀態(如每個 key 多個并發“子窗口”或會話)。
      • 需要與外部系統交互或對齊(例如到達某個業務時間點后批量寫出)。

      ProcessFunction 能滿足上述場景,因為它同時提供:

      • 事件回調:processElement,用于逐條事件處理。
      • 定時器:事件時間或處理時間兩種類型,支持在指定時刻觸發 onTimer 回調。
      • 管理狀態:借助 RichFunction 的上下文,訪問 keyed state(如 ValueState、MapState、ListState 等)。

      二、核心概念速覽

      • KeyedProcessFunction:在 keyBy 之后對每個 key 獨立處理事件、注冊和觸發定時器、讀寫 keyed state。
      • TimerService:通過 ctx.timerService() 注冊事件時間或處理時間定時器;在 onTimer 中被調用。
      • Watermark:推進事件時間的“時鐘”,只有當 Watermark 超過某個時間點時,對應的事件時間定時器才會觸發。
      • RichFunction:ProcessFunction 屬于 RichFunction,因而擁有 open/getRuntimeContext 等生命周期方法,可初始化狀態描述符等。

      三、示例:用 KeyedProcessFunction 實現“小時級偽窗口”

      目標:按司機 driverId,每小時匯總 tip(小費)之和。我們先給出窗口版本,再給出偽窗口版本以對比兩者的思路差異。

      1. 窗口實現(參考思路)

      // 每小時、每個司機的提示費求和(傳統事件時間翻轉窗口)
      DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
              .keyBy((TaxiFare fare) -> fare.driverId)
              .window(TumblingEventTimeWindows.of(Duration.ofSeconds(5)))
              .process(new AggregateTipsProcess());
      

      窗口版本直觀,但觸發邏輯受窗口邊界約束。如果我們希望完全掌控“何時觸發”和“如何管理多窗口并發”,可以使用 KeyedProcessFunction:

      2. 事件驅動實現(PseudoWindow)

      // 使用事件驅動的 KeyedProcessFunction 替代窗口
      DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
              .keyBy((TaxiFare fare) -> fare.driverId)
              .process(new PseudoWindow(Duration.ofSeconds(5)));
      
      // 偽窗口:按事件時間把每條數據歸入其所在小時段,注冊窗口結束時間的定時器,定時器觸發時輸出該小時匯總
      public static class PseudoWindow extends KeyedProcessFunction<Long, TaxiFare, Tuple3<Long, Long, Float>> {
      
          private final long durationMsec;
          // MapState<窗口結束時間, 累計 tips>
          private transient MapState<Long, Float> sumOfTips;
      
          public PseudoWindow(Duration duration) {
              this.durationMsec = duration.toMillis();
          }
      
          @Override
          public void open(Configuration parameters) throws Exception {
              MapStateDescriptor<Long, Float> sumDesc =
                      new MapStateDescriptor<>("sumOfTips", Long.class, Float.class);
              sumOfTips = getRuntimeContext().getMapState(sumDesc);
          }
      
          @Override
          public void processElement(
                  TaxiFare fare,
                  Context ctx,
                  Collector<Tuple3<Long, Long, Float>> out) throws Exception {
      
              long eventTime = fare.getEventTime();
              TimerService timerService = ctx.timerService();
      
              // 若事件時間早于當前 Watermark,說明窗口已觸發,該事件為遲到事件(按需決定丟棄或補償)
              if (eventTime <= timerService.currentWatermark()) {
                  // 遲到事件處理策略:可以記錄指標、寫側輸出、或進行補償
                  return;
              }
      
              // 計算該事件所屬小時窗口的“窗口結束時間”戳
              long endOfWindow = eventTime - (eventTime % durationMsec) + durationMsec - 1;
      
              // 注冊事件時間定時器:當 Watermark 超過 endOfWindow 時觸發 onTimer
              timerService.registerEventTimeTimer(endOfWindow);
      
              // 累加該窗口的 tips
              Float sum = sumOfTips.get(endOfWindow);
              if (sum == null) {
                  sum = 0.0F;
              }
              sum += fare.tip;
              sumOfTips.put(endOfWindow, sum);
          }
      
          @Override
          public void onTimer(
                  long timestamp,
                  OnTimerContext ctx,
                  Collector<Tuple3<Long, Long, Float>> out) throws Exception {
      
              // 定時器時間戳即窗口結束時間,輸出 (driverId, windowEnd, sum)
              Float sum = sumOfTips.get(timestamp);
              if (sum != null) {
                  Long driverId = ctx.getCurrentKey();
                  out.collect(Tuple3.of(driverId, timestamp, sum));
                  // 輸出后清理該窗口的狀態,避免泄漏
                  sumOfTips.remove(timestamp);
              }
          }
      }
      

      從這個實現可以觀察到:

      • 我們手動決定“窗口”形態與觸發時機:不依賴 Window API,而是依賴事件時間定時器和 Watermark。
      • MapState 使一個 key 能同時維護多個并發窗口(不同結束時間戳)。
      • 遲到事件處理策略高度可定制:可丟棄、可側輸出、也可做補償累加再延遲觸發。

      四、生命周期與關鍵回調

      • open:初始化狀態(如 MapState、ValueState),常用于設置描述符和外部資源連接。
      • processElement:每到一條事件都會調用。典型邏輯包括:計算歸屬時間段、注冊定時器、修改狀態、按需提前輸出。
      • onTimer:當定時器觸發時調用。常見動作:基于狀態匯總并輸出、清理過期狀態、注冊下一次定時器等。

      五、事件時間 vs 處理時間定時器

      • 事件時間(Event Time):以事件攜帶的時間戳為準,Watermark 推進時觸發。適合有亂序、需要時間一致性的業務場景。
      • 處理時間(Processing Time):以算子所在 TaskManager 的系統時間為準,時間一到立即觸發。適合周期性心跳、定時輪詢等邏輯。

      建議:涉及業務時間邏輯時優先使用事件時間,并合理設置 Watermark 與亂序容忍度;同時可以結合處理時間定時器做后臺清理或補償任務。

      六、Watermark 與遲到事件

      • Watermark 是事件時間“時鐘”。當 Watermark 超過某個窗口的結束時間,說明該窗口已“完成”,對應事件時間定時器會被觸發。
      • 遲到事件:其事件時間落在已完成窗口內。在窗口 API 中可配置允許遲到與側輸出;在 ProcessFunction 中則由你自定義策略(記錄日志、側輸出、修正狀態等)。

      在批處理場景(有界數據)中,通??梢允褂脝握{遞增或默認 Watermark 策略;在流處理場景(無界數據)中,常用“有界亂序”策略。

      七、與窗口 API 的對比

      • 窗口 API:更易用、約束更明顯,適合絕大多數時間分桶與聚合場景。
      • ProcessFunction:更低層、可完全自定義觸發與狀態管理,適合復雜業務流程編排、會話識別、跨窗口補償、規則引擎等。

      經驗法則:能用窗口優雅解決的就用窗口;當窗口表達力不夠時,考慮 ProcessFunction。

      八、常見事件驅動模式

      • 會話化(Sessionization):用 ValueState 記錄最近活動時間,注冊處理時間或事件時間定時器判定會話結束。
      • 去重(Deduplication):維護最近看到的事件 ID 集合(BloomFilter/MapState),設置過期清理定時器。
      • 告警與監控:根據狀態閾值注冊近未來定時器并在 onTimer 中發出告警。
      • 復雜匯總:如本文示例的偽窗口;或跨窗口滾動匯總、遲到補償輸出等。

      九、最佳實踐

      • 狀態清理與 TTL:定時清理過期狀態,或使用 State TTL,避免內存泄漏。
      • 觸發器設計:避免過密的定時器注冊,減少 onTimer 風暴,可合并多個時間點或批量觸發。
      • 亂序容忍:根據業務亂序程度設置 Watermark 策略,既保證準確性又避免過度延遲。
      • 側輸出:對遲到或異常事件使用 Side Output,既不影響主流計算又便于單獨監控。
      • 可觀察性:對遲到率、定時器觸發延遲、狀態大小等打點,便于定位瓶頸與異常。

      十、完整示例骨架(整合 source 與 Watermark)

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.enableCheckpointing(10_000);
      
      // 示例:Kafka Source + Bounded Out-Of-Orderness Watermark
      KafkaSource<TaxiFare> source = KafkaSource.<TaxiFare>builder()
              .setBootstrapServers("localhost:9092")
              .setTopics("fares")
              .setGroupId("flink-fare-group")
              .setValueOnlyDeserializer(new TaxiFareDeserializer())
              .build();
      
      DataStream<TaxiFare> fares = env.fromSource(
              source,
              WatermarkStrategy
                      .<TaxiFare>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                      .withTimestampAssigner((fare, ts) -> fare.getEventTime()),
              "Kafka Fares");
      
      DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
              .keyBy(f -> f.driverId)
              .process(new PseudoWindow(Duration.ofSeconds(5)));
      
      hourlyTips.print();
      env.execute("Event-driven Hourly Tips");
      

      十一、創建 Topic 和發送測試數據

      1. 創建 Topic fares
        ./bin/kafka-topics.sh --create --topic fares --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
      2. 打開 Console Producer(交互式)
        ./bin/kafka-console-producer.sh --topic fares --bootstrap-server localhost:9092
      3. 在 Producer 里輸入 CSV 測試消息(示例)
        42,1710003600000,3.5
        42,1710007100000,2.1
        77,1710003800000,1.0
        如果希望使用當前毫秒時間戳,可以在另一個終端獲?。?br> date +%s%3N
        然后輸入例如:
        42,1699999999999,3.5
      4. 可選:使用 Console Consumer 驗證消息進出
        ./bin/kafka-console-consumer.sh --topic fares --bootstrap-server localhost:9092 --from-beginning

      十二、總結

      事件驅動讓你在算子層面掌控“事件處理 + 定時器 + 狀態”,從而能表達超越窗口 API 的復雜業務邏輯。在 Flink 中,KeyedProcessFunction 是實現事件驅動應用的核心武器:用它來注冊事件或處理時間定時器、維護鍵控狀態、為遲到與補償設計精細策略。恰當地選擇 Watermark 策略和狀態清理機制,可以在保證準確性的同時兼顧性能與資源使用。


      原文來自:http://blog.daimajiangxin.com.cn

      源碼地址:https://gitee.com/daimajiangxin/flink-learning

      posted @ 2025-11-04 15:03  代碼匠心  閱讀(129)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 江源县| av大片在线无码免费| 久久精品国产99久久久古代| 日韩人妻无码一区二区三区99| 色猫咪av在线观看| 久爱无码精品免费视频在线观看| 公天天吃我奶躁我的在线观看| 国产成人精品永久免费视频 | 亚洲欧美另类久久久精品播放的| 久久人人97超碰精品| 加勒比亚洲视频在线播放| 洮南市| 日本高清视频网站www| 亚洲成av人片无码天堂下载| 国产一区二区三区小说| 在线看无码的免费网站| 国产精品v欧美精品∨日韩| 久久夜色精品国产亚av| 丁香五月亚洲综合在线国内自拍 | 国产精品乱子乱xxxx| 午夜福利日本一区二区无码| 日韩午夜午码高清福利片| 婷婷综合久久中文字幕| 婷婷五月综合激情| 中国国产免费毛卡片| 亚洲综合色一区二区三区| 亚洲免费的福利片| 日本不卡的一区二区三区| 中文字幕久久久久人妻| 久久中文字幕一区二区| 久久精品一区二区东京热| 久久精品久久电影免费理论片| 国产最新AV在线播放不卡| 99久久国产一区二区三区| h无码精品3d动漫在线观看| 中文字幕乱码中文乱码毛片 | 亚洲人妻精品一区二区| 欧美人禽zozo动人物杂交| 国产乱久久亚洲国产精品| 国产成人精品一区二区三区| 国产亚洲精品久久yy50|