springboot~ApplicationContextAware和Interceptor產(chǎn)生了真感情
看著題目,有點(diǎn)一頭污水吧,事實(shí)上,沒有經(jīng)歷過,很難去說ApplicationContextAware在什么時(shí)候會(huì)用到,直接在一個(gè)bean對(duì)象里,你可以直接使用構(gòu)造方法注入或者Autowired屬性注入的方式來使用其它的bean對(duì)象,這在springboot里是非常自然的,也是天然支持的;但如果你的這個(gè)bean不是由spring ioc自動(dòng)注入的,而是通過攔截器動(dòng)態(tài)配置的,這時(shí)你使用@Autowired時(shí),是無法獲取到其它bean對(duì)象的;這時(shí)你需要使用ApplicationContextAware接口,再定義一個(gè)靜態(tài)的ApplicationContext實(shí)例,在你的攔截器執(zhí)行方法里使用它就可以了。【應(yīng)該和攔截器里的動(dòng)態(tài)代理有關(guān)】
一個(gè)kafka的ConsumerInterceptor實(shí)例
在這個(gè)例子中,我們通過ConsumerInterceptor實(shí)現(xiàn)了一個(gè)TTL的延時(shí)隊(duì)列,當(dāng)topic過期時(shí),再通過KafkaTemplate將消息轉(zhuǎn)發(fā)到其它隊(duì)列里
- DelayPublisher.publish發(fā)送延時(shí)topic的方法
/**
* 發(fā)送延時(shí)消息
* @param message 消息體
* @param delaySecondTime 多個(gè)秒后過期
* @param delayTopic 過期后發(fā)送到的話題
*/
public void publish(String message, long delaySecondTime, String delayTopic) {
ProducerRecord producerRecord = new ProducerRecord<>(topic, 0, System.currentTimeMillis(), delayTopic, message,
new RecordHeaders().add(new RecordHeader("ttl", toBytes(delaySecondTime))));
kafkaTemplate.send(producerRecord);
}
- ConsumerInterceptorTTL
/**
* @author lind
* @date 2023/8/18 8:33
* @since 1.0.0
*/
@Component
public class ConsumerInterceptorTTL implements ConsumerInterceptor<String, String>, ApplicationContextAware {
// 靜態(tài)化的上下文,用于獲取bean,因?yàn)镃onsumerInterceptor是通過反射創(chuàng)建的,所以無法通過注入的方式獲取bean
private static ApplicationContext applicationContext;
@Override
public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
long now = System.currentTimeMillis();
Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> tpRecords = records.records(tp);
List<ConsumerRecord<String, String>> newTpRecords = new ArrayList<>();
for (ConsumerRecord<String, String> record : tpRecords) {
Headers headers = record.headers();
long ttl = -1;
for (Header header : headers) {
if (header.key().equals("ttl")) {
ttl = toLong(header.value());
}
}
// 消息超時(shí)判定
if (ttl > 0 && now - record.timestamp() < ttl * 1000) {
// 可以放在死信隊(duì)列中
System.out.println("消息超時(shí)了,需要發(fā)到topic:" + record.key());
KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
kafkaTemplate.send(record.key(), record.value());
}
else { // 沒有設(shè)置TTL,不需要超時(shí)判定
newTpRecords.add(record);
}
}
if (!newRecords.isEmpty()) {
newRecords.put(tp, newTpRecords);
}
}
return new ConsumerRecords<>(newRecords);
}
@Override
public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
offsets.forEach((tp, offset) -> System.out.println(tp + ":" + offset.offset()));
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
// 它的時(shí)機(jī)是在KafkaListenerAnnotationBeanPostProcessor的postProcessAfterInitialization方法中,applicationContext應(yīng)該定時(shí)成static,否則在實(shí)例對(duì)象中,它的值可能是空
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
- 配置文件中注入攔截器
spring:
kafka:
consumer:
properties:
interceptor.classes: com.example.ConsumerInterceptorTTL
浙公網(wǎng)安備 33010602011771號(hào)