Quartz集群增強版_02.任務輪詢及優化??
Quartz集群增強版_02.任務輪詢及優化
開源地址 https://github.com/funnyzpc/quartz
任務輪詢的主要工作是按固定頻度(時間5s)去執行項表撈未來5s內將要執行的任務,輪詢這些任務待到執行時間點時將任務扔到線程池去執行。
看似很簡單其實也有各種各樣的問題存在,這里不表 請往下看 ~
另外,任務輪詢的主要邏輯在:QuartzSchedulerThread ,讀者有興趣可以看看源碼~
輪詢窗口內的任務
情況是這樣子的,先看圖:
假使,現在有一個任務 task1 ,他的執行時間是每2秒執行一次,但是記錄執行項里面只會存一個下一次執行時間(next_fire_time),落在上圖就是2s的位置,這樣在每5秒輪詢一次的時候會漏掉一次執行(4s的位置)
這個問題解決起來其實很簡單,就是每次從db獲取到的執行項再做計算,除當前次外 5s 內的執行的時間全部計算出來,這其中尤其要注意的是同一個時間項在當前次內有多次執行的一定要有順序!
在后續會有循環等待,但在特殊情況下,用上圖說:由于同批次其他任務存在延遲(假如延遲大于等于2s) ,這時候4s時的這個任務可能早于 2s 時的任務執行,同時又由于 4s 時的任務的 參照時間是 2s 時的任務的時間(pre_fire_time) ??(可能很難理解吧,建議看看后續update語句)
在被扔到線程池前,數據庫由于 2s 時的任務并沒有執行,數據庫里面存的是 0s 時的任務配置,從而就會導致4s時的任務不會執行(因為他競爭不到鎖)(2s任務參照的是0s時的任務 4s參照的是2s時的任務),這是很嚴重的問題; 如果任務是有序的且計算出來的4s時的任務總是排在 2s 時的任務之后,即使其他任務存在延遲,也會相應保證后續時間點兒任務正常執行,很大程度避免了任務丟失~
獲取執行權限(獲取鎖)
因為存在集群并發的問題,所以一個任務同一時間必須只由一個節點來執行,同時也為了保證執行順序 所以在任務被丟到線程池之前需要在數據庫 做一個 UPDATE 的競爭操作,具體SQL語句如下:
UPDATE
QRTZ_EXECUTE SET
PREV_FIRE_TIME =? ,
NEXT_FIRE_TIME = ?,
TIME_TRIGGERED =?,
STATE =?,
HOST_IP =?,
HOST_NAME =?,
END_TIME =?
WHERE ID = ?
AND STATE = ? -- old STATE
AND PREV_FIRE_TIME = ? -- old PREV_FIRE_TIME
AND NEXT_FIRE_TIME = ? -- old NEXT_FIRE_TIME
可以看到,必須是被更新記錄必須是要對齊 STATE、 PREV_FIRE_TIME、 NEXT_FIRE_TIME 才可更新~
使用動態線程池
Quartz 一般使用的是 SimpleThreadPool 作為其任務的線程池,既然簡單必然是: 內部使用固定線程處理
一開始,我是準備就著源碼做部分改動來著,后來發現沒這邊簡單,原 Quartz 在獲取鎖的
時候會使用線程本地變量(ThreadLocal) 緩存 執行線程 以做并發控制,后來不得已將邏輯大部分推翻做重構,這是很大的變化; 現在,對于 Quartz集群增強版 來說,不再有 ThreadLocal 的困擾, 只需關注自身 執行線程池配置的實現邏輯即可,這就有了 MeeThreadPool 不僅有了線程分配控制也有了隊列,這是一大變化,現在你可以使用 MeeThreadPool 也可以繼續使用 SimpleThreadPool ~
這是 MeeThreadPool 的主要邏輯:
protected void createWorkerThreads(final int createCount) {
int cct = this.count = createCount<1? Runtime.getRuntime().availableProcessors() :createCount;
final MyThreadFactory myThreadFactory = new MyThreadFactory(this.getThreadNamePrefix(), this);
this.poolExecutor = new ThreadPoolExecutor(cct<=4?2:cct-2,cct+2,6L, TimeUnit.SECONDS, new LinkedBlockingDeque(cct+2),myThreadFactory);
}
private final class MyThreadFactory implements ThreadFactory {
final String threadPrefix ;//= schedulerInstanceName + "_QRTZ_";
final MeeThreadPool meeThreadPool;
private final AtomicInteger threadNumber = new AtomicInteger(1);
public MyThreadFactory(final String threadPrefix,final MeeThreadPool meeThreadPool) {
this.threadPrefix = threadPrefix;
this.meeThreadPool = meeThreadPool;
}
@Override
public Thread newThread(Runnable r) {
WorkerThread wth = new WorkerThread(
meeThreadPool,
threadGroup,
threadPrefix + ((threadNumber.get())==count?threadNumber.getAndSet(1):threadNumber.getAndIncrement()),
getThreadPriority(),
isMakeThreadsDaemons(),
r);
if (isThreadsInheritContextClassLoaderOfInitializingThread()) {
wth.setContextClassLoader(Thread.currentThread().getContextClassLoader());
}
return wth;
}
}
伸縮性以及可用性有了大大的提高,需要提一嘴的是 如果使用 ThreadPoolExecutor 開發 Quartz 線程池一定要注意:
- 核心線程打滿之后
task一定是先進入隊列 - 隊列滿了之后才會依次創建線程直至最大線程數
- 一定要注意是否有線程被打滿后的異常拒絕處理策略,如果不希望出現異常拒絕 那是否要考慮在提交任務之前判斷線程池是否被打滿
- 開發完成一定要進行廣泛的測試,以符合預期
輪詢超時/執行超時問題
在JVM執行GC或者DB或者網絡存在故障,亦或是主機性能存在瓶頸,或是線程池被打滿 ... 等等,均會出現超時的問題,對于此類問題本 Quartz集群增強版 做了以下優化:
- 做了容忍度偏移,讓任務不拘泥于幾毫秒的差異提前執行
//1.時間偏移(6毫秒)
long ww = executeList.size()-1000<0 ? 4L : ((executeList.size()-1000L)/2000L)+4L ;
ww= Math.min(ww, 8L);
while( !executeList.isEmpty() && (System.currentTimeMillis()-now)<=LOOP_INTERVAL ){
long _et = System.currentTimeMillis();
QrtzExecute ce = null; // executeList.get(0);
for( int i = 0;i< executeList.size();i++ ){
QrtzExecute el = executeList.get(i);
// 這是要馬上執行的任務
if( el.getNextFireTime()-_et <= ww){
ce=el;
break;
}
if(i==0){
ce=el;
continue; // 如果執行列表長度為一,則會直接進入下面sleep等待
}
// 總是獲取最近時間呢個
if( el.getNextFireTime() <= ce.getNextFireTime() ){
ce = el;
}
}
executeList.remove(ce); // 一定要移除,否則無法退出while循環!!!
// 延遲
long w = 0;
if((w = (ce.getNextFireTime()-System.currentTimeMillis()-ww)) >0 ){
try {
Thread.sleep(w);
}catch (Exception e){
}
}
// 后續代碼略
}
- 對于任務輪詢,保證輪詢時間間隔的同時也做了偏移修正
// 延遲
long st = 0;
if((st = (LOOP_INTERVAL-(System.currentTimeMillis()-now)-2)) >0 ){
try {
Thread.sleep(st);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if( st<-10 && st%5==0 ){
LOG.error("當前次任務輪詢超時:"+st);
}
// 防止因輪詢超時的必要手段
now = st<-1000?
System.currentTimeMillis()/1000*1000 :
System.currentTimeMillis()+(st<-10?st:0);
-
對于事實的延遲做了任務修正
這個修正主要依賴于
ClusterMisfireHandler的輪詢處理,以保證后續中斷的任務能及時恢復~
對于偏移,需要解釋下: 偏移是對于整個循環而言的,任務循環一次是 5s ,由于寫表或任務提交可能造成整個循環會有 幾毫秒或幾十毫秒的偏差 ,這是向后偏移,如果任務提前執行完成 則整個循環可能不足 5s 這是向前偏差 ~
不管是向前還是向后都是需要避免的~
最后
為了更清楚的了解 Quartz集群增強版 建議過一遍結構圖:




浙公網安備 33010602011771號