動態線程池
@SpringBootAplication 聲明當前類是一個配置類
@Configurable注解用戶標記一個類可以被Spring容器管理
@SpringBootTest用于在SpringBoot應用程序中進行集成測試,允許測試類在真實的應用環境中運行
@RunWith(SpringRunner.class)讓測試運行于Spring測試環境

Redis只是注冊中心的一種實現方式,可以有多種實現方式.
動態修改分布式部署中所運行的線程池,對不同應用的線程池進行監控
項目包括三個模塊,分別是測試工程、組件的sdk和admin管理工程。
test 線程池測試
方法中如何獲得每個線程池中的狀態信息的?
邏輯:springBootApplication啟動應用,加載動態線程組件(pom.xml文件中引入了線程組件作為依賴)

DynamicThreadPoolAutoProperties里的屬性值來自application-dev.yml。
使用@ConfigurationProperties(將配置文件的屬性綁定到java的bean上)實現綁定。


DynamicThreadPoolAutoConfig將會把DynamicThreadPoolAutoProperties里的屬性值綁定到類的實例上。
使用@EnableConfigurationProperties用于啟用對@ConfigurationProperties注解類的支持。它允許將外部配置文件(如application.properties或application.yml)中的屬性綁定到強類型的Java Beans上,從而實現配置的模塊化和可維護性
![]()
config文件夾(創建兩個線程池)
對測試線程池的屬性進行了配置,包括核心線程池、最大線程數、最大等待時間、最大隊列數和拒絕策略,等待時間的單位設置為秒,線程工廠(為線程池中的線程提供統一的配置),本項目中設為默認。
// 創建線程池
return new ThreadPoolExecutor(properties.getCorePoolSize(),
properties.getMaxPoolSize(),
properties.getKeepAliveTime(),
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(properties.getBlockQueueSize()),
Executors.defaultThreadFactory(),
handler);

按照上述代碼創建了兩個線程池,分別為threadPoolExecutors01和threadPoolExecutors02。
starter
pom文件
添加redisson依賴來連接redis作為配置中心,所有應用可以通過redis進行通知的配置和變更;
config包
作為配置的入口,springboot啟動加載需要一個自動化的配置,需要指定,配置入口提供了一個類把需要用到的bean對象實例化出來,通過在spring.factories中添加配置:

DynamicThreadPoolAutoConfig
創建redis客戶端,實例化注冊中心,返回DynamicThreadPoolService對象(如果Redis中有數據,從Redis中獲取最新線程池的核心線程數和最大線程數的數據并對當前線程池對象進行修改),返回ThreadPoolDataReportJob,ThreadPoolConfigAdjustListener以及RTopic(Redisson中實習了發布/訂閱模式的組件)
Redisson中RTopic的使用場景及例子-CSDN博客
domain包
DynamicThreadPoolService類(獲取線程池列表中的信息)
queryThreadPoolList():獲取線程池集合列表中每個線程池的信息,并返回List集合。
queryThreadPoolConfigByName():通過線程池的名字獲取到該線程池的信息,并返回ThreadPoolConfigEntity實體。
updateThreadPoolConfig():根據傳入的ThreadPoolConfigEntity實體更新線程池的信息。
registry包
Redisson是基于Redis實現的并發庫,是高級的分布式數據結構和并發工具,提供鎖,信號量等并發組件。
Redis 和 Redisson 到底是啥關系 解開這個迷團Redis 與 Redisson: 探索他們的關系與共生之道 - 掘金 (juejin.cn)
RedisRegistry類(將線程池列表信息上報到Redisson注冊中心)
reportThreadPool(List<ThreadPoolConfigEntity> threadPoolEntites):通過傳入的參數更新redisson。
reportThreadPoolConfigParameter(ThreadPoolConfigEntity threadPoolConfigEntity):上報線程池的配置參數更新redisson
trigger包(觸發器)
ThreadPoolDataReportJob類
實例化上面的兩個類,分別用于獲得當前線程池的信息并上報到Redisson。
//通過服務拿數據
private final IDynamicThreadPoolService dynamicThreadPoolService;
//通過redis注冊中心上報
private final IRegistry registry;
public ThreadPoolDataReportJob(IDynamicThreadPoolService dynamicThreadPoolService,IRegistry registry){
this.dynamicThreadPoolService=dynamicThreadPoolService;
this.registry=registry;
}
@Scheduled(cron = "0/20 * * * * ?")
public void execReportThreadPoolList(){
//通過service服務方法獲得當前線程池的狀態
List<ThreadPoolConfigEntity> threadPoolConfigEntities = dynamicThreadPoolService.queryThreadPoolList();
//向注冊中心上報
registry.reportThreadPool(threadPoolConfigEntities);
log.info("動態線程池,上報線程池信息:{}", JSON.toJSONString(threadPoolConfigEntities));
for (ThreadPoolConfigEntity threadPoolConfigEntity :
threadPoolConfigEntities) {
registry.reportThreadPoolConfigParameter(threadPoolConfigEntity);
log.info("動態線程池,上報線程池配置:{}", JSON.toJSONString(threadPoolConfigEntity));
}
}
實現實時監聽當前線程池信息并上報。
ThreadPoolConfigAdjustListener類(更新線程池屬性)
//消息監聽器 每次產生的新消息會放在第二個參數里
public void onMessage(CharSequence charSequence, ThreadPoolConfigEntity threadPoolConfigEntity) {
log.info("動態線程池,調整線程池配置。線程池名稱:{} 核心線程數:{} 最大線程數:{}", threadPoolConfigEntity.getThreadPoolName(), threadPoolConfigEntity.getPoolSize(), threadPoolConfigEntity.getMaximumPoolSize());
//更新線程池
dynamicThreadPoolService.updateThreadPoolConfig(threadPoolConfigEntity);
//上報最新數據到注冊中心
List<ThreadPoolConfigEntity> threadPoolConfigEntities = dynamicThreadPoolService.queryThreadPoolList();
registry.reportThreadPool(threadPoolConfigEntities);
ThreadPoolConfigEntity threadPoolConfigEntityCurrent = dynamicThreadPoolService.queryThreadPoolConfigByName(threadPoolConfigEntity.getThreadPoolName());
registry.reportThreadPoolConfigParameter(threadPoolConfigEntityCurrent);
redisson消息訂閱的流程:
在組件中創建Redisson的配置并連接到本地的Redis服務器;
在Controller包下的更新線程池屬性事件中調用getTop()獲取一個發布頻道,并使用publish()發送更新時傳入的線程池實體;
創建訂閱消息的監聽器,通過實現MessageListner接口,并重寫onMessage方法來處理消息;
在DynamicThreadPoolAutoConfig里創建dynamicThreadPoolRedisTopic類實例對象,使用getTopic(“頻道名”)方法獲取訂閱頻道,并使用addListner()添加之前創建的監聽器,返回topic對象
resources中的spring.factories文件:指定需要spring動態加載配置類的入口地址


浙公網安備 33010602011771號