<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      kafka的消費全流程

      多線程安全問題

      多線程安全:定義為:多線程去訪問一個類,這個類始終表現出正確的行為,不管運行的環境、讓線程交替執行、不需要任何的額外的同步、協同,都能表現出正確的行為。
      比如:i++,這個肯定不是多線程安全的。
       
      kafka中:
      生產者:是多線程安全。(不可變的方式去解決)
      KafkaProducer類中所有的成員變量,都是private、final的
       
      消費者:不是多線程安全。消費的時候拿到一個分區,比如consumer1拿到分區0進行消費的時候,與此同時還需要進行ack確認,要確認消費的這一條或者幾條是否被成功的消費了,這種情況下,consumer是很難確保多線程安全的。那么, 如果你想又要使用多線程,又想使用consumer,該怎么辦?
      思路:如果所有的成員變量都是不可變,final的,那一定是多線程安全的。還有一種確保多線程安全:線程封閉(每個線程實例化一個consumer對象)
       
      多線程下使用生產者:
      
      package com.msb.concurrent;
      
      import com.msb.selfserial.User;
      import org.apache.kafka.clients.producer.Callback;
      import org.apache.kafka.clients.producer.KafkaProducer;
      import org.apache.kafka.clients.producer.ProducerRecord;
      import org.apache.kafka.clients.producer.RecordMetadata;
      import org.apache.kafka.common.serialization.StringSerializer;
      
      import java.util.Properties;
      import java.util.concurrent.CountDownLatch;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      
      /**
       * 多線程下使用生產者
       */
      public class KafkaConProducer {
      
          //發送消息的個數
          private static final int MSG_SIZE = 1000;
          //負責發送消息的線程池
          private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
          private static CountDownLatch countDownLatch = new CountDownLatch(MSG_SIZE);
      
          private static User makeUser(int id) {
              User user = new User(id);
              String userName = "msb_" + id;
              user.setName(userName);
              return user;
          }
      
          /**
           * 發送消息的任務
           */
          private static class ProduceWorker implements Runnable {
      
              private ProducerRecord<String, String> record;
              private KafkaProducer<String, String> producer;
      
              public ProduceWorker(ProducerRecord<String, String> record, KafkaProducer<String, String> producer) {
                  this.record = record;
                  this.producer = producer;
              }
      
              @Override
              public void run() {
                  final String threadName = Thread.currentThread().getName();
                  producer.send(record, new Callback() {
                      @Override
                      public void onCompletion(RecordMetadata metadata, Exception exception) {
                          if (null != exception) {
                              exception.printStackTrace();
                          }
                          if (null != metadata) {
                              System.out.println(threadName + "|" + String.format("偏移量:%s,分區:%s", metadata.offset(), metadata.partition()));
                          }
                      }
                  });
                  countDownLatch.countDown();
              }
          }
      
          public static void main(String[] args) {
              Properties properties = new Properties();
              properties.put("bootstrap.servers", "127.0.0.1:9092");
              properties.put("key.serializer", StringSerializer.class);
              properties.put("value.serializer", StringSerializer.class);
      
              KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
      
              try {
                  for (int i = 0; i < MSG_SIZE; i++) {
                      User user = makeUser(i);
                      ProducerRecord<String, String> record = new ProducerRecord<>("concurrent-ConsumerOffsets", null, System.currentTimeMillis(), user.getId() + "", user.toString());
                      executorService.submit(new ProduceWorker(record, producer));
                  }
                  countDownLatch.await();
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  producer.close();
                  executorService.shutdown();
              }
          }
      }
      多線程下使用消費者:
       package com.msb.concurrent;
      
      import org.apache.kafka.clients.consumer.ConsumerConfig;
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      import org.apache.kafka.clients.consumer.ConsumerRecords;
      import org.apache.kafka.clients.consumer.KafkaConsumer;
      import org.apache.kafka.common.serialization.StringDeserializer;
      
      import java.time.Duration;
      import java.util.Collections;
      import java.util.HashMap;
      import java.util.Map;
      import java.util.Properties;
      import java.util.concurrent.ExecutorService;
      import java.util.concurrent.Executors;
      
      /**
       * 多線程下正確的使用消費者,需要記住:一個線程一個消費者
       */
      public class KafkaConConsumer {
      
          public static final int CONCURRENT_PARTITIONS_COUNT = 2;
      
          private static ExecutorService executorService = Executors.newFixedThreadPool(CONCURRENT_PARTITIONS_COUNT);
      
          private static class ConsumerWorker implements Runnable {
              private KafkaConsumer<String, String> consumer;
      
              public ConsumerWorker(Map<String, Object> config, String topic) {
                  Properties properties = new Properties();
                  properties.putAll(config);
                  //一個線程一個消費者
                  this.consumer = new KafkaConsumer<>(properties);
                  this.consumer.subscribe(Collections.singletonList(topic));
              }
      
              @Override
              public void run() {
                  final String threadName = Thread.currentThread().getName();
                  while (true) {
                      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                      for (ConsumerRecord<String, String> record : records) {
                          System.out.println(threadName + "|" + String.format("主題:%s,分區:%d,偏移量:%d,key:%s,value:%s",
                                  record.topic(), record.partition(), record.offset(), record.key(), record.value()));
                      }
                  }
              }
          }
      
          public static void main(String[] args) {
              Map<String, Object> properties = new HashMap<>();
              properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
              properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              properties.put(ConsumerConfig.GROUP_ID_CONFIG, "c_test");
              properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      
              for (int i = 0; i < CONCURRENT_PARTITIONS_COUNT; i++) {
                  //一個線程一個消費者
                  executorService.submit(new ConsumerWorker(properties, "concurrent-ConsumerOffsets"));
              }
          }
      }

       

      群組協調

      1、組協調器(broker)
      1)選舉leader消費者客戶端
      2)處理申請加入組的客戶端
      3)再平衡后,同步新的分配方案
      4)心跳(與客戶端維持),判斷哪些客戶端離線
      5)管理消費者已經消費的偏移量(consumer_offset,默認50個)
       
      2、消費者協調器(客戶端)
      1)向組協調器發起入組請求(第一個成為群主)
      2)發起同步組(消費者分區劃分情況)的請求(leader客戶端,負責計算,分配策略作為入參傳入)
      3)心跳(與組協調器維持)
      4)發起已經提交的消費者偏移量的請求(ACK確認)
      5)主動的發起離組請求
       
      當一個消費者加入分組(組協調器、消費者協調器,干的哪些事情)
      1、消費客戶端啟動、重連(JoinGroup,請求->組協調器)
      2、客戶端已經完成JoinGroup,客戶端(消費者協調器)再次向組協調器發起SyncGroup(同步組),獲取新的分配方案
      3、客戶端關機、異常,觸發離組。
      4、客戶端加入一組之后(一直保持心跳)

       

      分區再均衡

       

      kafka的消費者分區再均衡問題及案例

      tips:
      消費者是以群組名進行消費的,比如:
      topic: 名為msb,里面有100條消息
      groupID:test,名為test會將這100條消息消費;
      groupID:test1,又定義了一個群組,名為test1,又會拿到100條消息,進行消費;
      groupID:test2,再定義了一個消費者群組,名為test2,他也會重新消費這100條消息;
      上面的三個群組,test、test1、test2之間提交什么東西,偏移量等等,相互沒有任何關系的,只要groupID的名字是不一樣的。因為消費者群組的概念主要用于什么?
      當你的上游系統比如說是訂單系統,產生了一批訂單,比如100條訂單信息,下游系統有:物流系統,他要拿到這100筆訂單去發物流;下面還有通知系統,他也要拿到這100條消息進行通知發送;下面還有會員系統,購買了東西加一些積分什么的,他也要拿到這100條訂單進行積分處理。所以群組的概念就在于你在進行這種生產消費的時候,生產的100條消息這是屬于生產者,然后消費的時候,我有可能有不同的系統:物流系統是一個群組,積分系統是一個群組,會員系統是一個群組,我們之間是沒有任何影響的,物流系統消費了80條消息,這和通知系統有什么關系呢?沒關系。

       

       

       

       

       

       

       

      --

      posted on 2025-06-01 14:48  有點懶惰的大青年  閱讀(34)  評論(0)    收藏  舉報

      主站蜘蛛池模板: 亚洲精品国产字幕久久麻豆| 久热re这里精品视频在线6| a∨变态另类天堂无码专区| 忘忧草在线社区www中国中文| 国产成人综合网亚洲第一| 久热久精久品这里在线观看| 国内精品伊人久久久久影院对白| 又污又爽又黄的网站| 98久久人妻少妇激情啪啪| 久久亚洲精品情侣| 最近中文字幕国产精选| 国产精品户外野外| 性色欲情网站iwww九文堂| 一本大道久久香蕉成人网| 国产人妇三级视频在线观看| 日日噜噜夜夜狠狠久久无码区 | 花垣县| 亚洲一区二区三区在线激情| 亚洲香蕉免费有线视频| 亚洲精品国产自在现线最新 | 国模在线视频一区二区三区| 亚洲男人天堂av在线| 男女爽爽无遮挡午夜视频| 国产二区三区不卡免费| 久久中文字幕无码专区| 国产精品久久久久影院色| 日日碰狠狠躁久久躁96avv| 热re99久久精品国产99热| 中文文精品字幕一区二区| 中文字幕久久国产精品| 国产成人一区二区三区免费| 97色成人综合网站| 亚洲人成网线在线播放VA | 最近中文字幕完整版hd| 无码免费大香伊蕉在人线国产 | 国产成人精品1024免费下载| 无码人妻丰满熟妇区bbbbxxxx| 国产精品乱一区二区三区| 固安县| 99在线精品国自产拍中文字幕 | 久久成人国产精品免费软件|