Tracking down lost Kafka messages: the cause turned out to be a duplicate groupId
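Symptom
Colleagues on the BI team noticed that one metric was displayed incorrectly and that recently stored data was missing, so they reported it to the DBA. The DBA investigated and found that the raw data itself was incomplete.
Investigation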
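- I was on leave when this started. A colleague did a first round of investigation and suspected that messages were being blocked. They adjusted the code and released it, but the problem still occurred.
- After I took over, I printed the messages for one specific data point locally (it produces one message every 15 minutes) and saw no lost messages. So I added logging for the same data point to the production system, released it, and went home.
- The next day the production logs showed gaps. Local printing was still running and did not reproduce the loss. So I queried the consumer group with the following test and collected a list of member hosts (IPs).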
    import java.util.Collections;
    import java.util.Map;
    import java.util.concurrent.ExecutionException;
    import org.apache.kafka.clients.admin.ConsumerGroupDescription;
    import org.apache.kafka.clients.admin.MemberDescription;

    // `admin` is assumed to be a Kafka AdminClient created elsewhere in the test class.
    // @Test comes from whichever JUnit version the project uses.
    @Test
    public void showGroupInfo() throws ExecutionException, InterruptedException {
        String groupId = "kafka-consumer-group-id";
        // Ask the broker to describe the group and wait for the result.
        Map<String, ConsumerGroupDescription> descriptions =
                admin.describeConsumerGroups(Collections.singleton(groupId)).all().get();
        for (ConsumerGroupDescription description : descriptions.values()) {
            // Print every member currently in the group, including the host it connects from.
            for (MemberDescription member : description.members()) {
                System.out.printf("clientId[%s],memberId[%s],host[%s],assignment[%s]%n",
                        member.clientId(), member.consumerId(), member.host(), member.assignment());
            }
        }
    }
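- I asked an ops colleague to look up those IPs, and one of them belonged to a different project. After pulling that project's jar and decompiling it, I found that its configuration file used the same Kafka consumer groupId as ours.
- I reported this to the owner of that project.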
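Why a shared groupId looks like message loss: within one consumer group, Kafka gives each partition to exactly one member, so an unrelated application that joins the same group takes over some of the partitions and the original service never sees those messages. The following is a toy model of that rule only, not Kafka's real assignor; the member names `our-service` and `other-project` are made up for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model: illustrates "one owner per partition within a group".
// It does NOT reproduce Kafka's actual assignment strategies.
class SharedGroupDemo {

    // Spread partitions 0..partitionCount-1 over the group's members in round-robin order.
    static Map<String, List<Integer>> assign(List<String> members, int partitionCount) {
        if (members.isEmpty()) {
            throw new IllegalArgumentException("a group needs at least one member");
        }
        Map<String, List<Integer>> owned = new LinkedHashMap<>();
        for (String member : members) {
            owned.put(member, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = members.get(partition % members.size());
            owned.get(owner).add(partition);
        }
        return owned;
    }

    public static void main(String[] args) {
        // Only our service is in the group: it owns every partition.
        System.out.println(assign(List.of("our-service"), 4));
        // prints {our-service=[0, 1, 2, 3]}

        // A second, unrelated project joins with the same groupId (hypothetical name).
        System.out.println(assign(List.of("our-service", "other-project"), 4));
        // prints {our-service=[0, 2], other-project=[1, 3]}
    }
}
```

In the second case, everything written to partitions 1 and 3 is consumed by the other project, which from our side looks exactly like messages going missing.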
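Open questions
How can Kafka consumer groups be managed and monitored centrally so that this kind of problem does not happen again?
Should Kafka groupIds be allocated per microservice project?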
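One possible starting point for the monitoring question, sketched under assumptions: periodically collect the member hosts of a group (for example from `MemberDescription.host()` as in the test above) and compare them with the hosts the owning service is expected to run on. The class name `GroupMemberAudit`, the method `unexpectedHosts`, and the idea of keeping an allow-list per group are all hypothetical, not something the original setup had.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: flags group members whose host is not on the allow-list.
class GroupMemberAudit {

    // Returns the hosts that are in the group but not expected, sorted for stable output.
    static Set<String> unexpectedHosts(List<String> memberHosts, Set<String> expectedHosts) {
        Set<String> unexpected = new TreeSet<>();
        for (String host : memberHosts) {
            // Member hosts are often reported with a leading '/', e.g. "/10.0.0.5"; strip it.
            String normalized = host.startsWith("/") ? host.substring(1) : host;
            if (!expectedHosts.contains(normalized)) {
                unexpected.add(normalized);
            }
        }
        return unexpected;
    }

    public static void main(String[] args) {
        // Example IPs are made up.
        List<String> seen = List.of("/10.0.0.5", "/10.0.0.9");
        Set<String> expected = Set.of("10.0.0.5");
        System.out.println(unexpectedHosts(seen, expected)); // prints [10.0.0.9]
    }
}
```

A non-empty result would be a signal that some other application has joined the group, which is the situation that took several days to find manually here.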
