Kafka筆記4(消費者)
消費者和消費群組:
Kafka消費者從屬于消費者群組,一個群組里的消費者訂閱的是同一個主題,每個消費者接收主題的一部分分區消息
消費者的數量不要超過主題分區的數量,多余的消費者只會被閑置
一個主題可以被多個消費群組使用,消費者群組之間互不影響

當一個消費者加入群組時,他讀取的數據是原本由其他消費者讀取的信息
分區的所有權從一個消費者轉移至另一個消費者的行為稱為“再均衡”
再均衡期間,消費者當前的讀取狀態會丟失,消費者無法讀取信息,造成集群一小段時間的不可用,在恢復狀態之前會拖慢應用程序
消費者通過向群組協調器broker發送心跳維持他們和群組的從屬關系以及他們對分區的所有權關系,如果broker認為消費者死亡會觸發再均衡行為
分配分區過程:
當消費者加入群組時,他會向群組協調器發送一個JoinGroup請求,第一個加入群組的消費者稱為群主,群主從協調器那里獲得群組的成員列表,并負責給每一個消費者分配分區。他使用一個實現PartitionAssignor接口的類來決定哪些分區應該被分配給消費者,分配完畢之后,群主把分配情況列表發送給broker,broker再把這些信息發送給所有消費者,每個消費者只能看到自己的分配信息,只有群主知道群組的所有消費者的分配信息
消息輪詢是消費者API核心,通過從一個簡單的輪詢向服務器請求數據,一旦消費者訂閱了主題,輪詢就會處理所有細節,包括群組協調/分區再均衡/發送心跳/獲取數據
一個消費者使用一個線程
消費者重要的屬性參數配置:
fetch.min.bytes
指定了消費者從服務器獲取記錄的最小字節數,如果broker收到消費者請求,但數據可用量小于fetch.min.bytes,就會等到有足夠的可用數據才把它返回給消費者
fetch.max.wait.ms
指定broker等待時間,默認500ms
max.partition.fetch.bytes
指定服務器從每個分區里返回給消費者的最大字節數,默認1MB max.partition.fetch.size的值必須比broker能接收的最大消息字節數(max.message.size)大
session.timeout.ms
指定消費者在被認為死亡之前可以與服務器斷開連接的時間,默認3S
heartbeat.interval.ms = session.timeout.ms / 3
auto.offset.reset
指定消費者在讀取一個沒有偏移量的分區或者偏移量無效的情況下該如何處理
=latest 消費者從最新的記錄開始讀取數據
=earliest 消費者從起始位置讀取分區記錄
enable.auto.commit
指定消費者是否自動提交偏移量,默認true
auto.commit.interval.ms 控制提交頻率
partition.assignment.strategy
=org.apache.kafka.clients.consumer.RangeAssignor 把主題的若干連續分區分配給消費者
=org.apache.kafka.clients.consumer.RoundRobinAssignor 把主題的所有分區逐個分配給消費者
client.id
任意字符串,broker用來標識從客戶端發送來的消息
max.poll.records
用于控制單次調用call() 方法返回的記錄數量,可以幫你控制在輪詢里需要處理的數據量
receive.buffer.bytes 和 send.buffer.bytes
默認-1
更新分區當前位置的操作叫提交
消費者會向一個叫做 _consumer_offset 的特殊主題發送消息,消息里包含了每個分區的偏移量


Kafka可以設置消費者自動提交偏移量,設置enable.auto.commit=true,提交時間間隔auto.commit.interval.ms=5s
自動提交是在輪詢里進行的,消費者每次輪詢時會檢查是否該提交偏移量了,是則提交上一次輪詢返回的偏移量
提交當前偏移量,使用API函數 commitSync()
異步提交偏移量,使用API函數commitAsync()
可以使用一個單調遞增的序列號來維護異步提交順序
浙公網安備 33010602011771號