<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      springboot整合kafka

      一、引入依賴 (kafka的版本和springboot的版本對不上的話,啟動會報錯,包類不存在)

         <dependency>
             <groupId>org.springframework.kafka</groupId>
              <artifactId>spring-kafka</artifactId>
              <version>2.5.1.RELEASE</version>
       </dependency>

      我的springboot版本:

          <parent>
              <groupId>org.springframework.boot</groupId>
              <artifactId>spring-boot-starter-parent</artifactId>
              <version>2.3.4.RELEASE</version>
              <relativePath/> <!-- lookup parent from repository -->
          </parent>

      二、配置 yml

      spring:
      
        kafka:
          bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094
          #生產者的配置,大部分我們可以使用默認的,這里列出幾個比較重要的屬性
          producer:
            #0---表示不進行消息接收是否成功的確認
            #1---表示當Leader接收成功時確認
            #all -1---表示Leader和Follower都接收成功時確認
            acks: all
            #設置大于0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。
            retries: 2
            #每批次發送消息的數量
            batch-size: 16384
            #producer可以用來緩存數據的內存大小。如果數據產生速度大于向broker發送的速度,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明。這項設置將和producer能夠使用的總內存相關,但并不是一個硬性的限制,因為不是producer使用的所有內存都是用于緩存。一些額外的內存會用于壓縮(如果引入壓縮機制),同樣還有一些用于維護請求。
            buffer-memory: 33554432
            #key序列化方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
            properties:
              linger.ms: 1
              enable:
                idempotence: true
          #消費者的配置
          consumer:
            #是否開啟自動提交
            enable-auto-commit: false
            #自動提交的時間間隔
            auto-commit-interval: 100ms
            #key的解碼方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              session.timeout.ms: 15000
            max-poll-records: 15
          listener:
            ack-mode: manual_immediate
            type: batch

      上面我配置的是手動Ack,并且批量消息,一次可以拉取15條記錄

      yaml中的各個配置參數的意思,參考官方文檔 https://kafka.apachecn.org/documentation.html#configuration

       

      三、springboot自動裝配機制

       

      package org.springframework.boot.autoconfigure.kafka;
      
      import java.io.IOException;
      import org.springframework.beans.factory.ObjectProvider;
      import org.springframework.boot.autoconfigure.condition.ConditionalOnClass;
      import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
      import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
      import org.springframework.boot.autoconfigure.kafka.KafkaProperties.Jaas;
      import org.springframework.boot.context.properties.EnableConfigurationProperties;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.context.annotation.Import;
      import org.springframework.kafka.core.ConsumerFactory;
      import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
      import org.springframework.kafka.core.DefaultKafkaProducerFactory;
      import org.springframework.kafka.core.KafkaAdmin;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.kafka.core.ProducerFactory;
      import org.springframework.kafka.security.jaas.KafkaJaasLoginModuleInitializer;
      import org.springframework.kafka.support.LoggingProducerListener;
      import org.springframework.kafka.support.ProducerListener;
      import org.springframework.kafka.support.converter.RecordMessageConverter;
      import org.springframework.kafka.transaction.KafkaTransactionManager;
      
      @Configuration(
          proxyBeanMethods = false
      )
      @ConditionalOnClass({KafkaTemplate.class})
      @EnableConfigurationProperties({KafkaProperties.class})
      @Import({KafkaAnnotationDrivenConfiguration.class, KafkaStreamsAnnotationDrivenConfiguration.class})
      public class KafkaAutoConfiguration {
          private final KafkaProperties properties;
      
          public KafkaAutoConfiguration(KafkaProperties properties) {
              this.properties = properties;
          }
      
          @Bean
          @ConditionalOnMissingBean({KafkaTemplate.class})
          public KafkaTemplate<?, ?> kafkaTemplate(ProducerFactory<Object, Object> kafkaProducerFactory, ProducerListener<Object, Object> kafkaProducerListener, ObjectProvider<RecordMessageConverter> messageConverter) {
              KafkaTemplate<Object, Object> kafkaTemplate = new KafkaTemplate(kafkaProducerFactory);
              messageConverter.ifUnique(kafkaTemplate::setMessageConverter);
              kafkaTemplate.setProducerListener(kafkaProducerListener);
              kafkaTemplate.setDefaultTopic(this.properties.getTemplate().getDefaultTopic());
              return kafkaTemplate;
          }
      
          @Bean
          @ConditionalOnMissingBean({ProducerListener.class})
          public ProducerListener<Object, Object> kafkaProducerListener() {
              return new LoggingProducerListener();
          }
      
          @Bean
          @ConditionalOnMissingBean({ConsumerFactory.class})
          public ConsumerFactory<?, ?> kafkaConsumerFactory(ObjectProvider<DefaultKafkaConsumerFactoryCustomizer> customizers) {
              DefaultKafkaConsumerFactory<Object, Object> factory = new DefaultKafkaConsumerFactory(this.properties.buildConsumerProperties());
              customizers.orderedStream().forEach((customizer) -> {
                  customizer.customize(factory);
              });
              return factory;
          }
      
          @Bean
          @ConditionalOnMissingBean({ProducerFactory.class})
          public ProducerFactory<?, ?> kafkaProducerFactory(ObjectProvider<DefaultKafkaProducerFactoryCustomizer> customizers) {
              DefaultKafkaProducerFactory<?, ?> factory = new DefaultKafkaProducerFactory(this.properties.buildProducerProperties());
              String transactionIdPrefix = this.properties.getProducer().getTransactionIdPrefix();
              if (transactionIdPrefix != null) {
                  factory.setTransactionIdPrefix(transactionIdPrefix);
              }
      
              customizers.orderedStream().forEach((customizer) -> {
                  customizer.customize(factory);
              });
              return factory;
          }
      
          @Bean
          @ConditionalOnProperty(
              name = {"spring.kafka.producer.transaction-id-prefix"}
          )
          @ConditionalOnMissingBean
          public KafkaTransactionManager<?, ?> kafkaTransactionManager(ProducerFactory<?, ?> producerFactory) {
              return new KafkaTransactionManager(producerFactory);
          }
      
          @Bean
          @ConditionalOnProperty(
              name = {"spring.kafka.jaas.enabled"}
          )
          @ConditionalOnMissingBean
          public KafkaJaasLoginModuleInitializer kafkaJaasInitializer() throws IOException {
              KafkaJaasLoginModuleInitializer jaas = new KafkaJaasLoginModuleInitializer();
              Jaas jaasProperties = this.properties.getJaas();
              if (jaasProperties.getControlFlag() != null) {
                  jaas.setControlFlag(jaasProperties.getControlFlag());
              }
      
              if (jaasProperties.getLoginModule() != null) {
                  jaas.setLoginModule(jaasProperties.getLoginModule());
              }
      
              jaas.setOptions(jaasProperties.getOptions());
              return jaas;
          }
      
          @Bean
          @ConditionalOnMissingBean
          public KafkaAdmin kafkaAdmin() {
              KafkaAdmin kafkaAdmin = new KafkaAdmin(this.properties.buildAdminProperties());
              kafkaAdmin.setFatalIfBrokerNotAvailable(this.properties.getAdmin().isFailFast());
              return kafkaAdmin;
          }
      }

      四、生產者:

      @RestController
      public class MyController {
      
         @Autowired
          private KafkaTemplate<String,String> kafkaTemplate;
          @RequestMapping("/send/msg")
          public String sendMsg(){
              for (int i = 0; i < 100; i++) {
                  //不指定分區,會隨機發到不同的分區
                  kafkaTemplate.send("shop-topic",i+"aaaa");
      
                  //指定了key,會用key的hashcode %分區總數,根據結果發送到指定分區,因此同一個key永遠發到同一個分區
                  kafkaTemplate.send("shop-topic",i+"aaa",i+"aaaa");
              }
              return "ok";
          }
      }

      五、消費者批量消費(yaml那里要開啟批量配置)

      /**
       * author: yangxiaohui
       * date:   2023/7/13
       */
      @Service
      public class OrderService {
      
          /**
           * kafka是按照消費者分組來消費的,同一條消息,在同一個分組中,只會被消費一次,例如 order服務部署了5個節點,他們的消費者組是一樣,那么一條
           * 消息只會被這5個節點中的一個節點消費,groupId會自動創建
           * ack提交的是offset
           * @param consumerRecordList
           * @param acknowledgment
           */
          @KafkaListener(topics = "shop-topic",groupId = "my-group",clientIdPrefix = "orderService")
          public void listenMsg(List<ConsumerRecord> consumerRecordList, Acknowledgment acknowledgment){
              acknowledgment.acknowledge();
              for (ConsumerRecord consumerRecord : consumerRecordList) {
                  System.out.println(consumerRecord.value());
              }
      
          }
      
      }

      六、消費者單條消費:一定要配置單條消費邏輯,不然會報錯

      spring:
      
        kafka:
          bootstrap-servers: 192.168.233.11:9092,192.168.233.11:9093,192.168.233.11:9094
          #生產者的配置,大部分我們可以使用默認的,這里列出幾個比較重要的屬性
          producer:
            #0---表示不進行消息接收是否成功的確認
            #1---表示當Leader接收成功時確認
            #all -1---表示Leader和Follower都接收成功時確認
            acks: all
            #設置大于0的值將使客戶端重新發送任何數據,一旦這些數據發送失敗。注意,這些重試與客戶端接收到發送錯誤時的重試沒有什么不同。允許重試將潛在的改變數據的順序,如果這兩個消息記錄都是發送到同一個partition,則第一個消息失敗第二個發送成功,則第二條消息會比第一條消息出現要早。
            retries: 2
            #每批次發送消息的數量
            batch-size: 16384
            #producer可以用來緩存數據的內存大小。如果數據產生速度大于向broker發送的速度,producer會阻塞或者拋出異常,以“block.on.buffer.full”來表明。這項設置將和producer能夠使用的總內存相關,但并不是一個硬性的限制,因為不是producer使用的所有內存都是用于緩存。一些額外的內存會用于壓縮(如果引入壓縮機制),同樣還有一些用于維護請求。
            buffer-memory: 33554432
            #key序列化方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
            properties:
              linger.ms: 1
              enable:
                idempotence: true
          #消費者的配置
          consumer:
            #是否開啟自動提交
            enable-auto-commit: false
            #自動提交的時間間隔
            auto-commit-interval: 100ms
            #key的解碼方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              session.timeout.ms: 15000
            max-poll-records: 15
          listener:
            ack-mode: manual_immediate
            type: single   

      注意上面最后的配置那里 type=single

      @Service
      public class OrderService {
      
          /**
           * kafka是按照消費者分組來消費的,同一條消息,在同一個分組中,只會被消費一次,例如 order服務部署了5個節點,他們的消費者組是一樣,那么一條
           * 消息只會被這5個節點中的一個節點消費,groupId會自動創建
           *
           * @param
           * @param
           */
          @KafkaListener(topics = "shop-topic",groupId = "my-group",clientIdPrefix = "orderService")
          public void listenMsg(ConsumerRecord consumerRecord,Acknowledgment  acknowledgment){
              acknowledgment.acknowledge();
              System.out.println(consumerRecord.value());
          }
      
      }

       七、需要注意的地方:

       

      第八、關于topic的創建

      1.如果沒有事先創建,然后往一個不存在的topic中發送消息,kafka會自動創建這個topic,并且是只有一個分區,一個副本的topic,如:

       

       2. 生產如果需要創建多分區,多副本的topic,可以登錄kafka,使用命令行方式創建,就像數據庫一樣,我們都是先在數據庫創建好表,然后java項目再連接該數據庫

      bin/kafka-topics.sh --create --zookeeper 192.168.233.11:2182,192.168.233.11:2183,192.168.233.11:2184 --replication-factor 2 --partitions 2 --topic order-topic

       第九、kafka常用的概念理解

       

       

       

      posted @ 2023-07-13 14:42  yangxiaohui227  閱讀(1660)  評論(0)    收藏  舉報
      主站蜘蛛池模板: 免费无码一区无码东京热 | 久久精品日日躁夜夜躁| 久久亚洲日韩精品一区二区三区| 国产日韩精品一区在线不卡| 午夜福利日本一区二区无码| 久热re这里精品视频在线6| 日本道不卡一二三区视频| 国产精品国三级国产av| 欧美不卡无线在线一二三区观| 亚洲色最新高清AV网站| 国产一区二区精品久久呦| 国产免费视频一区二区| 亚洲国产亚洲综合在线尤物| 国产精品福利午夜久久香蕉| 四虎永久在线精品无码视频| 亚洲av熟女国产一二三| 熟女系列丰满熟妇AV| 強壮公弄得我次次高潮A片| 亚洲精品成人一二三专区| 99在线视频免费观看| 亚洲精品日本久久久中文字幕| 亚洲www永久成人网站| 国产精品视频免费一区二区三区| 亚洲精品国产福利一区二区| 欧美日韩亚洲国产| 国产对白叫床清晰在线播放 | 91精品国产色综合久久| 花莲县| 无码人妻aⅴ一区二区三区蜜桃 | 国产精品免费观看色悠悠| 国产欧美日韩高清在线不卡| 亚洲另类激情专区小说图片| 国产蜜臀久久av一区二区| 久草热8精品视频在线观看| 任我爽精品视频在线播放| 中文字幕乱码在线播放| 国产成人无码AV片在线观看不卡| 国产精一区二区黑人巨大| 国产网友愉拍精品视频手机| 91精品国产午夜福利| 你拍自拍亚洲一区二区三区|