<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      1.spring-kafka

      <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
      <version>1.3.5.RELEASE</version>
      </dependency>

      2.配置文件相關信息

      
      
      kafka.bootstrap-servers=localhost:9092
      kafka.consumer.group.id=20230321
      #可以并發消費的線程數 (通常與partition數量一致)
      kafka.consumer.concurrency=10
      kafka.consumer.enable.auto.commit=false

      kafka.bootstrap-servers.pic=localhost:29092
      kafka.consumer.group.id.pic=20230322_pic
      kafka.consumer.concurrency.pic=10
      kafka.consumer.enable.auto.commit.pic=false

      3.kafka配置類

      @Configuration
      @EnableKafka
      public class KafkaConsumerConfig {
      
          @Value("${kafka.consumer.group.id}")
          private String groupId;
      
          @Value("${kafka.consumer.concurrency}")
          private int concurrency;
      
          @Value("${kafka.consumer.enable.auto.commit}")
          private String autoCommit;
      
          @Value("${kafka.bootstrap-servers}")
          private String bootstrapServer;
      
      
          @Value("${kafka.consumer.group.id.pic}")
          private String groupIdPic;
      
          @Value("${kafka.consumer.concurrency.pic}")
          private int concurrencyPic;
      
          @Value("${kafka.consumer.enable.auto.commit.pic}")
          private String autoCommitPic;
      
          @Value("${kafka.bootstrap-servers.pic}")
          private String bootstrapServerPic;
      
      
          @Bean
          public ConsumerFactory<String, String> consumerFactory() {
              String bootstrapServers = bootstrapServer;
              Map<String, Object> configProps = new HashMap<>(16);
              configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
              configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
              configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
              return new DefaultKafkaConsumerFactory<>(configProps);
          }
      
       
          @Bean
          public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
              ConcurrentKafkaListenerContainerFactory<String, String> factory =
                      new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(consumerFactory());
              factory.setConcurrency(concurrency);
              factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
              return factory;
          }
      
      
      
      
          @Bean
          public ConsumerFactory<String, String> consumerFactoryPic() {
              String bootstrapServers = bootstrapServerPic;
              Map<String, Object> configProps = new HashMap<>(16);
              configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
              configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
              configProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdPic);
              configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommitPic);
              return new DefaultKafkaConsumerFactory<>(configProps);
          }
      
      
          @Bean
          public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryPic() {
              ConcurrentKafkaListenerContainerFactory<String, String> factory =
                      new ConcurrentKafkaListenerContainerFactory<>();
              factory.setConsumerFactory(consumerFactoryPic());
              factory.setConcurrency(concurrencyPic);
              factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
              return factory;
          }
      }

      4.消費主題消息

      @KafkaListener(topics = "xxxxx", containerFactory = "kafkaListenerContainerFactoryPic")
          public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
              try {
                  String jsonString = message.value();
                  if (StringUtils.isNoneBlank(jsonString)) {
                      log.info("消費:{}",jsonString);
                      //TODO ....
                  }
              } catch (Exception e) {
                  log.error(" receive topic error ", e);
              } finally {
                  ack.acknowledge();
              }
          }
      
      @KafkaListener(topics = "xxxxxx", containerFactory = "kafkaListenerContainerFactory")
          public void receive(ConsumerRecord<String, String> message, Acknowledgment ack) {
              try {
                  if (StringUtils.isNoneBlank(message.value())) {
                        //TODO ....
                  }
              } catch (Exception e) {
                  logger.error(" receive topic error ", e);
              } finally {
                  ack.acknowledge();
              }
          }

       

      posted on 2023-04-24 14:36  CccccDi  閱讀(536)  評論(0)    收藏  舉報

      主站蜘蛛池模板: 亚洲欧洲av一区二区久久| 日韩成人一区二区三区在线观看| 博乐市| 性xxxx搡xxxxx搡欧美| 国产偷窥熟女高潮精品视频| 欧美性猛交xxxx黑人| 欧美视频二区欧美影视| 无码专区 人妻系列 在线| 国产黄大片在线观看画质优化| 国产亚洲另类无码专区| 国产一区二区不卡91| 在线视频精品中文无码| 高清国产一区二区无遮挡| 日本一道一区二区视频| 巨胸喷奶水视频www免费网站| 亚洲v国产v天堂a无码二区| 天天爽夜夜爱| 亚洲高清国产拍精品网络战| 亚洲情色av一区二区| 国产玩具酱一区二区三区| 九台市| 无码专区视频精品老司机| 欧美福利电影A在线播放| 日韩中文字幕av有码| 黄色三级亚洲男人的天堂| 久久97人人超人人超碰超国产| 内乡县| AV最新高清无码专区| 亚洲精品国产一区二区在线观看| 日韩国产成人精品视频| 在线免费播放av观看| 黄色网站免费在线观看| 国产精品亚洲二区在线播放 | 亚洲中文字幕精品第三区| 性奴sm虐辱暴力视频网站 | 美女一区二区三区在线观看视频| 久久一级黄色大片免费观看| Y111111国产精品久久久| 亚洲精品日韩中文字幕| 四虎在线播放亚洲成人| 欧美激情a∨在线视频播放|