<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Kafka如何保證「消息不丟失」,「順序傳輸」,「不重復消費」,以及為什么會發生重平衡(reblanace)

      前言

      上一篇文章總結了kafka為什么快,下面來總結一下,kafka高頻的常見的問題。內容有點多,全部看完需要有一定的耐心。

      kafka如何保證消息不丟失

      Producer端

      要保證消息不丟失,第一點要做的就是要保證消息從producer端發送到了kafka的broker中,并且broker把消息保存了下來。
      由于在發送消息的過程中有可能會發生網絡故障,broker故障等原因導致消息發送失敗,因此在producer端有兩種方式來避免消息丟失。

      接收發送消息回執

      我們在使用kafka發送消息的時候,通常是使用producer.send(msg)方法,但是這個方法其實是一種異步發送,調用此方法發送消息的時候,雖然會立即返回,但是并不代表消息真的發送成功了。
      1、所以可以使用同步發送消息,producer.send(msg).get()此方法會執行同步發生消息,并等待結果返回
      2、也可以使用帶回調函數的異步方法,producer.send(msg,callback),用回調函數來監聽消息的發送結果,如果發送失敗了,可以在回調函數里面進行重試。

      producer參數配置

      producer也提供了一些配置參數來避免消息丟失。

      // 此配置表示,Leader和Follower全部成功接收消息后才確認收到消息,
      // 可以最大限度保證消息不丟失,但是吞吐量會下降
      acks = -1 
      // producer 發送消息失敗后,自動重試次數
      retries = 3
      // 發送消息失敗后的重試時間間隔
      retry.backoff.ms = 300
      

      Broker端

      當消息發送到broker后,broker需要保證此消息不會丟失,我們都知道,kafka是會將消息持久化到磁盤中的。
      但是kafka為了保持性能采用了,頁緩存+異步刷盤的形式將消息持久化到磁盤的。也就是批量定時將消息持久化到磁盤。
      但是頁緩存如果還沒來的及將消息刷到磁盤,broker就掛了,還是會有消息丟失的風險,因此kafka又提供了partition的ISR(同步副本機制),即每一個patrtition都會有一個唯一的Leader和一到多個Follower,Leader專門處理一些事務類型的請求,Follower負責同步Leader的數據。當leader掛了后,會重新從Follower中選舉出新的Leader,保證消息能夠最終持久化。
      另外,在producer中的配置參數acks,配置不同的值,broker也是會做不同的處理的。

      acks=0:表示Producer請求立即返回,不需要等待Leader的任何確認。這種方案有最高的吞吐率,但是不保證消息是否真的發送成功。
      acks =-1: 表示分區Leader必須等待消息被成功寫入到所有的ISR副本(同步副本)中才認為Producer請求成功。這種方案提供最高的消息持久性保證,但是理論上吞吐率也是最差的。
      acks=1: 表示Leader副本必須應答此Producer請求并寫入消息到本地日志,之后Producer請求被認為成功。如果此時Leader副本應答請求之后掛掉了,消息會丟失。這個方案,提供了不錯的持久性保證和吞吐。

      producer中還有一些參數的配置也是會起到避免消息丟失的作用

      //表示分區副本的個數,replication.factor>1,當Leader掛了,follower會被選舉為leader繼續提供服務
      replication.factor=2
      
      //表示 ISR 最少的副本數量,通常設置 replication.factormin.insync.replicas>1,這樣才有可用的follower副本執行替換,保證消息不丟失
      replication.factormin.insync.replicas=2
      
      //是否可以把非ISR集合中的副本選舉為leader
      min.inunclean.leader.election.enable= false
      

      Consumer端

      Consumer端,只要保證消息接收到不胡亂的提交offset就行,kafka本身也是會記錄每個pratition的偏移量,但是為了業務的可靠性,也可以自己存儲一份offset,防止由于業務代碼的問題導致消息沒有處理就提交的offset,有自己存儲才這一份offset就可以對偏移量進行一個回撥。

      為了避免消息丟失,建議使用手動提交偏移量的方式,防止消息的業務邏輯未處理完,提交偏移量后消費者掛了的問題。

      enable.auto.commit=false
      

      kafka如何保證消息的順序傳輸

      我們知道,kafka的消息實際是存在某個topic的partition中的,一個topic有多個partition分區,同一個partition中的消息是有序的,跨partition的消息是無序的。
      這個是怎么實現的呢?
      因為我們在【Kafka為什么吞吐量大,速度快?】這篇文章里面總結了,kafka寫入磁盤時是順序寫的,并且會被分配一個唯一的offset,所以同一個partition保存的數據都是有序的。而在讀取消息時,消費者會從該分區最早的offset開始,依次讀取消息,保證了消息順序消費。

      具體實現順序發送消息有兩種方式:
      1、在使用kafka時,對需要保證順序消費的topic,只創建一個partition,這樣消息就都會順序的存儲到這一個partition中,也就能保證順序消費了。
      2、當一個topic有多個partition時,對需要保證順序的消息,都發到指定的partition即可,這樣也能保證順序消費。

      注:需要注意一點,雖然發送時保證了順序,也都寫到了同一個partition中,但在消費端,也要保證順序消費,即單線程處理消息。

      目前kafka4.0,可以允許一個consumer group下的多個消費者同時消費同一個partition了。

      其借助新推出的共享組(Shared Group)特性來達成這一功能,且支持逐條消息確認,從而讓消費模式更具靈活性,還能助力提升吞吐量。以往版本默認僅允許一個消費者組內單個消費者消費一個特定分區,當消費者多于分區時,多余消費者會閑置,共享組則可避免出現該類資源浪費情況。

      將消息發到指定partition中也有幾種方式。
      1、發送消息,組裝producerRecord時,指定partition

      // 創建Kafka生產者
      Producer<String, String> producer = new KafkaProducer<>(getProperties());
      // 指定要發送消息的topic
      String topic ="jimer_topic";
      // 發送的消息內容
      String message =“Hello World!";
      // 指定發送消息的分區
      int partition =0;
      
      // 創建包含分區信息的ProducerRecord
      ProducerRecordProducerRecord<String,String> record = new ProducerRecord<>(topic, partition, message);
      // 發送消息
      producer.send(record);
      //關閉Kafka生產者
      producer.close();
      
      

      2、指定消息的key,保證相同key的消息發送到同一個partition

      // 創建Kafka生產者
      Producer<String, String> producer = new KafkaProducer<>(getProperties());
      // 指定要發送消息的topic
      String topic ="jimer_topic";
      // 發送的消息內容
      String message =“Hello World";
      // 指定發送消息的key
      String msg_key = "order_msg";
      
      // 創建包含消息key的producerRecord
      ProducerRecordProducerRecord<String,String> record = 
      new ProducerRecord<>(topic, null,msg_key, message);
      // 發送消息
      producer.send(record);
      //關閉Kafka生產者
      producer.close();
      

      3、自定義Partitioner
      除了上面兩種方式外,還可以自定義指定分區的方式。通過實現Partitioner這個接口,具體實現partition方法,就可以了。具體使用的時候,在初始化Producer時,指定具體的partition實現類即可。
      例如:

      public class MyPartitioner implatents Partitioner{
      
      @Override
      public void configure(Map<String,?> configs){
        // 可以在這里處理和獲取分區器的配置參數
      }
      @0verride
      public int partition(String topic, Object key, byte[] keyBytes, 
      Object value,byte[] valueBytes,Cluster cluster){
          int partition =  int ss = new Random().nextInt(2);
      	// 返回分區編號
      	return partition;
      }
      @0verride
      public void close(){
      	// 可以在這里進行一些清理操作
      }
      

      使用時

      Properties propsProducer = new Properties();
              propsProducer.put("acks", "all"); // 全部ISR列表中的副本接收成功后返回
              propsProducer.put("retries", 3);//失敗時重試次數
              propsProducer.put("partitioner.class", "com.jimoer.MyPartitioner"); // 指定自定義分區器類
      // 創建Kafka生產者
      Producer<String, String> producer = new KafkaProducer<>(propsProducer);
      

      kafka如何保證消息不重復消費

      什么情況下會導致消息被重復消費呢?

      1、生產者,生產者可能重復推送了一條消息到kafka,例如:某接口未做冪等處理,接口中會發送kafka消息。
      2、kafka,在消費者消費完消息后,提交offset時,kafka突然掛了,導致kafka認為此消息還未消費,又重新推送了該條消息,導致了重復消費消息。
      3、消費者,在消費者消費完消息后,提交offset時,Consumer突然宕機掛掉,這個時候,kafka未接收到已處理的offset值,當Consumer恢復后,會重新消費此部分消息。
      4、還有一種情況,Kafka 存在 Partition Balance 機制,會將多個 Partition 均衡分配給多個消費者。若 Consumer 在默認 5 分鐘內未處理完一批消息,會觸發 Rebalance 機制,導致 offset 自動提交失敗,重新 Rebalance 后,消費者會從之前未提交的 offset 位置開始消費,從而造成消息重復消費。

      那么我們該如何防止消息被重復消費呢

      其實上面的1、2、3、4這些情況都可以用冪等機制來防止消息被重復消費。為消息生成 一個唯一標識,并保存到 mysql 或 redis 中,處理消息前先到 mysql 或 redis 中判斷該消息是否已被消費過。

      但是第4種情況,前提是要先優化消費端處理性能,避免觸發 Rebalance,例如:采用異步方式處理消息、縮短單個消息消費時長、調整消息處理超時時間、減少一次性從 Broker 拉取的數據條數等。

      kafka什么情況下會發生reblanace(重平衡)

      Kafka 的重平衡(Rebalance)是指消費者組(Consumer Group)內的消費者與分區(Partition)之間的分配關系發生重新調整的過程
      主要有以下幾種情況會觸發:
      1、消費者組成員數量發生變化。((新消費者的加入或者退出)
      2、訂閱主題(Topic)數量發生變化。
      3、訂閱主題的分區(Partition)數發生變化。

      還有某些異常情況也會觸發Rebalance:
      1、消費端處理消息超時,上面我們說到過,若消費者未在設定時間內處理完消息,消費者組會認為當前消費者有問題了,會觸發Rebalance,重新分配消息。又或者當前消費者掛了,也是一樣會觸發Rebalance。
      2、組協調器(Group Coordinator)是 Kafka 負責管理消費者組的 Broker 節點。如果它崩潰或者發生故障。Kafka 需要重新選舉新的 Group Coordinator ,并進行重平衡。

      當Kafka 集群要觸發重平衡機制時,大致的步驟如下:
      1.暫停消費:在重平衡開始之前,Kafka 會暫停所有消費者的拉取操作,以確保不會出現重平衡期間的消息丟失或重復消費。
      2.計算分區分配方案:Kafka 集群會根據當前消費者組的消費者數量和主題分區數量,計算出每個消費者應該分配的分區列表,以實現分區的負載均衡。
      3.通知消費者:一旦分區分配方案確定,Kafka 集群會將分配方案發送給每個消費者,告訴它們需要消費的分區列表,并請求它們重新加入消費者組。
      4.重新分配分區:在消費者重新加入消費者組后,Kafka 集群會將分區分配方案應用到實際的分區分配中,重新分配主題分區給各個消費者。
      5.恢復消費:最后,Kafka 會恢復所有消費者的拉取操作,允許它們消費分配給自己的分區。

      posted @ 2025-08-20 22:27  紀莫  閱讀(581)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 色爱区综合激情五月激情| 亚洲香蕉免费有线视频| 中文字幕无码不卡在线| 中文字幕日韩精品有码| 成人免费AA片在线观看| 免费网站看sm调教视频| 日本高清视频网站www| 青青草久热这里只有精品| 亚洲综合一区二区三区不卡| 蜜臀久久精品亚洲一区| 日韩av无码一区二区三区| 国产精品午夜福利合集| 亚洲综合网国产精品一区| 免费无码午夜理论电影| 中文字幕国产精品自拍| 成人av片无码免费网站| 国产亚洲精品久久综合阿香| 亚洲精品综合一区二区三区| 91密桃精品国产91久久| 中文字幕精品人妻丝袜| 亚洲乱码精品久久久久..| 奇米777四色成人影视| 工布江达县| 99精品久久毛片a片| 国产精品国产三级国快看| 男女一边摸一边做爽爽| 你懂的亚洲一区二区三区| 亚洲精品一区二区口爆| 被灌满精子的波多野结衣| 久久婷婷成人综合色| 东京热一精品无码av| 亚洲综合国产一区二区三区| 无码全黄毛片免费看| 国内精品一区二区不卡| 三江| 国产精品中文字幕第一区| 国产精品国三级国产av| 老湿机69福利区无码| 亚洲精品国产精品乱码不| 亚洲 一区二区 在线| 亚洲国产成人极品综合|