About Kafka
I. Basic concepts in Kafka:
Topics and partitions: the main point of partitions is to increase consumption parallelism. With a single producer, you can run, say, 8 consumers against the same topic. The number of partitions is mainly about balancing the rate of production against the rate of consumption. Kafka has no explicit queue concept; a partition can be thought of as a queue.
Offsets: every message written to a partition is assigned an offset. Consumption happens at the level of consumer groups.
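For reference, here is a minimal sketch of creating a topic with several partitions programmatically, so that several consumers in one group can consume it in parallel. It assumes the AdminClient API from kafka-clients and the single local broker used throughout these notes; the package and class names are made up for illustration.
package com.msb.admin;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.Collections;
import java.util.Properties;
/**
 * Sketch only: create the "msb" topic with 4 partitions and replication factor 1
 * (a single-broker setup), matching the experiments later in these notes.
 */
public class CreateTopicSketch {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        try (AdminClient admin = AdminClient.create(properties)) {
            NewTopic topic = new NewTopic("msb", 4, (short) 1);
            // fails if the topic already exists, which is fine for a one-off setup sketch
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}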
II. The production and consumption flow in Kafka

III. Sending and consuming a single message in Kafka (non-cluster)
A minimal Java getting-started example with a Kafka producer and consumer.
Kafka producer:
package com.msb.simple;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Kafka producer
 */
public class HelloKafkaProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        /**
         * Address of the Kafka broker to connect to; here the local host and default port.
         * Several servers can be listed, separated by commas. That is typically a cluster setup:
         * if one broker goes down, the producer can still connect through the others.
         */
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        /**
         * Serializers for the key and the value.
         * Serialization essentially turns an object into a byte[] so it can travel over the network;
         * the network layer only understands byte[].
         */
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        // Build the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        // Build the message
        ProducerRecord<String, String> record = new ProducerRecord<>("msb", "teacher", "lijin");
        // Send the message
        producer.send(record);
        System.out.println("message is sent.");
        // Release the connection
        producer.close();
    }
}
Kafka consumer:
package com.msb.simple;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
/**
 * Consumer getting-started example
 */
public class HelloKafkaConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        /**
         * Deserializers for the key and the value; what the producer sends over the wire is byte[].
         * Use the deserializer that matches the serializer used on the producer side.
         */
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        // Set the group id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Build the Kafka consumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // Subscribe to topics; more than one can be subscribed
        consumer.subscribe(Collections.singletonList("msb"));
        try {
            while (true) {
                /**
                 * Poll the broker for messages.
                 * Duration.ofSeconds(1) is a timeout: poll returns after at most 1 second
                 * if no records have arrived.
                 */
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    String value = record.value();
                    System.out.println("Received message: key = " + key + ", value = " + value);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Release the connection
            consumer.close();
        }
    }
}
Maven configuration for org.apache.kafka:
pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.kafka</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>msb-kafka-client</name>
    <properties>
        <java.version>1.8</java.version>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <spring-boot.version>2.6.13</spring-boot.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.3.1</version>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-dependencies</artifactId>
                <version>${spring-boot.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>${spring-boot.version}</version>
                <configuration>
                    <mainClass>com.msb.MsbKafkaClientApplication</mainClass>
                    <skip>true</skip>
                </configuration>
                <executions>
                    <execution>
                        <id>repackage</id>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
IV. Custom serialization: sending a Java object
This custom implementation is only a small example showing how the mechanism is used; if a field is added to User, this code has to change with it, so it merely demonstrates how to plug in your own serializer.
Custom serializer:
package com.msb.selfserial;
import org.apache.kafka.common.serialization.Serializer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
 * Custom serializer
 */
public class UserSerializer implements Serializer<User> {
    /**
     * Serialize a User object, i.e. turn it into a byte[].
     * The byte layout used here is:
     * 4 bytes for the id + 4 bytes for the length of name + the UTF-8 bytes of name.
     * Rationale:
     * an int takes 4 bytes;
     * the length of the name is also an int, so another 4 bytes;
     * the name string itself can be arbitrarily long, hence the explicit length field.
     */
    @Override
    public byte[] serialize(String topic, User data) {
        byte[] name;
        int nameSize;
        if (data == null) {
            return null;
        }
        if (data.getName() != null) {
            name = data.getName().getBytes(StandardCharsets.UTF_8);
            // use the byte length, not the character count, so non-ASCII names round-trip correctly
            nameSize = name.length;
        } else {
            name = new byte[0];
            nameSize = 0;
        }
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + nameSize);
        buffer.putInt(data.getId());
        buffer.putInt(nameSize);
        buffer.put(name);
        return buffer.array();
    }
}
Custom deserializer
package com.msb.selfserial;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
/**
 * Custom deserializer
 */
public class UserDeserializer implements Deserializer<User> {
    @Override
    public User deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 8) {
            throw new SerializationException("error data size.");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        // read the fields back in the order they were written: id, name length, name bytes
        int id = buffer.getInt();
        int nameSize = buffer.getInt();
        byte[] nameByte = new byte[nameSize];
        buffer.get(nameByte);
        String name = new String(nameByte, StandardCharsets.UTF_8);
        return new User(id, name);
    }
}
Producer
package com.msb.selfserial;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Kafka producer using the custom value serializer
 */
public class ProducerUser {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", UserSerializer.class);
        KafkaProducer<String, User> producer = new KafkaProducer<String, User>(properties);
        // Build the message
        ProducerRecord<String, User> record = new ProducerRecord<>("msb-user", "teacher", new User(1, "lijin"));
        // Send the message
        producer.send(record);
        System.out.println("message is sent.");
        // Release the connection
        producer.close();
    }
}
Consumer
package com.msb.selfserial;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerUser {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", UserDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Build the Kafka consumer
        KafkaConsumer<String, User> consumer = new KafkaConsumer<String, User>(properties);
        // Subscribe to topics; more than one can be subscribed
        consumer.subscribe(Collections.singletonList("msb-user"));
        try {
            while (true) {
                ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, User> record : records) {
                    String key = record.key();
                    User user = record.value();
                    System.out.println("Received message: key = " + key + ", value = " + user);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
Entity class - User:
package com.msb.selfserial;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
/**
 * Entity class: User
 */
@Getter
@Setter
@ToString
public class User {
    private int id;
    private String name;
    public User(int id) {
        this.id = id;
    }
    public User(int id, String name) {
        this.id = id;
        this.name = name;
    }
}
V. Using partitioners
Suppose the topic has 4 partitions and the producer wants the 1st message to go to partition 0, the 2nd to partition 1, the 3rd to partition 2 and the 4th to partition 3. How can that be done?
In the getting-started producer we never specified a partitioner, so was none used? In fact there is a default one, DefaultPartitioner, which chooses the partition from the record key.
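For keyed records, that key-based rule boils down to hashing the serialized key and taking the result modulo the partition count. A minimal sketch of the idea (not the actual DefaultPartitioner source; the class and method names are made up for illustration):
import org.apache.kafka.common.utils.Utils;
/**
 * Sketch only: how a non-null key maps to a partition.
 */
public class KeyPartitionSketch {
    public static int partitionForKey(byte[] keyBytes, int numPartitions) {
        // non-negative murmur2 hash of the serialized key, modulo the number of partitions
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
}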
1. Built-in partitioners
package com.msb.selfpartiton;
import com.msb.selfserial.User;
import com.msb.selfserial.UserSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.clients.producer.UniformStickyPartitioner;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Built-in partitioners
 */
public class SysPartitionProducer {
    /**
     * Default partitioner:
     * If we send 10 messages with the default partitioner, all carrying the same key, do they land in
     * different partitions or in the same one?
     * Before running this experiment, change num.partitions=1 to num.partitions=4 in the broker's
     * server.properties, so that a newly created topic gets 4 partitions.
     * They all land in the same partition, because DefaultPartitioner picks the partition purely from
     * the key, and every record in the loop below carries the same key.
     *
     * Uniform sticky partitioner:
     * sticks to one partition until the current batch is full, then switches, ignoring the key.
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", UserSerializer.class);
        // Default partitioner
        //properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
        // Round-robin partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
        properties.put("partitioner.availability.timeout.ms", "0");
        properties.put("partitioner.ignore.keys", "true");
        // Uniform sticky partitioner
        //properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UniformStickyPartitioner.class);
        KafkaProducer<String, User> producer = new KafkaProducer<>(properties);
        try {
            for (int i = 0; i < 10; i++) {
                ProducerRecord<String, User> record = new ProducerRecord<>("msb", "teacher", new User(1, "lijin"));
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata recordMetadata = future.get();
                System.out.println(i + "," + "offset:" + recordMetadata.offset() + "," + "partition:" + recordMetadata.partition());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
        System.out.println("message is sent.");
    }
}
2. Custom partitioner
Note that you can also set the partition directly when building a message, e.g. new ProducerRecord<>("msb", 0, "teacher", "lijin"). If a partition is specified on the record, the configured partitioner is ignored entirely; the partition set on the ProducerRecord has the highest priority. In that example every message would go to partition 0.
Custom partitioner:
package com.msb.selfpartiton;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;
import java.util.List;
import java.util.Map;
/**
 * Custom partitioner that partitions by the value instead of the key
 */
public class SelfPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Look up the number of partitions for the topic
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int num = partitionInfos.size();
        // Hash the value bytes to a non-negative int, then take it modulo the partition count.
        // This mirrors DefaultPartitioner, which applies the same computation to the key instead.
        return Utils.toPositive(Utils.murmur2(valueBytes)) % num;
    }
    @Override
    public void configure(Map<String, ?> configs) {
    }
    @Override
    public void close() {
    }
}
Using the custom partitioner:
package com.msb.selfpartiton;
import com.msb.selfserial.User;
import com.msb.selfserial.UserSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Producer using the custom partitioner
 */
public class SelfPartitionProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        // Custom partitioner
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SelfPartitioner.class);
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        try {
            for (int i = 0; i < 10; i++) {
                // Build the message
                ProducerRecord<String, String> record = new ProducerRecord<>("msb", "teacher", "lijin" + i);
                // Specify the partition explicitly (overrides any partitioner)
                //ProducerRecord<String, String> record = new ProducerRecord<>("msb", 0, "teacher", "lijin");
                Future<RecordMetadata> future = producer.send(record);
                RecordMetadata recordMetadata = future.get();
                System.out.println(i + "," + "offset:" + recordMetadata.offset() + "," + "partition:" + recordMetadata.partition());
            }
        } catch (InterruptedException | ExecutionException e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
        System.out.println("message is sent.");
    }
}
VI. Sending messages asynchronously
The examples above call producer.send, get back a Future, and block on future.get() in the main thread. How do we handle the result asynchronously instead?
Pass a Callback when calling producer.send.
Asynchronous send:
package com.msb.simple;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Kafka producer sending with a callback
 */
public class AsynProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>("msb", "teacher", "lijin");
        // Send the message; the callback is invoked when the send completes
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception e) {
                if (e == null) {
                    // No exception: print the record metadata
                    System.out.println("offset:" + metadata.offset() + ", partition:" + metadata.partition());
                } else {
                    // The send failed: print the exception
                    e.printStackTrace();
                }
            }
        });
        // Release the connection (close flushes pending records first)
        producer.close();
    }
}
VII. Buffering

Why does the Kafka client buffer messages on the producer side?
1. To reduce I/O overhead (individual sends -> batches) ---- controlled by configuration
2. To reduce GC pressure (pooling ---- a buffer pool)
Batched sending is controlled mainly by two parameters:
Size: batch.size, default 16384 (16 KB)
Time: linger.ms, default 0, meaning records are sent as soon as they arrive, without waiting to fill a batch
Buffer pool: total pool size
buffer.memory, default 32 MB
When send is called, after the serializer and the partitioner have run, records headed for the same partition are collected into the same batch; batch.size sets the size of that batch in bytes.
The other benefit is reduced GC: without a buffer pool, the ByteBuffer used for a send becomes garbage as soon as the send finishes and has to be collected, and every send would have to allocate a new ByteBuffer, which costs time. The pool avoids both.
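As a sketch, the three parameters are set on the producer like any other configuration; the values below are purely illustrative (not tuning recommendations) and the class name is made up:
package com.msb.buffer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * Sketch only: a producer with explicit buffering settings.
 */
public class BufferedProducerSketch {
    public static KafkaProducer<String, String> build() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        // batch up to 32 KB of records per partition before sending (default 16384, i.e. 16 KB)
        properties.put("batch.size", 32768);
        // wait up to 5 ms for more records to fill a batch (default 0: send as soon as possible)
        properties.put("linger.ms", 5);
        // total memory the producer may use for buffering unsent records (default 32 MB)
        properties.put("buffer.memory", 64 * 1024 * 1024);
        return new KafkaProducer<>(properties);
    }
}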
VIII. Consumer groups

Group consumption in Kafka ---- load balancing is done at the partition level.
For the experiment, server.properties is configured with num.partitions=4, so the producer writes to a topic with four partitions.
Note: after adding a consumer we may need to wait a moment (roughly 5 seconds) for the group to rebalance before restarting consumers and observing the effect. The group coordinator on the broker knows how many partitions the topic has and how many consumers are in the group; when it detects a new consumer, the topic's partitions are reassigned across the group. While the reassignment is in progress consumers cannot consume; they are blocked until the rebalance completes.
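To observe the rebalance, a ConsumerRebalanceListener can be passed when subscribing. A minimal sketch (the class name is made up for illustration; the group id and topic match the experiment below):
package com.msb.consumergroup;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.Properties;
/**
 * Sketch only: logs partition assignments as the group rebalances.
 */
public class RebalanceLoggingConsumer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupA");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singletonList("msb"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                // called before partitions are taken away during a rebalance
                System.out.println("Revoked: " + partitions);
            }
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                // called once the new assignment is known; consumption resumes afterwards
                System.out.println("Assigned: " + partitions);
            }
        });
        try {
            while (true) {
                // keep polling so the consumer stays in the group and the listener fires
                consumer.poll(Duration.ofSeconds(1));
            }
        } finally {
            consumer.close();
        }
    }
}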
Producer sending the messages:
package com.msb.consumergroup;
import com.msb.selfpartiton.SelfPartitioner;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.UniformStickyPartitioner;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
/**
 * Kafka producer: sends several messages
 */
public class KafkaProducerGroup {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SelfPartitioner.class);
        // Build the Kafka producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        // Send the messages
        for (int i = 0; i < 8; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("msb", null, "lijin" + i);
            Future<RecordMetadata> future = producer.send(record);
            RecordMetadata recordMetadata = future.get();
            System.out.println(i + "," + "offset:" + recordMetadata.offset() + "," + "partition:" + recordMetadata.partition());
        }
        System.out.println("message is sent.");
        producer.close();
    }
}
Consumer1 through Consumer5 of GroupA (the remaining consumers differ from the one below only in class name):
package com.msb.consumergroup;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class GroupAConsumer1 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupA");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("msb"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    String value = record.value();
                    int partition = record.partition();
                    System.out.println("Received message: partition = " + partition + " , key = " + key + ", value = " + value);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
GroupB's Consumer1:
package com.msb.consumergroup;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class GroupBConsumer1 {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupB");
        // "earliest": when the group has no committed offset, start from the beginning of each partition
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // "latest" is the default: resume from the committed offset, or from the end of the log if none exists
        //properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("msb"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    String value = record.value();
                    int partition = record.partition();
                    System.out.println("Received message: partition = " + partition + " , key = " + key + ", value = " + value);
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
1. The producer sends 8 messages; with only GroupAConsumer1 running, it receives all of them:


2. The producer sends 8 messages; GroupAConsumer1 and GroupAConsumer2 are running:
GroupAConsumer1 consumes partitions 0 and 1
GroupAConsumer2 consumes partitions 2 and 3

3. The producer sends 8 messages; GroupAConsumer1, GroupAConsumer2 and GroupAConsumer3 are running:
GroupAConsumer1 consumes partitions 0 and 1
GroupAConsumer2 consumes partition 2
GroupAConsumer3 consumes partition 3

4. The producer sends 8 messages; GroupAConsumer1 through GroupAConsumer4 are running:
GroupAConsumer1 consumes partition 0
GroupAConsumer2 consumes partition 1
GroupAConsumer3 consumes partition 2
GroupAConsumer4 consumes partition 3

5. The producer sends 8 messages; GroupAConsumer1 through GroupAConsumer5 are running:
GroupAConsumer1 consumes partition 0
GroupAConsumer2 consumes partition 1
GroupAConsumer3 consumes partition 2
GroupAConsumer5 consumes partition 3
GroupAConsumer4 consumes nothing
Summary:
Within a single group, consumers can be added and removed dynamically; the core rule is that load is balanced at the partition level. A partition cannot be consumed by more than one consumer of the same group, because that would destroy message ordering.
6. Add GroupBConsumer1 and start it:
GroupBConsumer1 consumes all the messages already in the partitions, because auto.offset.reset is set to earliest, so it starts reading each partition from the beginning.
IX. Manual commits
In the consumer code above, offsets are committed automatically once messages have been fetched. This mode can easily cause problems: if an exception is thrown while processing a message, the message has not really been handled, but as far as Kafka is concerned the consumer already received it and the offset counts as committed. This is the auto-commit mode; it can be switched to manual commits.
Set enable.auto.commit to false in the properties and commit after processing. If the offset is not committed, the next time the group consumes it will start again from the old offset.
Manual commit example:
package com.msb.commit;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class ConsumerCommit {
    /**
     * Synchronous commit:
     * commitSync sends the offsets to the broker right away and waits for the acknowledgement; offsets
     * are written to __consumer_offsets, an internal topic dedicated to storing consumer offsets.
     * commitSync blocks, because it waits for the acknowledgement and keeps retrying until the commit
     * succeeds, so it can hurt throughput.
     *
     * Asynchronous commit:
     * For manual commits it is usually better to use commitAsync, which does not block the thread.
     *
     * A common pattern is therefore to commit asynchronously after processing each batch, and to do one
     * final synchronous commit in finally as a safety net.
     */
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Disable auto commit
        properties.put("enable.auto.commit", false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList("msb"));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    String value = record.value();
                    int partition = record.partition();
                    System.out.println("Received message: partition = " + partition + " , key = " + key + ", value = " + value);
                }
                // Asynchronous commit: does not block the application thread, does not retry (may fail)
                consumer.commitAsync();
            }
        } catch (CommitFailedException e) {
            System.out.println("Commit Failed:");
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // Synchronous commit: blocks the application thread and retries until it succeeds
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
    }
}
If each poll returns many records, can the commit frequency be controlled by hand, e.g. committing once every 10 messages? Yes, by committing specific offsets, as in the following example.
Manual commit (committing specific offsets):
package com.msb.commit;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
/**
 * Consumer with manual commits of specific offsets
 */
public class ConsumerSpecial {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9092");
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
        // Disable auto commit
        properties.put("enable.auto.commit", false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // Offsets of the records processed so far
        Map<TopicPartition, OffsetAndMetadata> currOffsets = new HashMap<>();
        int count = 0;
        try {
            consumer.subscribe(Collections.singletonList("msb"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    String value = record.value();
                    int partition = record.partition();
                    System.out.println("Received message: partition = " + partition + " , key = " + key + ", value = " + value);
                    count++;
                    currOffsets.put(new TopicPartition(record.topic(), record.partition()),
                            new OffsetAndMetadata(record.offset() + 1, "no meta"));
                    // Commit every 10 messages (at the specific offsets collected above)
                    if (count % 10 == 0) {
                        consumer.commitAsync(currOffsets, null);
                    }
                }
            }
        } catch (CommitFailedException e) {
            System.out.println("Commit Failed:");
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // Final synchronous commit of whatever has been processed
                consumer.commitSync(currOffsets);
            } finally {
                consumer.close();
            }
        }
    }
}