Kafka原理剖析之「位點(diǎn)提交」
一、背景
Kafka的位點(diǎn)提交一直是Consumer端非常重要的一部分,業(yè)務(wù)上我們經(jīng)常遇到的消息丟失、消息重復(fù)也與其息息相關(guān)。位點(diǎn)提交說(shuō)簡(jiǎn)單也簡(jiǎn)單,說(shuō)復(fù)雜也確實(shí)復(fù)雜,沒(méi)有人能用一段簡(jiǎn)短的話(huà)將其說(shuō)清楚,最近團(tuán)隊(duì)生產(chǎn)環(huán)境便遇到一個(gè)小概率的報(bào)錯(cuò)
“Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. The coordinator is not available.”
此錯(cuò)誤一出,Consumer的流量直接跌0,且無(wú)法自愈,雖然客戶(hù)端重啟后可自動(dòng)恢復(fù),但影響及損失還是非常巨大的,當(dāng)然最后定位就是「位點(diǎn)提交」一手炮制的,是開(kāi)源的一個(gè)重大bug,受此影響的版本跨越2.6.x ~ 3.1.x :
https://issues.apache.org/jira/browse/KAFKA-13840
借此bug正好來(lái)梳理一下Kafka有關(guān)位點(diǎn)提交的知識(shí)點(diǎn)
二、概述
關(guān)于位點(diǎn)提交(commit offset)大家最直觀的感受便是自動(dòng)或手動(dòng)提交,但是仔細(xì)一想,還是有很多細(xì)節(jié)問(wèn)題,例如:
- 手動(dòng)的同步提交與異步提交有什么區(qū)別?
- 使用自動(dòng)提交模式時(shí),提交動(dòng)作是同步的還是異步的?
- 消費(fèi)模式使用assign或subscribe,在提交位點(diǎn)時(shí)有區(qū)別嗎?
- 同步提交與異步提交能否混合使用?
- 手動(dòng)提交與自動(dòng)提交能否混合使用?
其實(shí)這些問(wèn)題都是萬(wàn)變不離其宗,我們把各個(gè)特征總結(jié)一下,這些問(wèn)題自然也就迎刃而解
三、為什么要提交位點(diǎn)?
在開(kāi)始介紹各類(lèi)位點(diǎn)提交的策略之前,我們先拋出一個(gè)靈魂拷問(wèn):“為什么一定要提交位點(diǎn)?”。 Consumer會(huì)周期性的從Broker拉取消息,每次拉取消息的時(shí)候,順便提交位點(diǎn)不可以嗎?為什么一定要讓用戶(hù)感知提交位點(diǎn),還提供了各種各樣的策略?
其實(shí)回答這個(gè)問(wèn)題,我們理解以下2個(gè)配置就夠了
fetch.max.bytesandmax.partition.fetch.bytes
- 均作用于Broker端。首先需要明確的是,Consumer的一次拉取經(jīng)常是針對(duì)多個(gè)partition的,因此max.partition.fetch.bytes控制的一個(gè)partition拉取消息的最大值,而fetch.max.bytes控制的則是本次請(qǐng)求整體的上限
max.poll.records
- 作用于Consumer端。而此參數(shù)控制的就是一次 poll 方法最多返回的消息條數(shù),因此并不是每次調(diào)用 poll 方法都會(huì)發(fā)起一次網(wǎng)絡(luò)請(qǐng)求的
因此也就導(dǎo)致了發(fā)起網(wǎng)絡(luò)的頻次跟用戶(hù)處理業(yè)務(wù)數(shù)據(jù)的頻次是不一樣的

簡(jiǎn)單總結(jié)一下,單次網(wǎng)絡(luò)請(qǐng)求拉取的數(shù)據(jù)量可能是很大的,需要客戶(hù)端通過(guò)多次調(diào)用poll()方法來(lái)消化,如果按照網(wǎng)絡(luò)請(qǐng)求的頻次來(lái)提交位點(diǎn)的話(huà),那這個(gè)提交頻次未免太粗了,Consumer一旦發(fā)生重啟,將會(huì)導(dǎo)致大量的消息重復(fù)
其次按照網(wǎng)絡(luò)請(qǐng)求的頻次來(lái)提交位點(diǎn)的話(huà),程序?qū)⒆兊貌粔蜢`活,業(yè)務(wù)端對(duì)于消息的處理會(huì)有自己的理解,將提交位點(diǎn)的發(fā)起動(dòng)作放在Consumer,設(shè)計(jì)更有彈性
四、Consumer網(wǎng)絡(luò)模型簡(jiǎn)介
4.1、單線程的Consumer
在開(kāi)始介紹Consumer端的網(wǎng)絡(luò)模型之前,我們先看下Producer的

可見(jiàn)Producer是線程安全的,Producer內(nèi)部維護(hù)了一個(gè)并發(fā)的緩存隊(duì)列,所有的數(shù)據(jù)都會(huì)先寫(xiě)入隊(duì)列,然后由Sender線程負(fù)責(zé)將其發(fā)送至網(wǎng)絡(luò)
而Consumer則不同,我們羅列一下Consumer的特點(diǎn)
- 單線程(業(yè)務(wù)處理與網(wǎng)絡(luò)共享一個(gè)線程)
- 非線程安全
不過(guò)這里說(shuō)的單線程不夠嚴(yán)謹(jǐn),在0.10.1版本以后:
- Subscribe模式下,Consumer將心跳邏輯放在了一個(gè)獨(dú)立線程中,如果消息處理邏輯不能在
max.poll.interval.ms內(nèi)完成,則consumer將停止發(fā)送心跳,然后發(fā)送LeaveGroup請(qǐng)求主動(dòng)離組,從而引發(fā)coordinator開(kāi)啟新一輪rebalance - Assign模式下,則只有一個(gè)Main線程

用戶(hù)處理業(yè)務(wù)邏輯的時(shí)間可能會(huì)很長(zhǎng),因此心跳線程的引入主要是為了解決心跳問(wèn)題,其非常輕量,因此我們泛泛的講,Consumer其實(shí)就是單線程的,包括提交位點(diǎn),那一個(gè)單線程的客戶(hù)端是如何保證高效的吞吐,又是如何與用戶(hù)處理數(shù)據(jù)的邏輯解耦呢?其實(shí)這是個(gè)很有意思,也很有深度的問(wèn)題,但不是本文的重點(diǎn),后續(xù)我們?cè)僬归_(kāi)詳聊
因此我們知道,所有提交位點(diǎn)的動(dòng)作均是交由Consumer Main線程來(lái)提交的,但是單線程并不意味著阻塞,不要忘記,我們底層依賴(lài)的是JDK的NIO,因此網(wǎng)絡(luò)發(fā)送、接受部分均是異步執(zhí)行的
4.2、網(wǎng)絡(luò)模型
既然Consumer是單線程,而NIO是異步的,那么Consumer如何處理這些網(wǎng)絡(luò)請(qǐng)求呢?Producer比較好理解,有一個(gè)專(zhuān)門(mén)負(fù)責(zé)交互的Sender線程,而單線程的Consumer如何處理呢

其實(shí)Consumer所有的網(wǎng)絡(luò)發(fā)送動(dòng)作,均放在main線程中,而在Consumer內(nèi)部,為每個(gè)建聯(lián)的Broker都維護(hù)了一個(gè)unsent列表,這個(gè)列表中存放了待發(fā)送的請(qǐng)求,每次業(yè)務(wù)端程序執(zhí)行consumer.poll()方法時(shí),會(huì)先后觸發(fā)2次網(wǎng)絡(luò)發(fā)送的操作:
- 嘗試將所有Broker待發(fā)送區(qū)的數(shù)據(jù)發(fā)送出去
- 處理網(wǎng)絡(luò)接收到的請(qǐng)求
- 嘗試將所有Broker待發(fā)送區(qū)的數(shù)據(jù)發(fā)送出去(again)
回到我們位點(diǎn)提交的case中,如果某個(gè)Broker積攢了大量的未發(fā)送請(qǐng)求,那提交位點(diǎn)的請(qǐng)求豈不是要等待很久才能發(fā)出去?是的,如果unsent列表中有很多請(qǐng)求確實(shí)會(huì)這樣,不過(guò)正常情況下,同一個(gè)Broker中不會(huì)積攢大量請(qǐng)求,如果一次從Broker中拉取的消息還沒(méi)有被消費(fèi)完,是不會(huì)向該Broker再次發(fā)送請(qǐng)求的,因此業(yè)務(wù)poll()的頻率是要遠(yuǎn)高于網(wǎng)絡(luò)發(fā)送頻率的,而單次poll時(shí),又會(huì)觸發(fā)2次trySend,因此可保證不存在unsent列表的數(shù)據(jù)過(guò)多而發(fā)不出的情況
BTW:Consumer的網(wǎng)絡(luò)三件套:NetworkClient、Selector、KafkaChannel與Producer是完全一樣的。關(guān)于Consumer的核心組件,盜用一張網(wǎng)上的圖

有了上面的基礎(chǔ),我們?cè)賮?lái)討論位點(diǎn)提交的方式,就會(huì)變得非常清晰明朗了
五、手動(dòng)-異步提交
執(zhí)行異步提交的代碼通常是這樣寫(xiě)的
while (true) {
// 拉取消息
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));
// 如果拉取的消息不為空
if (!records.isEmpty()) {
// 執(zhí)行常規(guī)業(yè)務(wù)處理邏輯
doBusiness(records);
// 異步提交位點(diǎn)
kafkaConsumer.commitAsync();
}
}
kafkaConsumer.commitAsync() 負(fù)責(zé)拼接提交位點(diǎn)的request,然后將請(qǐng)求放在對(duì)應(yīng)Broker的unsent列表中,程序?qū)⒎祷兀较乱淮螛I(yè)務(wù)執(zhí)行poll(),或合適的時(shí)機(jī),會(huì)將此請(qǐng)求發(fā)出去,并不阻塞main線程
而對(duì)于提交位點(diǎn)的結(jié)果,如果指定了回調(diào)函數(shù),如下:
kafkaConsumer.commitAsync(new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
}
});
可以對(duì)異常進(jìn)行處理,也可以拿到實(shí)時(shí)提交的位點(diǎn)
而對(duì)于沒(méi)有指定回調(diào)函數(shù)的case,Consumer會(huì)提供一個(gè)默認(rèn)的回調(diào)函數(shù)org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.DefaultOffsetCommitCallback,在發(fā)生異常時(shí),輸出error日志
六、手動(dòng)-同步提交
而對(duì)于同步提交
kafkaConsumer.commitSync();
首先需要明確的是,同步提交是會(huì)阻塞Consumer的Main線程的,手動(dòng)提交會(huì)首先將提交請(qǐng)求放在對(duì)應(yīng)Broker的unsent列表的尾部,繼而不斷地觸發(fā)調(diào)用,將網(wǎng)絡(luò)待發(fā)送區(qū)的數(shù)據(jù)發(fā)送出去,同時(shí)不間斷接收網(wǎng)絡(luò)請(qǐng)求,直到收到本次提交的響應(yīng);不過(guò)同步提交也有超時(shí)時(shí)間,默認(rèn)為60s,如果超時(shí),將會(huì)拋出TimeoutException異常
同步提交是低效的,會(huì)影響Consumer整體的消費(fèi)吞吐,而有些相對(duì)嚴(yán)苛的業(yè)務(wù)場(chǎng)景,同步提交又是必不可少的,讀者根據(jù)自己的業(yè)務(wù)case來(lái)決定使用哪種策略
七、自動(dòng)提交
與手動(dòng)提交相對(duì)的,便是自動(dòng)提交,首先明確一點(diǎn),自動(dòng)提交的模式,是異步提交
自動(dòng)提交并不是啟動(dòng)一個(gè)全新的線程去提交位點(diǎn),也不是嚴(yán)格按照固定時(shí)間間隔去提交。自動(dòng)提交與手動(dòng)提交一樣,也是由Consumer Main線程觸發(fā)的

由于位點(diǎn)提交、處理業(yè)務(wù)邏輯、網(wǎng)絡(luò)收發(fā)、元數(shù)據(jù)更新等,都共享了Consumer的Main線程,因此并不能保證提交位點(diǎn)的時(shí)間間隔嚴(yán)格控制在auto.commit.interval.ms(默認(rèn)5000,即5s)內(nèi),因此真實(shí)提交位點(diǎn)的時(shí)間間隔只會(huì)大于等于auto.commit.interval.ms
總結(jié)一下自動(dòng)提交的特點(diǎn):
- 異步提交
- 提交操作由Consumer的Main線程發(fā)起
- 配置
auto.commit.interval.ms只能保證提交的最小間隔,真實(shí)提交時(shí)間間隔通常大于此配置
至此,我們嘗試回答一下剛開(kāi)始提出的問(wèn)題
- 手動(dòng)的同步提交與異步提交有什么區(qū)別?
- 同步提交會(huì)阻塞Consumer的Main線程,相對(duì)而言,異步提交性能更高
- 使用自動(dòng)提交模式時(shí),提交動(dòng)作是同步的還是異步的?
- 異步的
- 消費(fèi)模式使用assign或subscribe,在提交位點(diǎn)時(shí)有區(qū)別嗎?
- subscribe模式會(huì)有心跳線程,心跳線程維護(hù)了與Coordinator的建聯(lián)
- 同步提交與異步提交能否混合使用?
- 可以,通常在大部分場(chǎng)景使用異步提交,而在需要明確拿到已提交位點(diǎn)的case下使用同步提交
- 手動(dòng)提交與自動(dòng)提交能否混合使用?
- 可以,不過(guò)語(yǔ)義上會(huì)有很多沖突,不建議混合使用
八、開(kāi)源Bug
回到文章剛開(kāi)始提到的異常報(bào)錯(cuò)
“Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets. The coordinator is not available.”
這個(gè)bug并不是在所有case下都會(huì)存在
- Subscribe
- 自動(dòng)提交 -- 正常運(yùn)行
- 手動(dòng)-異步提交 -- 正常運(yùn)行
- 手動(dòng)-同步提交 -- 正常運(yùn)行
- Assign
- 自動(dòng)提交 -- Bug
- 手動(dòng)-異步提交 -- Bug
- 手動(dòng)-同步提交 -- 正常運(yùn)行
為什么會(huì)出現(xiàn)如何奇怪的情況呢?其實(shí)跟一下源碼便會(huì)有結(jié)論
8.1、Subscribe
在Subscribe模式下,Consumer與Coordinator的交互是通過(guò)線程org.apache.kafka.clients.consumer.internals.AbstractCoordinator.HeartbeatThread進(jìn)行的。當(dāng)程序發(fā)現(xiàn)Coordinator找不到時(shí),便會(huì)發(fā)起尋找Coordinator的網(wǎng)絡(luò)請(qǐng)求,方法如下
// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator
protected synchronized RequestFuture<Void> lookupCoordinator() {
if (findCoordinatorFuture == null) {
// find a node to ask about the coordinator
Node node = this.client.leastLoadedNode();
if (node == null) {
log.debug("No broker available to send FindCoordinator request");
return RequestFuture.noBrokersAvailable();
} else {
findCoordinatorFuture = sendFindCoordinatorRequest(node);
}
}
return findCoordinatorFuture;
}
而其中涉及一個(gè)findCoordinatorFuture的成員變量,必須要滿(mǎn)足findCoordinatorFuture == null,才會(huì)真正發(fā)起網(wǎng)絡(luò)請(qǐng)求,因此在方法執(zhí)行完,需要將其置空,如下方法
// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#clearFindCoordinatorFuture
private synchronized void clearFindCoordinatorFuture() {
findCoordinatorFuture = null;
}
說(shuō)白了,也就是每次調(diào)用,都需要lookupCoordinator()與clearFindCoordinatorFuture()成對(duì)兒出現(xiàn);當(dāng)然心跳線程也是這樣做的
if (coordinatorUnknown()) {
if (findCoordinatorFuture != null) {
// clear the future so that after the backoff, if the hb still sees coordinator unknown in
// the next iteration it will try to re-discover the coordinator in case the main thread cannot
// 清理輔助變量findCoordinatorFuture
clearFindCoordinatorFuture();
// backoff properly
AbstractCoordinator.this.wait(rebalanceConfig.retryBackoffMs);
} else {
// 尋找Coordinator
lookupCoordinator();
}
}
因此在Subscribe模式下,無(wú)論何種提交方式,都是沒(méi)有Bug的
8.2、Assign
因?yàn)樽詣?dòng)提交也是異步提交,因此我們只聚焦在同步提交與異步提交。其實(shí)同步提交與異步提交,它們構(gòu)建入?yún)ⅰ⑻幚眄憫?yīng)等均是調(diào)用的同一個(gè)方法,唯一不同的是發(fā)起調(diào)用處的邏輯。我們先看下同步提交的邏輯
// org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady
protected synchronized boolean ensureCoordinatorReady(final Timer timer) {
if (!coordinatorUnknown())
return true;
do {
if (fatalFindCoordinatorException != null) {
final RuntimeException fatalException = fatalFindCoordinatorException;
fatalFindCoordinatorException = null;
throw fatalException;
}
final RequestFuture<Void> future = lookupCoordinator();
// some other business
// .......
clearFindCoordinatorFuture();
if (fatalException != null)
throw fatalException;
} while (coordinatorUnknown() && timer.notExpired());
return !coordinatorUnknown();
}
沒(méi)有問(wèn)題,lookupCoordinator()與clearFindCoordinatorFuture()又成對(duì)兒出現(xiàn)
而異步提交呢?
// org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#commitOffsetsAsync
public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
invokeCompletedOffsetCommitCallbacks();
if (!coordinatorUnknown()) {
doCommitOffsetsAsync(offsets, callback);
} else {
// we don't know the current coordinator, so try to find it and then send the commit
// or fail (we don't want recursive retries which can cause offset commits to arrive
// out of order). Note that there may be multiple offset commits chained to the same
// coordinator lookup request. This is fine because the listeners will be invoked in
// the same order that they were added. Note also that AbstractCoordinator prevents
// multiple concurrent coordinator lookup requests.
pendingAsyncCommits.incrementAndGet();
lookupCoordinator().addListener(new RequestFutureListener<Void>() {
@Override
public void onSuccess(Void value) {
// do something
}
@Override
public void onFailure(RuntimeException e) {
// do something
}
});
}
// ensure the commit has a chance to be transmitted (without blocking on its completion).
// Note that commits are treated as heartbeats by the coordinator, so there is no need to
// explicitly allow heartbeats through delayed task execution.
client.pollNoWakeup();
}
非常遺憾,只有lookupCoordinator(),卻沒(méi)有clearFindCoordinatorFuture(),導(dǎo)致成員變量一直得不到重置,也就無(wú)法正常發(fā)起尋找Coordinator的請(qǐng)求,其實(shí)如果修復(fù)的話(huà),也非常簡(jiǎn)單,只需要在RequestFutureListener的回調(diào)結(jié)果中顯式調(diào)用clearFindCoordinatorFuture()即可
這個(gè)bug隱藏的很深,只靠單測(cè),感覺(jué)還是很難發(fā)現(xiàn)的,bug已經(jīng)在3.2.1版本修復(fù)。雖然我們生產(chǎn)環(huán)境是2.8.2的Broker,但是還是可以直接通過(guò)升級(jí)Consumer版本來(lái)解決,即便client版本高于了server端。這個(gè)當(dāng)然得益于Kafka靈活的版本策略,還是要為其點(diǎn)個(gè)贊的
參考文檔

浙公網(wǎng)安備 33010602011771號(hào)