<output id="qn6qe"></output>

    1. <output id="qn6qe"><tt id="qn6qe"></tt></output>
    2. <strike id="qn6qe"></strike>

      亚洲 日本 欧洲 欧美 视频,日韩中文字幕有码av,一本一道av中文字幕无码,国产线播放免费人成视频播放,人妻少妇偷人无码视频,日夜啪啪一区二区三区,国产尤物精品自在拍视频首页,久热这里只有精品12

      springboot~ApplicationContextAware和Interceptor產(chǎn)生了真感情

      看著題目,有點(diǎn)一頭污水吧,事實(shí)上,沒有經(jīng)歷過,很難去說ApplicationContextAware在什么時(shí)候會(huì)用到,直接在一個(gè)bean對(duì)象里,你可以直接使用構(gòu)造方法注入或者Autowired屬性注入的方式來使用其它的bean對(duì)象,這在springboot里是非常自然的,也是天然支持的;但如果你的這個(gè)bean不是由spring ioc自動(dòng)注入的,而是通過攔截器動(dòng)態(tài)配置的,這時(shí)你使用@Autowired時(shí),是無法獲取到其它bean對(duì)象的;這時(shí)你需要使用ApplicationContextAware接口,再定義一個(gè)靜態(tài)的ApplicationContext實(shí)例,在你的攔截器執(zhí)行方法里使用它就可以了。【應(yīng)該和攔截器里的動(dòng)態(tài)代理有關(guān)】

      一個(gè)kafka的ConsumerInterceptor實(shí)例

      在這個(gè)例子中,我們通過ConsumerInterceptor實(shí)現(xiàn)了一個(gè)TTL的延時(shí)隊(duì)列,當(dāng)topic過期時(shí),再通過KafkaTemplate將消息轉(zhuǎn)發(fā)到其它隊(duì)列里

      • DelayPublisher.publish發(fā)送延時(shí)topic的方法
      	/**
      	 * 發(fā)送延時(shí)消息
      	 * @param message 消息體
      	 * @param delaySecondTime 多個(gè)秒后過期
      	 * @param delayTopic 過期后發(fā)送到的話題
      	 */
      	public void publish(String message, long delaySecondTime, String delayTopic) {
      		ProducerRecord producerRecord = new ProducerRecord<>(topic, 0, System.currentTimeMillis(), delayTopic, message,
      				new RecordHeaders().add(new RecordHeader("ttl", toBytes(delaySecondTime))));
      		kafkaTemplate.send(producerRecord);
      	}
      
      
      • ConsumerInterceptorTTL
      /**
       * @author lind
       * @date 2023/8/18 8:33
       * @since 1.0.0
       */
      @Component
      public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String>, ApplicationContextAware {
      
      	// 靜態(tài)化的上下文,用于獲取bean,因?yàn)镃onsumerInterceptor是通過反射創(chuàng)建的,所以無法通過注入的方式獲取bean
      	private static ApplicationContext applicationContext;
      
      	@Override
      	public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
      		long now = System.currentTimeMillis();
      		Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
      		for (TopicPartition tp : records.partitions()) {
      			List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
      			List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
      			for (ConsumerRecord<String, String> record : tpRecords) {
      				Headers headers = record.headers();
      				long ttl = -1;
      				for (Header header : headers) {
      					if (header.key().equals("ttl")) {
      						ttl = toLong(header.value());
      					}
      				}
      				// 消息超時(shí)判定
      				if (ttl > 0 && now - record.timestamp() < ttl * 1000) {
      					// 可以放在死信隊(duì)列中
      					System.out.println("消息超時(shí)了,需要發(fā)到topic:" + record.key());
      					KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
      					kafkaTemplate.send(record.key(), record.value());
      				}
      				else { // 沒有設(shè)置TTL,不需要超時(shí)判定
      					newTpRecords.add(record);
      				}
      
      			}
      			if (!newRecords.isEmpty()) {
      				newRecords.put(tp, newTpRecords);
      			}
      		}
      		return new ConsumerRecords<>(newRecords);
      	}
      
      	@Override
      	public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
      		offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
      	}
      
      	@Override
      	public void close() {
      	}
      
      	@Override
      	public void configure(Map<String, ?> configs) {
      
      	}
      
      	// 它的時(shí)機(jī)是在KafkaListenerAnnotationBeanPostProcessor的postProcessAfterInitialization方法中,applicationContext應(yīng)該定時(shí)成static,否則在實(shí)例對(duì)象中,它的值可能是空
      	@Override
      	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
      		this.applicationContext = applicationContext;
      	}
      
      }
      
      • 配置文件中注入攔截器
      spring:
        kafka:
          consumer:
            properties:
              interceptor.classes: com.example.ConsumerInterceptorTTL 
      
      posted @ 2023-08-18 10:28  張占嶺  閱讀(107)  評(píng)論(0)    收藏  舉報(bào)
      主站蜘蛛池模板: 亚洲一级特黄大片在线播放| 成人午夜伦理在线观看| 中文字幕无码免费久久99| 闽侯县| 亚洲天堂精品一区二区| 福利一区二区不卡国产| 青草视频在线观看视频| 精品久久久久无码| 亚洲国产日韩精品久久| 精品不卡一区二区三区| 日韩精品一区二区三区色| 伊人久久大香线蕉网av| 色色97| 成人综合人人爽一区二区| 国产亚洲欧洲AⅤ综合一区| 国产不卡av一区二区| 丁香花成人电影| AV老司机AV天堂| 黄色A级国产免费大片视频| 国产自产视频一区二区三区| 久久久精品午夜免费不卡| 婷婷色综合成人成人网小说| 亚洲成人av在线高清| 国产亚洲精品久久久久蜜臀| 又污又黄又无遮挡的网站| 日韩精品国产二区三区| 西畴县| 国产成人a在线观看视频| 少妇又紧又色又爽又刺激视频| 激情综合五月丁香亚洲| 久久99九九精品久久久久蜜桃| 国产成人精品亚洲午夜麻豆| 97人妻天天摸天天爽天天| 久久精品99国产精品日本| 777奇米四色成人影视色区| 亚洲区色欧美另类图片| 国产内射xxxxx在线| 婷婷六月天在线| 色就色偷拍综合一二三区| 亚洲AV国产福利精品在现观看| 精品国产精品午夜福利|