kafka的消費全流程
多線程安全問題
多線程安全:定義為:多線程去訪問一個類,這個類始終表現出正確的行為,不管運行的環境、讓線程交替執行、不需要任何的額外的同步、協同,都能表現出正確的行為。
比如:i++,這個肯定不是多線程安全的。
kafka中:
生產者:是多線程安全。(不可變的方式去解決)
KafkaProducer類中所有的成員變量,都是private、final的
消費者:不是多線程安全。消費的時候拿到一個分區,比如consumer1拿到分區0進行消費的時候,與此同時還需要進行ack確認,要確認消費的這一條或者幾條是否被成功的消費了,這種情況下,consumer是很難確保多線程安全的。那么, 如果你想又要使用多線程,又想使用consumer,該怎么辦?
思路:如果所有的成員變量都是不可變,final的,那一定是多線程安全的。還有一種確保多線程安全:線程封閉(每個線程實例化一個consumer對象)
多線程下使用生產者:
package com.msb.concurrent;
import com.msb.selfserial.User;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 多線程下使用生產者
*/
public class KafkaConProducer {
//發送消息的個數
private static final int MSG_SIZE = 1000;
//負責發送消息的線程池
private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static CountDownLatch countDownLatch = new CountDownLatch(MSG_SIZE);
private static User makeUser(int id) {
User user = new User(id);
String userName = "msb_" + id;
user.setName(userName);
return user;
}
/**
* 發送消息的任務
*/
private static class ProduceWorker implements Runnable {
private ProducerRecord<String, String> record;
private KafkaProducer<String, String> producer;
public ProduceWorker(ProducerRecord<String, String> record, KafkaProducer<String, String> producer) {
this.record = record;
this.producer = producer;
}
@Override
public void run() {
final String threadName = Thread.currentThread().getName();
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (null != exception) {
exception.printStackTrace();
}
if (null != metadata) {
System.out.println(threadName + "|" + String.format("偏移量:%s,分區:%s", metadata.offset(), metadata.partition()));
}
}
});
countDownLatch.countDown();
}
}
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9092");
properties.put("key.serializer", StringSerializer.class);
properties.put("value.serializer", StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
try {
for (int i = 0; i < MSG_SIZE; i++) {
User user = makeUser(i);
ProducerRecord<String, String> record = new ProducerRecord<>("concurrent-ConsumerOffsets", null, System.currentTimeMillis(), user.getId() + "", user.toString());
executorService.submit(new ProduceWorker(record, producer));
}
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
executorService.shutdown();
}
}
}
多線程下使用消費者:
package com.msb.concurrent;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* 多線程下正確的使用消費者,需要記住:一個線程一個消費者
*/
public class KafkaConConsumer {
public static final int CONCURRENT_PARTITIONS_COUNT = 2;
private static ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_PARTITIONS_COUNT);
private static class ConsumerWorker implements Runnable {
private KafkaConsumer<String, String> consumer;
public ConsumerWorker(Map<String, Object> config, String topic) {
Properties properties = new Properties();
properties.putAll(config);
//一個線程一個消費者
this.consumer = new KafkaConsumer<>(properties);
this.consumer.subscribe(Collections.singletonList(topic));
}
@Override
public void run() {
final String threadName = Thread.currentThread().getName();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
for (ConsumerRecord<String, String> record : records) {
System.out.println(threadName + "|" + String.format("主題:%s,分區:%d,偏移量:%d,key:%s,value:%s",
record.topic(), record.partition(), record.offset(), record.key(), record.value()));
}
}
}
}
public static void main(String[] args) {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "c_test");
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
for (int i = 0; i < CONCURRENT_PARTITIONS_COUNT; i++) {
//一個線程一個消費者
executorService.submit(new ConsumerWorker(properties, "concurrent-ConsumerOffsets"));
}
}
}
群組協調
1、組協調器(broker)
1)選舉leader消費者客戶端
2)處理申請加入組的客戶端
3)再平衡后,同步新的分配方案
4)心跳(與客戶端維持),判斷哪些客戶端離線
5)管理消費者已經消費的偏移量(consumer_offset,默認50個)
2、消費者協調器(客戶端)
1)向組協調器發起入組請求(第一個成為群主)
2)發起同步組(消費者分區劃分情況)的請求(leader客戶端,負責計算,分配策略作為入參傳入)
3)心跳(與組協調器維持)
4)發起已經提交的消費者偏移量的請求(ACK確認)
5)主動的發起離組請求
當一個消費者加入分組(組協調器、消費者協調器,干的哪些事情)
1、消費客戶端啟動、重連(JoinGroup,請求->組協調器)
2、客戶端已經完成JoinGroup,客戶端(消費者協調器)再次向組協調器發起SyncGroup(同步組),獲取新的分配方案
3、客戶端關機、異常,觸發離組。
4、客戶端加入一組之后(一直保持心跳)
分區再均衡



kafka的消費者分區再均衡問題及案例

tips:
消費者是以群組名進行消費的,比如:
topic: 名為msb,里面有100條消息
groupID:test,名為test會將這100條消息消費;
groupID:test1,又定義了一個群組,名為test1,又會拿到100條消息,進行消費;
groupID:test2,再定義了一個消費者群組,名為test2,他也會重新消費這100條消息;
上面的三個群組,test、test1、test2之間提交什么東西,偏移量等等,相互沒有任何關系的,只要groupID的名字是不一樣的。因為消費者群組的概念主要用于什么?
當你的上游系統比如說是訂單系統,產生了一批訂單,比如100條訂單信息,下游系統有:物流系統,他要拿到這100筆訂單去發物流;下面還有通知系統,他也要拿到這100條消息進行通知發送;下面還有會員系統,購買了東西加一些積分什么的,他也要拿到這100條訂單進行積分處理。所以群組的概念就在于你在進行這種生產消費的時候,生產的100條消息這是屬于生產者,然后消費的時候,我有可能有不同的系統:物流系統是一個群組,積分系統是一個群組,會員系統是一個群組,我們之間是沒有任何影響的,物流系統消費了80條消息,這和通知系統有什么關系呢?沒關系。
--

浙公網安備 33010602011771號