<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      Kafka原理剖析之「位點(diǎn)提交」

      一、背景

      Kafka的位點(diǎn)提交一直是Consumer端非常重要的一部分,業(yè)務(wù)上我們經(jīng)常遇到的消息丟失、消息重復(fù)也與其息息相關(guān)。位點(diǎn)提交說(shuō)簡(jiǎn)單也簡(jiǎn)單,說(shuō)復(fù)雜也確實(shí)復(fù)雜,沒(méi)有人能用一段簡(jiǎn)短的話(huà)將其說(shuō)清楚,最近團(tuán)隊(duì)生產(chǎn)環(huán)境便遇到一個(gè)小概率的報(bào)錯(cuò)

      Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. The coordinator is not available.

      此錯(cuò)誤一出,Consumer的流量直接跌0,且無(wú)法自愈,雖然客戶(hù)端重啟后可自動(dòng)恢復(fù),但影響及損失還是非常巨大的,當(dāng)然最后定位就是「位點(diǎn)提交」一手炮制的,是開(kāi)源的一個(gè)重大bug,受此影響的版本跨越2.6.x ~ 3.1.x :

      https://issues.apache.org/jira/browse/KAFKA-13840

      借此bug正好來(lái)梳理一下Kafka有關(guān)位點(diǎn)提交的知識(shí)點(diǎn)

      二、概述

      關(guān)于位點(diǎn)提交(commit offset)大家最直觀的感受便是自動(dòng)或手動(dòng)提交,但是仔細(xì)一想,還是有很多細(xì)節(jié)問(wèn)題,例如:

      • 手動(dòng)的同步提交與異步提交有什么區(qū)別?
      • 使用自動(dòng)提交模式時(shí),提交動(dòng)作是同步的還是異步的?
      • 消費(fèi)模式使用assign或subscribe,在提交位點(diǎn)時(shí)有區(qū)別嗎?
      • 同步提交與異步提交能否混合使用?
      • 手動(dòng)提交與自動(dòng)提交能否混合使用?

      其實(shí)這些問(wèn)題都是萬(wàn)變不離其宗,我們把各個(gè)特征總結(jié)一下,這些問(wèn)題自然也就迎刃而解

      三、為什么要提交位點(diǎn)?

      在開(kāi)始介紹各類(lèi)位點(diǎn)提交的策略之前,我們先拋出一個(gè)靈魂拷問(wèn):“為什么一定要提交位點(diǎn)?”。 Consumer會(huì)周期性的從Broker拉取消息,每次拉取消息的時(shí)候,順便提交位點(diǎn)不可以嗎?為什么一定要讓用戶(hù)感知提交位點(diǎn),還提供了各種各樣的策略?

      其實(shí)回答這個(gè)問(wèn)題,我們理解以下2個(gè)配置就夠了

      • fetch.max.bytes and max.partition.fetch.bytes
        • 均作用于Broker端。首先需要明確的是,Consumer的一次拉取經(jīng)常是針對(duì)多個(gè)partition的,因此max.partition.fetch.bytes控制的一個(gè)partition拉取消息的最大值,而fetch.max.bytes控制的則是本次請(qǐng)求整體的上限
      • max.poll.records
        • 作用于Consumer端。而此參數(shù)控制的就是一次 poll 方法最多返回的消息條數(shù),因此并不是每次調(diào)用 poll 方法都會(huì)發(fā)起一次網(wǎng)絡(luò)請(qǐng)求的

      因此也就導(dǎo)致了發(fā)起網(wǎng)絡(luò)的頻次跟用戶(hù)處理業(yè)務(wù)數(shù)據(jù)的頻次是不一樣的

      簡(jiǎn)單總結(jié)一下,單次網(wǎng)絡(luò)請(qǐng)求拉取的數(shù)據(jù)量可能是很大的,需要客戶(hù)端通過(guò)多次調(diào)用poll()方法來(lái)消化,如果按照網(wǎng)絡(luò)請(qǐng)求的頻次來(lái)提交位點(diǎn)的話(huà),那這個(gè)提交頻次未免太粗了,Consumer一旦發(fā)生重啟,將會(huì)導(dǎo)致大量的消息重復(fù)

      其次按照網(wǎng)絡(luò)請(qǐng)求的頻次來(lái)提交位點(diǎn)的話(huà),程序?qū)⒆兊貌粔蜢`活,業(yè)務(wù)端對(duì)于消息的處理會(huì)有自己的理解,將提交位點(diǎn)的發(fā)起動(dòng)作放在Consumer,設(shè)計(jì)更有彈性

      四、Consumer網(wǎng)絡(luò)模型簡(jiǎn)介

      4.1、單線程的Consumer

      在開(kāi)始介紹Consumer端的網(wǎng)絡(luò)模型之前,我們先看下Producer的

      可見(jiàn)Producer是線程安全的,Producer內(nèi)部維護(hù)了一個(gè)并發(fā)的緩存隊(duì)列,所有的數(shù)據(jù)都會(huì)先寫(xiě)入隊(duì)列,然后由Sender線程負(fù)責(zé)將其發(fā)送至網(wǎng)絡(luò)

      而Consumer則不同,我們羅列一下Consumer的特點(diǎn)

      • 單線程(業(yè)務(wù)處理與網(wǎng)絡(luò)共享一個(gè)線程)
      • 非線程安全

      不過(guò)這里說(shuō)的單線程不夠嚴(yán)謹(jǐn),在0.10.1版本以后:

      • Subscribe模式下,Consumer將心跳邏輯放在了一個(gè)獨(dú)立線程中,如果消息處理邏輯不能在 max.poll.interval.ms 內(nèi)完成,則consumer將停止發(fā)送心跳,然后發(fā)送LeaveGroup請(qǐng)求主動(dòng)離組,從而引發(fā)coordinator開(kāi)啟新一輪rebalance
      • Assign模式下,則只有一個(gè)Main線程

      用戶(hù)處理業(yè)務(wù)邏輯的時(shí)間可能會(huì)很長(zhǎng),因此心跳線程的引入主要是為了解決心跳問(wèn)題,其非常輕量,因此我們泛泛的講,Consumer其實(shí)就是單線程的,包括提交位點(diǎn),那一個(gè)單線程的客戶(hù)端是如何保證高效的吞吐,又是如何與用戶(hù)處理數(shù)據(jù)的邏輯解耦呢?其實(shí)這是個(gè)很有意思,也很有深度的問(wèn)題,但不是本文的重點(diǎn),后續(xù)我們?cè)僬归_(kāi)詳聊

      因此我們知道,所有提交位點(diǎn)的動(dòng)作均是交由Consumer Main線程來(lái)提交的,但是單線程并不意味著阻塞,不要忘記,我們底層依賴(lài)的是JDK的NIO,因此網(wǎng)絡(luò)發(fā)送、接受部分均是異步執(zhí)行的

      4.2、網(wǎng)絡(luò)模型

      既然Consumer是單線程,而NIO是異步的,那么Consumer如何處理這些網(wǎng)絡(luò)請(qǐng)求呢?Producer比較好理解,有一個(gè)專(zhuān)門(mén)負(fù)責(zé)交互的Sender線程,而單線程的Consumer如何處理呢

      其實(shí)Consumer所有的網(wǎng)絡(luò)發(fā)送動(dòng)作,均放在main線程中,而在Consumer內(nèi)部,為每個(gè)建聯(lián)的Broker都維護(hù)了一個(gè)unsent列表,這個(gè)列表中存放了待發(fā)送的請(qǐng)求,每次業(yè)務(wù)端程序執(zhí)行consumer.poll()方法時(shí),會(huì)先后觸發(fā)2次網(wǎng)絡(luò)發(fā)送的操作:

      1. 嘗試將所有Broker待發(fā)送區(qū)的數(shù)據(jù)發(fā)送出去
      2. 處理網(wǎng)絡(luò)接收到的請(qǐng)求
      3. 嘗試將所有Broker待發(fā)送區(qū)的數(shù)據(jù)發(fā)送出去(again)

      回到我們位點(diǎn)提交的case中,如果某個(gè)Broker積攢了大量的未發(fā)送請(qǐng)求,那提交位點(diǎn)的請(qǐng)求豈不是要等待很久才能發(fā)出去?是的,如果unsent列表中有很多請(qǐng)求確實(shí)會(huì)這樣,不過(guò)正常情況下,同一個(gè)Broker中不會(huì)積攢大量請(qǐng)求,如果一次從Broker中拉取的消息還沒(méi)有被消費(fèi)完,是不會(huì)向該Broker再次發(fā)送請(qǐng)求的,因此業(yè)務(wù)poll()的頻率是要遠(yuǎn)高于網(wǎng)絡(luò)發(fā)送頻率的,而單次poll時(shí),又會(huì)觸發(fā)2次trySend,因此可保證不存在unsent列表的數(shù)據(jù)過(guò)多而發(fā)不出的情況

      BTW:Consumer的網(wǎng)絡(luò)三件套:NetworkClient、Selector、KafkaChannel與Producer是完全一樣的。關(guān)于Consumer的核心組件,盜用一張網(wǎng)上的圖

      有了上面的基礎(chǔ),我們?cè)賮?lái)討論位點(diǎn)提交的方式,就會(huì)變得非常清晰明朗了

      五、手動(dòng)-異步提交

      執(zhí)行異步提交的代碼通常是這樣寫(xiě)的

      while (true) {
          // 拉取消息
          ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
          // 如果拉取的消息不為空
          if (!records.isEmpty()) {
              // 執(zhí)行常規(guī)業(yè)務(wù)處理邏輯
              doBusiness(records);
              // 異步提交位點(diǎn)
              kafkaConsumer.commitAsync();
          }
      }

      kafkaConsumer.commitAsync() 負(fù)責(zé)拼接提交位點(diǎn)的request,然后將請(qǐng)求放在對(duì)應(yīng)Broker的unsent列表中,程序?qū)⒎祷兀较乱淮螛I(yè)務(wù)執(zhí)行poll(),或合適的時(shí)機(jī),會(huì)將此請(qǐng)求發(fā)出去,并不阻塞main線程

      而對(duì)于提交位點(diǎn)的結(jié)果,如果指定了回調(diào)函數(shù),如下:

      kafkaConsumer.commitAsync(new OffsetCommitCallback() {
          @Override
          public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
          }
      });

      可以對(duì)異常進(jìn)行處理,也可以拿到實(shí)時(shí)提交的位點(diǎn)

      而對(duì)于沒(méi)有指定回調(diào)函數(shù)的case,Consumer會(huì)提供一個(gè)默認(rèn)的回調(diào)函數(shù)org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.DefaultOffsetCommitCallback,在發(fā)生異常時(shí),輸出error日志

      六、手動(dòng)-同步提交

      而對(duì)于同步提交

      kafkaConsumer.commitSync();

      首先需要明確的是,同步提交是會(huì)阻塞Consumer的Main線程的,手動(dòng)提交會(huì)首先將提交請(qǐng)求放在對(duì)應(yīng)Broker的unsent列表的尾部,繼而不斷地觸發(fā)調(diào)用,將網(wǎng)絡(luò)待發(fā)送區(qū)的數(shù)據(jù)發(fā)送出去,同時(shí)不間斷接收網(wǎng)絡(luò)請(qǐng)求,直到收到本次提交的響應(yīng);不過(guò)同步提交也有超時(shí)時(shí)間,默認(rèn)為60s,如果超時(shí),將會(huì)拋出TimeoutException異常

      同步提交是低效的,會(huì)影響Consumer整體的消費(fèi)吞吐,而有些相對(duì)嚴(yán)苛的業(yè)務(wù)場(chǎng)景,同步提交又是必不可少的,讀者根據(jù)自己的業(yè)務(wù)case來(lái)決定使用哪種策略

      七、自動(dòng)提交

      與手動(dòng)提交相對(duì)的,便是自動(dòng)提交,首先明確一點(diǎn),自動(dòng)提交的模式,是異步提交

      自動(dòng)提交并不是啟動(dòng)一個(gè)全新的線程去提交位點(diǎn),也不是嚴(yán)格按照固定時(shí)間間隔去提交。自動(dòng)提交與手動(dòng)提交一樣,也是由Consumer Main線程觸發(fā)的

      由于位點(diǎn)提交、處理業(yè)務(wù)邏輯、網(wǎng)絡(luò)收發(fā)、元數(shù)據(jù)更新等,都共享了Consumer的Main線程,因此并不能保證提交位點(diǎn)的時(shí)間間隔嚴(yán)格控制在auto.commit.interval.ms(默認(rèn)5000,即5s)內(nèi),因此真實(shí)提交位點(diǎn)的時(shí)間間隔只會(huì)大于等于auto.commit.interval.ms

      總結(jié)一下自動(dòng)提交的特點(diǎn):

      • 異步提交
      • 提交操作由Consumer的Main線程發(fā)起
      • 配置 auto.commit.interval.ms 只能保證提交的最小間隔,真實(shí)提交時(shí)間間隔通常大于此配置

      至此,我們嘗試回答一下剛開(kāi)始提出的問(wèn)題

      • 手動(dòng)的同步提交與異步提交有什么區(qū)別?
        • 同步提交會(huì)阻塞Consumer的Main線程,相對(duì)而言,異步提交性能更高
      • 使用自動(dòng)提交模式時(shí),提交動(dòng)作是同步的還是異步的?
        • 異步的
      • 消費(fèi)模式使用assign或subscribe,在提交位點(diǎn)時(shí)有區(qū)別嗎?
        • subscribe模式會(huì)有心跳線程,心跳線程維護(hù)了與Coordinator的建聯(lián)
      • 同步提交與異步提交能否混合使用?
        • 可以,通常在大部分場(chǎng)景使用異步提交,而在需要明確拿到已提交位點(diǎn)的case下使用同步提交
      • 手動(dòng)提交與自動(dòng)提交能否混合使用?
        • 可以,不過(guò)語(yǔ)義上會(huì)有很多沖突,不建議混合使用

      八、開(kāi)源Bug

      回到文章剛開(kāi)始提到的異常報(bào)錯(cuò)

      Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. The coordinator is not available.

      這個(gè)bug并不是在所有case下都會(huì)存在

      • Subscribe
        • 自動(dòng)提交 -- 正常運(yùn)行
        • 手動(dòng)-異步提交 -- 正常運(yùn)行
        • 手動(dòng)-同步提交 -- 正常運(yùn)行
      • Assign
        • 自動(dòng)提交 -- Bug
        • 手動(dòng)-異步提交 -- Bug
        • 手動(dòng)-同步提交 -- 正常運(yùn)行

      為什么會(huì)出現(xiàn)如何奇怪的情況呢?其實(shí)跟一下源碼便會(huì)有結(jié)論

      8.1、Subscribe

      在Subscribe模式下,Consumer與Coordinator的交互是通過(guò)線程org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread進(jìn)行的。當(dāng)程序發(fā)現(xiàn)Coordinator找不到時(shí),便會(huì)發(fā)起尋找Coordinator的網(wǎng)絡(luò)請(qǐng)求,方法如下

      // org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator
      protected synchronized RequestFuture<Void> lookupCoordinator() {
          if (findCoordinatorFuture == null) {
              // find a node to ask about the coordinator
              Node node = this.client.leastLoadedNode();
              if (node == null) {
                  log.debug("No broker available to send FindCoordinator request");
                  return RequestFuture.noBrokersAvailable();
              } else {
                  findCoordinatorFuture = sendFindCoordinatorRequest(node);
              }
          }
          return findCoordinatorFuture;
      }

      而其中涉及一個(gè)findCoordinatorFuture的成員變量,必須要滿(mǎn)足findCoordinatorFuture == null,才會(huì)真正發(fā)起網(wǎng)絡(luò)請(qǐng)求,因此在方法執(zhí)行完,需要將其置空,如下方法

      // org.apache.kafka.clients.consumer.internals.AbstractCoordinator#clearFindCoordinatorFuture
      private synchronized void clearFindCoordinatorFuture() {
          findCoordinatorFuture = null;
      }

      說(shuō)白了,也就是每次調(diào)用,都需要lookupCoordinator()clearFindCoordinatorFuture()成對(duì)兒出現(xiàn);當(dāng)然心跳線程也是這樣做的

      if (coordinatorUnknown()) {
          if (findCoordinatorFuture != null) {
              // clear the future so that after the backoff, if the hb still sees coordinator unknown in
              // the next iteration it will try to re-discover the coordinator in case the main thread cannot
              // 清理輔助變量findCoordinatorFuture
              clearFindCoordinatorFuture();
      
              // backoff properly
              AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
          } else {
              // 尋找Coordinator
              lookupCoordinator();
          }
      }

      因此在Subscribe模式下,無(wú)論何種提交方式,都是沒(méi)有Bug的

      8.2、Assign

      因?yàn)樽詣?dòng)提交也是異步提交,因此我們只聚焦在同步提交與異步提交。其實(shí)同步提交與異步提交,它們構(gòu)建入?yún)ⅰ⑻幚眄憫?yīng)等均是調(diào)用的同一個(gè)方法,唯一不同的是發(fā)起調(diào)用處的邏輯。我們先看下同步提交的邏輯

      
      // org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady
      protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
          if (!coordinatorUnknown())
              return true;
      
          do {
              if (fatalFindCoordinatorException != null) {
                  final RuntimeException fatalException = fatalFindCoordinatorException;
                  fatalFindCoordinatorException = null;
                  throw fatalException;
              }
              final RequestFuture<Void> future = lookupCoordinator();
      
              // some other business
              // .......
      
              clearFindCoordinatorFuture();
              if (fatalException != null)
                  throw fatalException;
          } while (coordinatorUnknown() && timer.notExpired());
      
          return !coordinatorUnknown();
      }

      沒(méi)有問(wèn)題,lookupCoordinator()clearFindCoordinatorFuture()又成對(duì)兒出現(xiàn)

      而異步提交呢?

      // org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsAsync
      public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
          invokeCompletedOffsetCommitCallbacks();
      
          if (!coordinatorUnknown()) {
              doCommitOffsetsAsync(offsets, callback);
          } else {
              // we don't know the current coordinator, so try to find it and then send the commit
              // or fail (we don't want recursive retries which can cause offset commits to arrive
              // out of order). Note that there may be multiple offset commits chained to the same
              // coordinator lookup request. This is fine because the listeners will be invoked in
              // the same order that they were added. Note also that AbstractCoordinator prevents
              // multiple concurrent coordinator lookup requests.
              pendingAsyncCommits.incrementAndGet();
              lookupCoordinator().addListener(new RequestFutureListener<Void>() {
                  @Override
                  public void onSuccess(Void value) {
                      // do something
                  }
      
                  @Override
                  public void onFailure(RuntimeException e) {
                      // do something
                  }
              });
          }
      
          // ensure the commit has a chance to be transmitted (without blocking on its completion).
          // Note that commits are treated as heartbeats by the coordinator, so there is no need to
          // explicitly allow heartbeats through delayed task execution.
          client.pollNoWakeup();
      }

      非常遺憾,只有lookupCoordinator(),卻沒(méi)有clearFindCoordinatorFuture(),導(dǎo)致成員變量一直得不到重置,也就無(wú)法正常發(fā)起尋找Coordinator的請(qǐng)求,其實(shí)如果修復(fù)的話(huà),也非常簡(jiǎn)單,只需要在RequestFutureListener的回調(diào)結(jié)果中顯式調(diào)用clearFindCoordinatorFuture()即可

      這個(gè)bug隱藏的很深,只靠單測(cè),感覺(jué)還是很難發(fā)現(xiàn)的,bug已經(jīng)在3.2.1版本修復(fù)。雖然我們生產(chǎn)環(huán)境是2.8.2的Broker,但是還是可以直接通過(guò)升級(jí)Consumer版本來(lái)解決,即便client版本高于了server端。這個(gè)當(dāng)然得益于Kafka靈活的版本策略,還是要為其點(diǎn)個(gè)贊的

      參考文檔

      posted @ 2024-04-07 19:12  昔久  閱讀(856)  評(píng)論(9)    收藏  舉報(bào)
      主站蜘蛛池模板: 亚洲国产成人久久一区久久| 成人精品老熟妇一区二区| 国产区精品福利在线观看精品| 国产欧美精品一区二区三区四区| 最近中文国语字幕在线播放| 国产精品国产三级国产av剧情| 久久午夜无码鲁丝片直播午夜精品| 成av人电影在线观看| 精品无套挺进少妇内谢| 精品一区二区三区蜜桃久| 亚洲高清 一区二区三区| 鸡泽县| 久久热在线视频精品视频| 日韩中文字幕av有码| 又黄又爽又色视频免费| 一区二区三区在线 | 欧洲| 精品亚洲国产成人av在线| aa级毛片毛片免费观看久| 少妇又爽又刺激视频| 蜜桃一区二区三区免费看| 国产成人免费观看在线视频| 亚洲va久久久噜噜噜久久狠狠| 狼色精品人妻在线视频| 在线日韩日本国产亚洲| 男人的天堂av社区在线| 国产av一区二区久久蜜臀| 国产免费高清69式视频在线观看 | 国产精品成人一区二区三区| 日韩av综合中文字幕| 国产精品成人免费视频网站京东| 97人妻中文字幕总站| 重庆市| 国产SM重味一区二区三区| 亚洲高清国产拍精品5G| 乡宁县| 性人久久久久| 一区二区视频| 四虎在线成人免费观看| 自拍第一区视频在线观看| 亚洲永久精品ww47永久入口| 国产亚洲精品AA片在线爽|