
      About Kafka

      1. Basic concepts in Kafka:

      Topics and partitions: the main point of partitioning is to raise consumption concurrency; with a single producer you can, say, run 8 consumers in parallel. The partition count is chosen mainly to balance the rates of production and consumption. Kafka has no notion of a queue; a partition can be thought of as one. A topic's partition count is fixed when it is created, as in the AdminClient sketch below.
      Offsets: every message stored in a partition gets an offset, and consumption happens in consumer groups.
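       
      For reference, here is a minimal sketch of creating such a topic with the Kafka AdminClient. The class name CreateTopic and the 4-partition/replication-factor-1 settings are my own illustration; it assumes the local broker used throughout this post:

      package com.msb.simple;
      
      import org.apache.kafka.clients.admin.AdminClient;
      import org.apache.kafka.clients.admin.AdminClientConfig;
      import org.apache.kafka.clients.admin.NewTopic;
      
      import java.util.Collections;
      import java.util.Properties;
      
      /**
       * Creates the "msb" topic with 4 partitions
       */
      public class CreateTopic {
      
          public static void main(String[] args) throws Exception {
              Properties properties = new Properties();
              properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
      
              try (AdminClient admin = AdminClient.create(properties)) {
                  //4 partitions, replication factor 1 (a single local broker)
                  NewTopic topic = new NewTopic("msb", 4, (short) 1);
                  admin.createTopics(Collections.singleton(topic)).all().get();
                  System.out.println("topic created.");
              }
          }
      }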
       

      2. The produce and consume flow in Kafka

       

      3. How a single message is produced and consumed (non-cluster)

       

       A simple introductory Java example of a Kafka producer and consumer:

      Kafka producer:

      package com.msb.simple;
      
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.StringSerializer;
      
      import java.util.Properties;
      
      /**
       * Kafka producer
       */
      public class HelloKafkaProducer {
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              /**
               * Address of the Kafka broker to connect to; here the local host and default port.
               * Multiple servers can be listed, comma-separated; that is typically done for a cluster,
               * so the producer can still connect even if one broker goes down.
               */
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              /**
               * Configure the key and value serializers.
               * Serialization turns an object into a byte[] that can travel across the network;
               * at the network layer, computers only understand byte[].
               */
              properties.put("key.serializer", StringSerializer.class);
              properties.put("value.serializer", StringSerializer.class);
      
              //build the Kafka producer object
              KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
              //build the message
              ProducerRecord<String, String> record = new ProducerRecord<>("msb", "teacher", "lijin");
              //send the message
              producer.send(record);
              System.out.println("message is sent.");
      
              //release the connection
              producer.close();
          }
      }

      Kafka consumer:

      package com.msb.simple;
      
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;
      
      /**
       * A first Kafka consumer
       */
      public class HelloKafkaConsumer {
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              /**
               * Configure the key and value deserializers; what the producer sent arrives as byte[].
               * Use deserializers that match the producer's serializers.
               */
              properties.put("key.deserializer", StringDeserializer.class);
              properties.put("value.deserializer", StringDeserializer.class);
              //set the consumer group id
              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
      
              //build the Kafka consumer object
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
      
              //subscribe to the topic; more than one can be subscribed
              consumer.subscribe(Collections.singletonList("msb"));
      
              try {
                  while (true) {
                      /**
                       * Poll the broker for messages.
                       * Duration.ofSeconds(1) is a timeout, not an interval: poll blocks for up to 1s
                       * waiting for records, then returns whatever has arrived.
                       */
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                      for (ConsumerRecord<String, String> record : records) {
                          String key = record.key();
                          String value = record.value();
                          System.out.println("接收到消息: key = " + key + ", value = " + value);
                      }
                  }
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  //release the connection
                  consumer.close();
              }
          }
      }

      Maven configuration for the org.apache.kafka client:

      pom.xml:
       <?xml version="1.0" encoding="UTF-8"?>
      <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
          <modelVersion>4.0.0</modelVersion>
          <groupId>com.kafka</groupId>
          <artifactId>demo</artifactId>
          <version>0.0.1-SNAPSHOT</version>
          <name>msb-kafka-client</name>
          <properties>
              <java.version>1.8</java.version>
              <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
              <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
              <spring-boot.version>2.6.13</spring-boot.version>
          </properties>
      
          <dependencies>
              <dependency>
                  <groupId>org.springframework.boot</groupId>
                  <artifactId>spring-boot-starter</artifactId>
              </dependency>
      
              <dependency>
                  <groupId>org.apache.kafka</groupId>
                  <artifactId>kafka-clients</artifactId>
                  <version>3.3.1</version>
              </dependency>
          </dependencies>
      
          <dependencyManagement>
              <dependencies>
                  <dependency>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-dependencies</artifactId>
                      <version>${spring-boot.version}</version>
                      <type>pom</type>
                      <scope>import</scope>
                  </dependency>
              </dependencies>
          </dependencyManagement>
      
          <build>
              <plugins>
                  <plugin>
                      <groupId>org.apache.maven.plugins</groupId>
                      <artifactId>maven-compiler-plugin</artifactId>
                      <version>3.8.1</version>
                      <configuration>
                          <source>1.8</source>
                          <target>1.8</target>
                          <encoding>UTF-8</encoding>
                      </configuration>
                  </plugin>
                  <plugin>
                      <groupId>org.springframework.boot</groupId>
                      <artifactId>spring-boot-maven-plugin</artifactId>
                      <version>${spring-boot.version}</version>
                      <configuration>
                          <mainClass>com.msb.MsbKafkaClientApplication</mainClass>
                          <skip>true</skip>
                      </configuration>
                      <executions>
                          <execution>
                              <id>repackage</id>
                              <goals>
                                  <goal>repackage</goal>
                              </goals>
                          </execution>
                      </executions>
                  </plugin>
              </plugins>
          </build>
      
      </project>

       

       4. Custom serialization when sending: shipping a Java object

      This custom implementation is only a small demo of how to plug in your own serializer; if a field were added to User, this code would have to change along with it, so it merely shows the mechanics.
      Custom serializer:
       package com.msb.selfserial;
      
      import org.apache.kafka.common.serialization.Serializer;
      
      import java.nio.ByteBuffer;
      import java.nio.charset.StandardCharsets;
      
      /**
       * Custom serializer
       */
      public class UserSerializer implements Serializer<User> {
      
      
          /**
           * Serialize the User object, i.e. turn it into a byte[].
           * The array layout used here:
           * 4 bytes for the id + 4 bytes for the length of name + the UTF-8 bytes of name
           * Because:
           * an int is 4 bytes;
           * the string's length is also an int, so another 4 bytes;
           * the name string itself can be arbitrarily long, hence the explicit length field.
           */
          @Override
          public byte[] serialize(String topic, User data) {
              byte[] name;
              int nameSize;
              if (data == null) {
                  return null;
              }
      
              if (data.getName() != null) {
                  name = data.getName().getBytes(StandardCharsets.UTF_8);
                  //use the encoded byte length, not String.length(), so multi-byte UTF-8 characters are counted correctly
                  nameSize = name.length;
              } else {
                  name = new byte[0];
                  nameSize = 0;
              }
      
              ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + nameSize);
              buffer.putInt(data.getId());
              buffer.putInt(nameSize);
              buffer.put(name);
              return buffer.array();
          }
      }
      Custom deserializer:
       package com.msb.selfserial;
      
      import org.apache.kafka.common.errors.SerializationException;
      import org.apache.kafka.common.serialization.Deserializer;
      
      import java.nio.ByteBuffer;
      import java.nio.charset.StandardCharsets;
      
      /**
       * Custom deserializer
       */
      public class UserDeserializer implements Deserializer<User> {
      
          @Override
          public User deserialize(String topic, byte[] data) {
              if (data == null) {
                  return null;
              }
              if (data.length < 8) {
                  throw new SerializationException("error data size.");
              }
      
              ByteBuffer buffer = ByteBuffer.wrap(data);
              int id = buffer.getInt();
              int nameSize = buffer.getInt();
              byte[] nameByte = new byte[nameSize];
              buffer.get(nameByte);
              String name = new String(nameByte, StandardCharsets.UTF_8);
              return new User(id, name);
          }
      }
      Producer:
       package com.msb.selfserial;
      
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.StringSerializer;
      
      import java.util.Properties;
      
      /**
       * Kafka producer
       */
      public class ProducerUser {
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.serializer", StringSerializer.class);
              properties.put("value.serializer", UserSerializer.class);
      
              KafkaProducer<String, User> producer = new KafkaProducer<String, User>(properties);
              //build the message
              ProducerRecord<String, User> record = new ProducerRecord<>("msb-user", "teacher", new User(1, "lijin"));
              //send the message
              producer.send(record);
              System.out.println("message is sent.");
      
              //release the connection
              producer.close();
          }
      }
      Consumer:
       package com.msb.selfserial;
      
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;
      
      public class ConsumerUser {
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.deserializer", StringDeserializer.class);
              properties.put("value.deserializer", UserDeserializer.class);
              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
      
              //build the Kafka consumer object
              KafkaConsumer<String, User> consumer = new KafkaConsumer<String, User>(properties);
      
              //subscribe to the topic; more than one can be subscribed
              consumer.subscribe(Collections.singletonList("msb-user"));
      
              try {
                  while (true) {
                      ConsumerRecords<String, User> records = consumer.poll(Duration.ofSeconds(1));
                      for (ConsumerRecord<String, User> record : records) {
                          String key = record.key();
                          User user = record.value();
                          System.out.println("接收到消息: key = " + key + ", value = " + user);
                      }
                  }
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  consumer.close();
              }
          }
      }
      Entity class User:
       package com.msb.selfserial;
      
      import lombok.Getter;
      import lombok.Setter;
      import lombok.ToString;
      
      /**
       * Entity class: User
       */
      @Getter
      @Setter
      @ToString
      public class User {
          private int id;
          private String name;
      
          public User(int id) {
              this.id = id;
          }
      
          public User(int id, String name) {
              this.id = id;
              this.name = name;
          }
      }

       

      5. Using partitioners

      Suppose a topic has 4 partitions and the producer wants the 1st message sent to partition 0, the 2nd to partition 1, the 3rd to partition 2 and the 4th to partition 3. How can that be done?
      In the introductory producer earlier we never specified a partitioner; does that mean none was used? In fact there is a default one, DefaultPartitioner, which partitions by the key.
       
      1. Built-in partitioners
       package com.msb.selfpartiton;
      
      import com.msb.selfserial.User;
      import com.msb.selfserial.UserSerializer;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.apache.kafka.clients.producer.RoundRobinPartitioner;
      import org.apache.kafka.clients.producer.UniformStickyPartitioner;
      import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
      import org.apache.kafka.common.serialization.StringSerializer;
      
      import java.util.Properties;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.Future;
      
      /**
       * Built-in partitioners
       */
      public class SysPartitionProducer {
      
          /**
           * Default partitioner:
           * If we send 10 messages through the default partitioner with the same key every time, do they land in different partitions or the same one?
           * Before running this experiment, change num.partitions=1 to num.partitions=4 in the broker's server.properties, so that a newly created topic gets 4 partitions underneath.
           * They all land in the same partition, because the default DefaultPartitioner partitions purely by key,
           * and the key in the loop below never changes, so every message goes to one partition.
           *
           * UniformStickyPartitioner:
           * ignores keys and sticks to a single partition until its batch is full, then switches to another.
           */
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.serializer", StringSerializer.class);
              properties.put("value.serializer", UserSerializer.class);
              //default partitioner
              //properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, DefaultPartitioner.class);
      
              //round-robin partitioner: spreads messages across the partitions in turn
              properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
              properties.put("partitioner.availability.timeout.ms", "0");
              properties.put("partitioner.ignore.keys", "true");
      
              //uniform sticky partitioner
              //properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, UniformStickyPartitioner.class);
      
              KafkaProducer<String, User> producer = new KafkaProducer<>(properties);
      
              try {
                  for (int i = 0; i < 10; i++) {
                      ProducerRecord<String, User> record = new ProducerRecord<>("msb", "teacher", new User(1, "lijin"));
                      Future<RecordMetadata> future = producer.send(record);
                      RecordMetadata recordMetadata = future.get();
                      System.out.println(i + "," + "offset:" + recordMetadata.offset() + "," + "partition:" + recordMetadata.partition());
                  }
              } catch (InterruptedException | ExecutionException e) {
                  e.printStackTrace();
              } finally {
                  producer.close();
              }
      
              System.out.println("message is sent.");
          }
      }

       

      2. Custom partitioner
      One note: a partition can also be specified directly when building the message, e.g. new ProducerRecord<>("msb", 0, "teacher", "lijin"). If a partition is specified this way, the configured partitioner has no effect; whatever partitioner is set, the partition given in ProducerRecord has the highest priority. In this example every message would simply go to partition 0.
      Custom partitioner:
       package com.msb.selfpartiton;
      
      import org.apache.kafka.clients.producer.Partitioner;
      import org.apache.kafka.common.Cluster;
      import org.apache.kafka.common.PartitionInfo;
      import org.apache.kafka.common.utils.Utils;
      
      import java.util.List;
      import java.util.Map;
      
      /**
       * Custom partitioner: partitions by the value instead of the key
       */
      public class SelfPartitioner implements Partitioner {
      
          @Override
          public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
              // get the partitions of this topic
              List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
              int num = partitionInfos.size();
              // hash the value bytes to a positive int, then take it modulo the partition count
              return Utils.toPositive(Utils.murmur2(valueBytes)) % num;   //same technique as DefaultPartitioner, which hashes the key instead
          }
      
          @Override
          public void configure(Map<String, ?> configs) {
      
          }
      
          @Override
          public void close() {
      
          }
      
      }
      Using the custom partitioner:
       package com.msb.selfpartiton;
      
      import com.msb.selfserial.User;
      import com.msb.selfserial.UserSerializer;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.apache.kafka.clients.producer.RoundRobinPartitioner;
      import org.apache.kafka.common.serialization.StringSerializer;
      
      import java.util.Properties;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.Future;
      
      /**
       * Producer using the custom partitioner
       */
      public class SelfPartitionProducer {
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.serializer", StringSerializer.class);
              properties.put("value.serializer", StringSerializer.class);
      
              //custom partitioner
              properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SelfPartitioner.class);
              KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
      
              try {
                  for (int i = 0; i < 10; i++) {
                      //build the message
                      ProducerRecord<String, String> record = new ProducerRecord<>("msb", "teacher", "lijin" + i);
                      //or specify the partition explicitly, which bypasses the partitioner:
                      //ProducerRecord<String, String> record = new ProducerRecord<>("msb", 0, "teacher", "lijin");
                      Future<RecordMetadata> future = producer.send(record);
                      RecordMetadata recordMetadata = future.get();
                      System.out.println(i + "," + "offset:" + recordMetadata.offset() + "," + "partition:" + recordMetadata.partition());
                  }
              } catch (InterruptedException | ExecutionException e) {
                  e.printStackTrace();
              } finally {
                  producer.close();
              }
      
              System.out.println("message is sent.");
          }
      }

       

      6. Sending messages asynchronously

      In the examples above, producer.send returns a Future and we call future.get() on the main thread. What if we want the result handled on an asynchronous thread instead?
      Do this: pass a Callback when calling producer.send.
      Asynchronous send:
       package com.msb.simple;
      
      import org.apache.kafka.clients.producer.Callback;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.apache.kafka.common.serialization.StringSerializer;
      
      import java.util.Properties;
      
      /**
       * Kafka producer with an asynchronous send callback
       */
      public class AsynProducer {
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.serializer", StringSerializer.class);
              properties.put("value.serializer", StringSerializer.class);
      
              KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
              ProducerRecord<String, String> record = new ProducerRecord<>("msb", "teacher", "lijin");
      
              //send the message; the Callback runs when the send completes
              producer.send(record, new Callback() {
                  @Override
                  public void onCompletion(RecordMetadata metadata, Exception e) {
                      if (e == null) {
                          //no exception: print the record metadata
                          System.out.println("offset:" + metadata.offset() + ", partition:" + metadata.partition());
                      } else {
                          //an exception occurred: print it
                          e.printStackTrace();
                      }
                  }
              });
      
              //release the connection (close() waits for in-flight sends to complete)
              producer.close();
          }
      }

       

      7. Buffering

      Why does the Kafka client buffer messages before sending them?
      1. It cuts I/O overhead (individual sends -> batches) ---- requires configuration changes
      2. It reduces GC pressure (pooling ---- a buffer pool)
       
      Batched sending is governed mainly by two parameters:
      Size: batch.size, default 16384 (16 KB)
      Time: linger.ms, default 0, meaning a message is sent as soon as it arrives, with no delay waiting for a batch to fill
       
      Buffer pool size:
      buffer.memory, default 32 MB
       
      When send is called, after the serializer and the partitioner have run, records headed for the same partition are collected into the same batch; batch.size caps the size of that batch, measured in bytes.
      The other benefit is less GC. Without a pool, each ByteBuffer would become garbage as soon as its batch was sent, and the garbage collector would have to reclaim it; worse, every send would have to allocate a new ByteBuffer, which costs time. The buffer pool lets these buffers be reused.
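       
      A hedged sketch of how these three knobs could be set on the introductory producer (the class name BufferedProducer and the specific values are illustrative, not recommendations):

      package com.msb.simple;
      
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.common.serialization.StringSerializer;
      
      import java.util.Properties;
      
      /**
       * Producer with explicit batching and buffering settings
       */
      public class BufferedProducer {
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
              properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
              //a batch for a partition is sent once it reaches 32 KB...
              properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 32768);
              //...or once 10 ms have passed since its first record was added, whichever comes first
              properties.put(ProducerConfig.LINGER_MS_CONFIG, 10);
              //total memory the producer may use to buffer unsent records: 64 MB
              properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864L);
      
              //try-with-resources: close() flushes any still-buffered records before returning
              try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
                  for (int i = 0; i < 100; i++) {
                      producer.send(new ProducerRecord<>("msb", "teacher", "lijin" + i));
                  }
              }
          }
      }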

       

      8. Group consumption

      Group consumption in Kafka ---- load balancing is built at the partition level.
       
      For the experiment, server.properties is configured with num.partitions=4, and the producer sends messages to the topic's four partitions.
      Note: after adding a consumer, we may have to wait a moment for the group to rebalance before restarting consumers and seeing the effect, roughly 5 seconds. The group coordinator on the broker knows how many partitions the topic has and how many consumers are in the group; when it detects an added consumer, the topic's partitions are reassigned. While this consumption reassignment is in progress, consumers are blocked and cannot consume; until a triggered rebalance completes, consumption pauses.
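       
      To watch a rebalance from inside a consumer, a ConsumerRebalanceListener can be passed to subscribe. A minimal sketch (the class name and log messages are my own):

      package com.msb.consumergroup;
      
      import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
      import org.apache.kafka.common.TopicPartition;
      
      import java.util.Collection;
      
      /**
       * Logs partition assignments as the group rebalances
       */
      public class LoggingRebalanceListener implements ConsumerRebalanceListener {
      
          @Override
          public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
              //called before a rebalance takes partitions away from this consumer
              System.out.println("partitions revoked: " + partitions);
          }
      
          @Override
          public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
              //called after a rebalance hands partitions to this consumer
              System.out.println("partitions assigned: " + partitions);
          }
      }

      Any of the consumers below could hook it up with consumer.subscribe(Collections.singletonList("msb"), new LoggingRebalanceListener()).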
       
      Producer sending the messages:
       package com.msb.consumergroup;
      
      import com.msb.selfpartiton.SelfPartitioner;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerConfig;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.apache.kafka.clients.producer.UniformStickyPartitioner;
      import org.apache.kafka.common.serialization.StringSerializer;
      
      import java.util.Properties;
      import java.util.concurrent.ExecutionException;
      import java.util.concurrent.Future;
      
      /**
       * Kafka producer: sends several messages
       */
      public class KafkaProducerGroup {
      
          public static void main(String[] args) throws ExecutionException, InterruptedException {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.serializer", StringSerializer.class);
              properties.put("value.serializer", StringSerializer.class);
              properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, SelfPartitioner.class);
      
              //build the Kafka producer object
              KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
      
              //send the messages
              for (int i = 0; i < 8; i++) {
                  ProducerRecord<String, String> record = new ProducerRecord<>("msb", null, "lijin" + i);
                  Future<RecordMetadata> future = producer.send(record);
                  RecordMetadata recordMetadata = future.get();
                  System.out.println(i + "," + "offset:" + recordMetadata.offset() + "," + "partition:" + recordMetadata.partition());
              }
      
              System.out.println("message is sent.");
              producer.close();
          }
      }
       
      GroupA's Consumer1 through Consumer5 (identical except for the class name):
       package com.msb.consumergroup;
      
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;
      
      public class GroupAConsumer1 {
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.deserializer", StringDeserializer.class);
              properties.put("value.deserializer", StringDeserializer.class);
              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupA");
      
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
              consumer.subscribe(Collections.singletonList("msb"));
      
              try {
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                      for (ConsumerRecord<String, String> record : records) {
                          String key = record.key();
                          String value = record.value();
                          int partition = record.partition();
                          System.out.println("接收到消息: partition = " + partition + " , key = " + key + ", value = " + value);
                      }
                  }
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  consumer.close();
              }
          }
      }
       
      GroupB's Consumer1:
       package com.msb.consumergroup;
      
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;
      
      public class GroupBConsumer1 {
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.deserializer", StringDeserializer.class);
              properties.put("value.deserializer", StringDeserializer.class);
              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupB");
              //earliest: if this group has no committed offset for a partition, consume that partition from the beginning
              properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
              //the default is latest: with no committed offset, start at the log end and read only new messages;
              //a second put with the same key would overwrite the first, so only one of the two lines may be active
              //properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
      
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
              consumer.subscribe(Collections.singletonList("msb"));
      
              try {
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                      for (ConsumerRecord<String, String> record : records) {
                          String key = record.key();
                          String value = record.value();
                          int partition = record.partition();
                          System.out.println("接收到消息: partition = " + partition + " , key = " + key + ", value = " + value);
                      }
                  }
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  consumer.close();
              }
          }
      }
       
      1. The producer sends 8 messages; with only GroupAConsumer1 started, it receives all of them:

      2. The producer sends 8 messages, with GroupAConsumer1 and GroupAConsumer2 started:
      GroupAConsumer1 consumed partitions 0 and 1
      GroupAConsumer2 consumed partitions 2 and 3

      3. The producer sends 8 messages, with GroupAConsumer1, GroupAConsumer2 and GroupAConsumer3 started:
      GroupAConsumer1 consumed partitions 0 and 1
      GroupAConsumer2 consumed partition 2
      GroupAConsumer3 consumed partition 3

      4. The producer sends 8 messages, with GroupAConsumer1 through GroupAConsumer4 started:
      GroupAConsumer1 consumed partition 0
      GroupAConsumer2 consumed partition 1
      GroupAConsumer3 consumed partition 2
      GroupAConsumer4 consumed partition 3

      5. The producer sends 8 messages, with GroupAConsumer1 through GroupAConsumer5 started:
      GroupAConsumer1 consumed partition 0
      GroupAConsumer2 consumed partition 1
      GroupAConsumer3 consumed partition 2
      GroupAConsumer5 consumed partition 3
      GroupAConsumer4 consumed nothing
       
      Summary:
      Consumers can be added to and removed from a group dynamically; the core rule of consumption is that load is balanced at the partition level. A single partition is never consumed by more than one consumer in the same group, because that would destroy message ordering within the partition.
       
      6. Add GroupBConsumer1 and start it:
      GroupBConsumer1 consumes all the earlier messages in every partition. Because auto.offset.reset is set to earliest and the new group has no committed offsets yet, it starts reading each partition from the beginning.
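
      A caveat: auto.offset.reset only applies when the group has no committed offset. To force a replay for a group that has already committed, the consumer can seek explicitly once partitions are assigned. A hedged sketch (the class name ReplayConsumer is my own):

      package com.msb.consumergroup;
      
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.TopicPartition;
      import org.apache.kafka.common.serialization.StringDeserializer;
      
      import java.time.Duration;
      import java.util.Collection;
      import java.util.Collections;
      import java.util.Properties;
      
      public class ReplayConsumer {
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.deserializer", StringDeserializer.class);
              properties.put("value.deserializer", StringDeserializer.class);
              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "groupB");
      
              KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
              consumer.subscribe(Collections.singletonList("msb"), new ConsumerRebalanceListener() {
                  @Override
                  public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                  }
      
                  @Override
                  public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                      //rewind every newly assigned partition to its earliest offset, even if the group has committed offsets
                      consumer.seekToBeginning(partitions);
                  }
              });
      
              try {
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                      for (ConsumerRecord<String, String> record : records) {
                          System.out.println("replayed: partition = " + record.partition() + ", value = " + record.value());
                      }
                  }
              } finally {
                  consumer.close();
              }
          }
      }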

       

      9. Manual commits

      In the consumer code above, the offset is committed automatically as soon as messages are fetched. This mode easily causes problems: if an exception is thrown while fetching and processing a message, so the message was never actually handled, Kafka still considers it consumed, since from the broker's perspective the consumer fetched it and that is the end of the matter. This is the auto-commit consumption mode; it can be changed to a manual one.
       
      Just set enable.auto.commit to false in the properties, then commit after consuming. If the offset is not committed, the next fetch will start again from the old offset.
      Manual commit example:
       package com.msb.commit;
      
      import org.apache.kafka.clients.consumer.CommitFailedException;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      
      import java.time.Duration;
      import java.util.Collections;
      import java.util.Properties;
      
      public class ConsumerCommit {
      
          /**
           * Synchronous commit:
           * commitSync acks immediately after being called, writing the offset to __consumer_offsets, a special internal topic dedicated to storing consumer group offsets.
           * commitSync blocks, because it waits for the ack to complete and even keeps retrying until the commit succeeds, so it can drag performance down.
           *
           * Asynchronous commit:
           * For manual commits it is therefore usually better to call commitAsync, which does not block the thread.
           *
           * A common pattern is to commit asynchronously right after processing a batch of messages, and then make one final synchronous commit in the finally block, just in case.
           */
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.deserializer", StringDeserializer.class);
              properties.put("value.deserializer", StringDeserializer.class);
              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
              //disable auto-commit
              properties.put("enable.auto.commit", false);
      
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
              consumer.subscribe(Collections.singletonList("msb"));
      
              try {
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                      for (ConsumerRecord<String, String> record : records) {
                          String key = record.key();
                          String value = record.value();
                          int partition = record.partition();
                          System.out.println("接收到消息: partition = " + partition + " , key = " + key + ", value = " + value);
                      }
                      //async commit: does not block the application thread and does not retry (so it may fail)
                      consumer.commitAsync();
                  }
              } catch (CommitFailedException e) {
                  System.out.println("Commit Failed:");
                  e.printStackTrace();
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  try {
                      //sync commit: blocks the application thread and retries until the commit succeeds (or an unrecoverable error occurs)
                      consumer.commitSync();
                  } finally {
                      consumer.close();
                  }
              }
          }
      }

       

      If every poll returns a large number of messages, can the commit frequency be controlled by hand, say committing once every 10 messages? Yes, using the following custom commit style:
      Manual commit (committing specific offsets):
       package com.msb.commit;
      
      import org.apache.kafka.clients.consumer.CommitFailedException;
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.clients.consumer.OffsetAndMetadata;
      import org.apache.kafka.common.TopicPartition;
      import org.apache.kafka.common.serialization.StringDeserializer;
      
      import java.time.Duration;
      import java.util.Collections;
      import java.util.HashMap;
      import java.util.Map;
      import java.util.Properties;
      
      /**
       * Consumer with manual commits at specific offsets
       */
      public class ConsumerSpecial {
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.deserializer", StringDeserializer.class);
              properties.put("value.deserializer", StringDeserializer.class);
              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
              //disable auto-commit
              properties.put("enable.auto.commit", false);
      
              KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
      
      
              //current offsets, per topic-partition
              Map<TopicPartition, OffsetAndMetadata> currOffsets = new HashMap<>();
              int count = 0;
      
              try {
                  consumer.subscribe(Collections.singletonList("msb"));
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                      for (ConsumerRecord<String, String> record : records) {
                          String key = record.key();
                          String value = record.value();
                          int partition = record.partition();
                          System.out.println("接收到消息: partition = " + partition + " , key = " + key + ", value = " + value);
                          count++;
                          currOffsets.put(new TopicPartition(record.topic(), record.partition()),
                                  new OffsetAndMetadata(record.offset() + 1, "no meta"));
                          //每10條提交一次(特定的偏移量)
                          if (count % 10 == 0) {
                              consumer.commitAsync(currOffsets, null);
                          }
                      }
                  }
              } catch (CommitFailedException e) {
                  System.out.println("Commit Failed:");
                  e.printStackTrace();
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  try {
                      consumer.commitSync(currOffsets);
                  } finally {
                      consumer.close();
                  }
              }
          }
      }

       

       

       
