kafka消費者組狀態--rebalance(參考其他作者文檔)
零、kafka消費者組狀態
- Stable:消費者組中所有消費者都已經加入并且正在消費消息。
- Rebalancing:消費者組正在重新平衡,即有消費者加入或退出消費者組時發生的狀態。在重新平衡期間,Kafka會暫停所有消費者的消費,并重新分配分區。
- Empty:消費者組中沒有消費者或者所有消費者都已經退出消費者組。
- Dead:消費者組已經過期或者已經被刪除,無法再被使用。
- Preparing Rebalance:消費者組正在準備進行重新平衡,即將要加入或退出的消費者還沒有完成加入或退出操作。
- Dead and Empty:消費者組已經過期或者已經被刪除,并且沒有任何消費者存在于該消費者組中。
一、前言
Rebalance 就是讓一個 Consumer Group 下所有的 Consumer 實例就如何消費訂閱主題的所有分區達成共識的過程。在 Rebalance 過程中,Consumer Group 下所有的 Consumer 實例共同參與,在 Coordinator 協調者組件的幫助下,完成訂閱主題分區的分配。但是,在整個過程中,所有實例都不能消費任何消息,因此它對 Consumer 的 TPS 影響很大。像不像 JVM 的 GC?我們知道 JVM 頻繁 GC 的話,對時延敏感的業務來說,簡直是噩夢,所以我們會針對 GC 進行相應的調優,讓 JVM 不那么頻繁的發生 STW。
Kafka 消費組的重平衡也是類似的,消費組發生重平衡,Consumer 就很慢,對于實時性不敏感的業務,慢一點也能接受,就怕 Consumer 處理業務超時了,消費組把 Consumer 踢出去了,業務設置重試機制,自動從線程池中拿出一個新線程作為消費者去訂閱 topic,那么意味著有新消費者加入 Consumer Group,又會引發 Rebalance,新的消費者還是來不及處理完所有消息,又被移出 Consumer Group。如此循環,就發生了頻繁的 Rebalance 現象。
二、Rebalance 的弊端
- Rebalance 影響 Consumer 端 TPS,Coordinator 協調者組件完成訂閱主題分區的分配的過程,該消費組下所有實例都不能消費任何消息。
- 如果你的組成員消費者實例很多的話,Rebalance 很慢,對業務會造成一定的影響。
- Rebalance 效率不高。當前 Kafka 的設計機制決定了每次 Rebalance 時,Consumer Group 下的所有成員都要參與進來,而且通常不會考慮局部性原理,但局部性原理對提升系統性能是特別重要的。
對于第三點,你是不是覺得,Kafka 社區讓所有成員都要參與進來很不合理啊,應該把那個退出消費組的消費者負責的分區隨機分配給其它消費者,其它消費者的分區分配策略不變,這樣就最大限度地減少 Rebalance 對剩余 Consumer 成員的沖擊。
沒錯,你想到的社區也想到了,社區于 0.11.0.0 版本推出了 StickyAssignor,即有粘性的分區分配策略。所謂的有粘性,是指每次 Rebalance 時,該策略會盡可能地保留之前的分配方案,盡量實現分區分配的最小變動。不過有些遺憾的是,這個策略目前還有一些 bug,而且需要升級到 0.11.0.0 才能使用,因此在實際生產環境中用得還不是很多。
你可能會問了,社區對上面的弊端有沒有什么解決辦法?沒有,特別是 Rebalance 慢這個問題,Kafka 社區對此無能為力。設計就是這樣的話,那我們是不是可以盡可能去規避 Rebalance 呢,特別是那些不必要的 Rebalance。
三、觸發 Rebalance 機制的時機
要避免 Rebalance,還是要從觸發 Rebalance 機制的時機入手。我們在前面說過,觸發 Rebalance 機制的時機主要有以下幾個:
- 有新的 Consumer 加入 Consumer Group
- 有 Consumer 宕機下線。Consumer 并不一定需要真正下線,例如遇到長時間的 GC、網絡延遲導致消費者長時間未向 GroupCoordinator 發送 HeartbeatRequest 時,GroupCoordinator 會認為 Consumer 下線。
- 有 Consumer 主動退出 Consumer Group(發送 LeaveGroupRequest 請求)。比如客戶端調用了 unsubscribe() 方法取消對某些主題的訂閱。
- Consumer 消費超時,沒有在指定時間內提交 offset 偏移量。
- Consumer Group 所對應的 GroupCoordinator 節點發生了變更。
- Consumer Group 所訂閱的任一主題或者主題的分區數量發生變化。
四、Rebalance 實戰
有點抽象是不是?沒有關系,下面來看個例子,老周簡單來模擬一下 Rebalance。
4.1 生產者
/** * @author: 微信公眾號【老周聊架構】 */ public class KafkaProducerRebalanceTest { public static void main(String[] args) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); KafkaProducer<String,String> producer = new KafkaProducer<>(props); for (int i = 0; i < 10; i++) { ProducerRecord record = new ProducerRecord<>("topic_test", "userName", "riemann_" + i); producer.send(record); } producer.close(); } } 123456789101112131415161718
4.2 消費者
/** * @author: 微信公眾號【老周聊架構】 */ public class KafkaConsumerRebalanceTest { public static void main(String[] args) { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_test"); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "60000"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); String topic = "topic_test"; long pollTimeout = 100; long sleep = TimeUnit.MILLISECONDS.convert(2, TimeUnit.MINUTES); consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> collection) { System.out.println("Partition Revoked"); } @Override public void onPartitionsAssigned(Collection<TopicPartition> collection) { System.out.println("New assignment : " + collection.size() + " partitions"); } }); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(pollTimeout)); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s \n", record.offset(), record.key(), record.value()); try { Thread.sleep(sleep); } catch (InterruptedException e) { e.printStackTrace(); } } } } } 12345678910111213141516171819202122232425262728293031323334353637383940414243
4.3 終端日志
消費者日志
[Consumer clientId=consumer-consumer_group_test-1, groupId=consumer_group_test] Member consumer-consumer_group_test-1-7d64e140-f0e3-49d2-8230-2621ba1d2061 sending LeaveGroup request to coordinator 127.0.0.1:9092 (id: 2147483643 rack: null) due to consumer poll timeout has expired. This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time processing messages. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
服務端日志

[2022-01-16 16:38:25,315] INFO [GroupCoordinator 4]: Member[group.instance.id None, member.id consumer-consumer_group_test-1-7d64e140-f0e3-49d2-8230-2621ba1d2061] in group consumer_group_test has left, removing it from the group (kafka.coordinator.group.GroupCoordinator)[2022-01-16 16:38:25,315] INFO [GroupCoordinator 4]: Preparing to rebalance group consumer_group_test in state PreparingRebalance with old generation 37 (__consumer_offsets-17) (reason: removing member consumer-consumer_group_test-1-7d64e140-f0e3-49d2-8230-2621ba1d2061 on LeaveGroup) (kafka.coordinator.group.GroupCoordinator)[2022-01-16 16:38:25,316] INFO [GroupCoordinator 4]: Group consumer_group_test with generation 38 is now empty (__consumer_offsets-17) (kafka.coordinator.group.GroupCoordinator)
那消費組為啥會出現 Rebalance 呢?從消費者日志可以看出,消費超時,導致消費線程長時間無法向 Coordinator 節點發送心跳,Coordinator 節點以為 Consumer 已經宕機,Coordinator 于是將 Consumer 節點從消費組中剔除,并觸發了 Rebalance 機制。

這其實和 Consumer 的心跳發送機制也有關系,在大多數中間件的設計中,都會分離業務線程和心跳發送線程,而 Kafka 卻沒有這樣做,其目的可能是為了實現簡單。如果消費者消費業務確實需要非常長時間,我們可以通過參數 max.poll.interval.ms 配置,它代表消費兩次 poll 最大的時間間隔,默認是 300000 ms,也就是 5 分鐘,5 分鐘都還超時,那可以再調大一點;或者我們可以減少 consumer 每次從 broker 拉取的數據量,可以通過參數 max.poll.records 配置,consumer 默認拉取 500 條,我們可以將其修改了 200 條。
Kafka 在 0.10.1 版本中修正了 Consumer 的心跳發送機制,將心跳發送的任務交給了專門的 HeartbeatThread,而不是像早期版本那樣依賴于用戶應用線程來定期輪詢。這個設計被證明是相當棘手的調整,增加會話超時將為消息處理提供更多的時間,但消費者組也將花更多的時間來檢測進程崩潰等故障。Kafka 消費者 0.10.1 引入了 max.poll.interval.ms 來解耦處理超時和會話超時。這個 max.poll.interval.ms 參數還是很有意義的,因為即使心跳發送正常,那也只能證明 Consumer 是存活狀態,但是 Consumer 可能處于假死狀態,比如 Consumer 遇到了死鎖導致長時間等待超過了poll 設定的時間間隔 max.poll.interval.ms。
五、Rebalance 問題處理思路
我們上面第三點說的觸發 Rebalance 機制的時機有好幾點,其實主要就三大類:
- 組成員數量發生變化
- 訂閱主題數量發生變化
- 訂閱主題的分區數發生變化
后面兩個通常都是運維的主動操作,所以它們引發的 Rebalance 大都是不可避免的。接下來,我們主要來說說不必要的 Rebalance 該如何避免,也就是組成員數量變化而引發的 Rebalance 該如何避免。
如果 Consumer Group 下的 Consumer 實例數量發生變化,就一定會引發 Rebalance。這是 Rebalance 發生的最常見的原因。
Consumer 實例增加的情況很好理解,當我們啟動一個配置有相同 group.id 值的 Consumer 程序時,實際上就向這個 Consumer Group 添加了一個新的 Consumer 實例。此時,Coordinator 會接納這個新實例,將其加入到組中,并重新分配分區。通常來說,增加 Consumer 實例的操作都是計劃內的,可能是出于增加 TPS 或提高伸縮性的需要。總之,它不屬于我們要規避的那類 “不必要 Rebalance”。
我們更在意的是 Consumer Group 下實例數減少這件事。如果你就是要停掉某些 Consumer 實例,那自不必說,關鍵是在某些情況下,Consumer 實例會被 Coordinator 錯誤地認為 “已停止” 從而被 “踢出” Consumer Group。如果是這個原因導致的 Rebalance,我們就不能不管了。
Coordinator 會在什么情況下認為某個 Consumer 實例已掛從而要退組呢?那就要來看看消費者端配置的幾個參數:
- session.timeout.ms 設置了超時時間
- heartbeat.interval.ms 心跳時間間隔
- max.poll.interval.ms 每次消費的處理時間
- max.poll.records 每次消費的消息數
5.1 session.timeout.ms

Consumer 與 Broker 的心跳超時時間,默認 10s,Broker 如果超過 session.timeout.ms 設定的值仍然沒有收到心跳,Broker 端將會將該消費者移除,并觸發 Rebalance。
這個值必須設置在 Broker 配置中的 group.min.session.timeout.ms 與 group.max.session.timeout.ms 之間。
該參數和 heartbeat.interval.ms 這兩個參數可以適當的控制 Rebalance 的頻率。
5.2 heartbeat.interval.ms

心跳間隔時間。心跳是在 Consumer 與 Coordinator 之間進行的。心跳用來保持 Consumer 的會話,并且在有 Consumer 加入或者離開 Consumer Group 時幫助進行 Rebalance。
這個值必須設置的小于 session.timeout.ms,因為:當 Consumer 由于某種原因不能發 Heartbeat 到 Coordinator 時,并且時間超過 session.timeout.ms 時,就會認為該 Consumer 已退出,它所訂閱的 Partition 會分配到同一 Consumer Group 內的其它的 Consumer 上。
通常設置的值要低于 session.timeout.ms 的 1/3。默認值是:3s
5.3 max.poll.interval.ms

兩次 poll 方法調用的最大間隔時間,單位毫秒,默認為 5 分鐘。如果消費端在該間隔內沒有發起 poll 操作,該消費者將被剔除,觸發重平衡,將該消費者分配的隊列分配給其他消費者。
Kafka 中有一個專門的心跳線程來實現發送心跳的動作,所以存在 Consumer Client 依舊可以有效的發送心跳,但 Consumer 實際卻處于 livelock (活鎖)狀態,從而導致無法有效的進行數據處理,所以基于此 Kafka 通過參數 max.poll.interval.ms 來規避該問題。
5.4 max.poll.records

Consumer 每次調用 poll() 時取到的 records 的最大數。每執行一次 poll 方法所拉取的最大數據量;是基于所分配的所有 Partition 而言的數據總和,而非每個 Partition 上拉去的最大數據量;默認值為 500。
通俗點講表示每次消費的時候,獲取多少條消息。獲取的消息條數越多,需要處理的時間越長。所以每次拉取的消息數不能太多,需要保證在 max.poll.interval.ms 設置的時間內能消費完,否則會發生 rebalance。
六、如何避免 Rebalance
簡單來說,非必要 Rebalance 有下面兩個點:
- 消費者心跳超時,導致 Rebalance。
- 消費者處理時間過長,導致 Rebalance。
6.1 消費者心跳超時
我們知道消費者是通過心跳和協調者保持通訊的,如果協調者收不到心跳,那么協調者會認為這個消費者死亡了,從而發起 Rebalance。
這里給一下業界主流推薦的值,可以根據自己的業務可做相應的調整:
- 設置
session.timeout.ms = 6s。
- 設置
heartbeat.interval.ms = 2s。
- 要保證 Consumer 實例在被判定為 “dead” 之前,能夠發送至少 3 輪的心跳請求,即
session.timeout.ms >= 3 * heartbeat.interval.ms。
這里你可能會問了,為啥要 session.timeout.ms >= 3 * heartbeat.interval.ms,而不是 5 或者 10 呢?
session.timeout.ms 是個"邏輯"指標,比如它指定了一個閾值 6 秒,在這個閾值內如果 Coordinator 未收到 Consumer 的任何消息,那 Coordinator 就認為 Consumer 掛了。而 heartbeat.interval.ms 是個"物理"指標,它告訴 Consumer 要每 2 秒給 Coordinator 發一個心跳包,heartbeat.interval.ms 越小,發的心跳包越多,它是會影響發 TCP 包的數量的,產生了實際的影響,這也是我為什么將之稱為"物理"指標的原因。
如果 Coordinator 在一個 heartbeat.interval.ms 周期內未收到 Consumer 的心跳,就把該 Consumer 移出 Consumer Group,這有點說不過去。就好像 Consumer 犯了一個小錯,就一棍子把它打死了。事實上,有可能網絡延時,有可能 Consumer 出現了一次長時間 GC,影響了心跳包的到達,說不定下一個 Heartbeat 就正常了。
而 heartbeat.interval.ms 肯定是要小于 session.timeout.ms 的,如果 Consumer Group 發生了 Rebalance,通過心跳包里面的 REBALANCE_IN_PROGRESS,Consumer 就能及時知道發生了 Rebalance,從而更新 Consumer 可消費的分區。而如果超過了 session.timeout.ms,Coordinator 都認為 Consumer 掛了,那也當然不用把 Rebalance 信息告訴該 Consumer 了。
在 Kafka 0.10.1 之后的版本中,將 session.timeout.ms 和 max.poll.interval.ms 解耦了。也就是說:new KafkaConsumer 對象后,在 while true 循環中執行 consumer.poll 拉取消息這個過程中,其實背后是有 2 個線程的,即一個 Consumer 實例包含 2 個線程:一個是 Heartbeat 線程,另一個是 Processing 線程。Processing 線程可理解為調用 consumer.poll 方法執行消息處理邏輯的線程,而 Heartbeat 線程是一個后臺線程,對程序員是"隱藏不見"的。如果消息處理邏輯很復雜,比如說需要處理 5 min,那么 max.poll.interval.ms 可設置成比 5 min 大一點的值。而 Heartbeat 線程則和上面提到的參數 heartbeat.interval.ms 有關,Heartbeat 線程每隔 heartbeat.interval.ms 向 Coordinator 發送一個心跳包,證明自己還活著。只要 Heartbeat 線程 在 session.timeout.ms 時間內向 Coordinator 發送過心跳包,那么 Coordinator 就認為當前的 Consumer 是活著的。
在 Kafka 0.10.1 之前,發送心跳包和消息處理邏輯這 2 個過程是耦合在一起的,試想:如果一條消息處理時長要 5 min,而 session.timeout.ms=3000ms,那么等 Consumer 處理完消息,Coordinator 早就將 Consumer 移出 Consumer Group 了,因為只有一個線程,在消息處理過程中就無法向 Coordinator 發送心跳包,超過 3000ms 未發送心跳包,Coordinator 就將該 Consumer 移出 Consumer Group 了。而將二者分開,一個 Processing 線程負責執行消息處理邏輯,一個 Heartbeat 線程負責發送心跳包。那么就算一條消息需要處理 5min,只要 Heartbeat 線程在 session.timeout.ms 時間內向 Coordinator 發送了心跳包,那 Consumer 可以繼續處理消息,而不用擔心被移出 Consumer Group 了。另一個好處是:如果 Consumer 出了問題,那么在 session.timeout.ms 內就能檢測出來,而不用等到 max.poll.interval.ms 時長后才能檢測出來。
為啥要 session.timeout.ms >= 3 * heartbeat.interval.ms,我覺得是社區做的測試得出來的最優值吧, 因為 heartbeat.interval.ms 越小,發的心跳包越頻繁,浪費沒必要的流量;而設置越大,Consumer 掛了很久才能檢測到,明顯也不合理。
6.2 消費者處理時間過長
如果消費者處理時間過長,那么同樣會導致協調者認為該 Consumer 死亡了,從而發起重平衡。
而 Kafka 的消費者參數設置中,跟消費處理的兩個參數為:
- max.poll.interval.ms 每次消費的處理最大時間
- max.poll.records 每次消費的消息數
對于這種情況,一般來說就是增加消費者處理的時間(即提高 max.poll.interval.ms 的值),減少每次處理的消息數(即減少 max.poll.records 的值)。
我們上面的那個例子就是這個場景觸發的 Rebalance,max.poll.interval.ms 每次消費的處理最大時間設置的是 60000ms,也就是 1min。而我在 consumer.poll 方法里休眠了 2min 來模擬處理業務的時間,處理業務的時間大于 max.poll.interval.ms ,導致 Rebalance。
6.3 Consumer 端的 GC 表現
如果上面兩種從 Kafka 層面還無法避免 Rebalance,那我建議你去排查下 Consumer 端的 GC 表現,比如是否出現了頻繁的 Full GC 導致的長時間停頓,從而引發了 Rebalance。

浙公網安備 33010602011771號