Sentinel 源碼學(xué)習(xí)
引入依賴
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.7</version>
</dependency>
基本用法
try (Entry entry = SphU.entry("HelloWorld")) {
// 被保護(hù)的邏輯
System.out.println("hello world");
} catch (BlockException ex) {
// 處理被流控的邏輯
System.out.println("blocked!");
}
接下來,閱讀源碼,我們從SphU.entry()開始






每個(gè)SphU#entry()將返回一個(gè)Entry。這個(gè)類維護(hù)了當(dāng)前調(diào)用的一些信息:
- createTime :這個(gè)entry的創(chuàng)建時(shí)間,用于響應(yīng)時(shí)間統(tǒng)計(jì)
- current Node :在當(dāng)前上下文中的資源的統(tǒng)計(jì)
- origin Node :原始節(jié)點(diǎn)的統(tǒng)計(jì)
- ResourceWrapper :資源名稱
CtSph#entryWithPriority()方法就是整個(gè)流控的基本流程:
1、首先,獲取當(dāng)前線程上下文,如果為空,則創(chuàng)建一個(gè)
2、然后,查找處理器鏈
3、最后,依次執(zhí)行處理器
這是一個(gè)典型的責(zé)任鏈
接下來,挨個(gè)來看,首先看一下上下文。上下文是一個(gè)線程局部變量 ThreadLocal<Context>

如果當(dāng)前線程還沒有上下文,則創(chuàng)建一個(gè)


有了Context之后,接下來查找處理器





這些功能插槽(slot chain)有不同的職責(zé):
- NodeSelectorSlot :負(fù)責(zé)收集資源的路徑,并將這些資源的調(diào)用路徑,以樹狀結(jié)構(gòu)存儲(chǔ)起來,用于根據(jù)調(diào)用路徑來限流降級(jí);
- ClusterBuilderSlot :用于存儲(chǔ)資源的統(tǒng)計(jì)信息以及調(diào)用者信息,例如該資源的 RT, QPS, thread count 等等,這些信息將用作為多維度限流,降級(jí)的依據(jù);
- StatisticSlot :用于記錄、統(tǒng)計(jì)不同緯度的 runtime 指標(biāo)監(jiān)控信息;
- FlowSlot :用于根據(jù)預(yù)設(shè)的限流規(guī)則以及前面 slot 統(tǒng)計(jì)的狀態(tài),來進(jìn)行流量控制;
- AuthoritySlot :根據(jù)配置的黑白名單和調(diào)用來源信息,來做黑白名單控制;
- DegradeSlot :通過統(tǒng)計(jì)信息以及預(yù)設(shè)的規(guī)則,來做熔斷降級(jí);
- SystemSlot :通過系統(tǒng)的狀態(tài),例如 load1 等,來控制總的入口流量;
到這里為止,資源有了,上下文有了,處理器鏈有了,于是,接下來就可以對(duì)資源應(yīng)用所有的處理器了












關(guān)于功能插槽的學(xué)習(xí)就先到這里,下面補(bǔ)充一個(gè)知識(shí)點(diǎn):Node

Node 用于保存資源的實(shí)時(shí)統(tǒng)計(jì)信息
StatisticNode 保存三種實(shí)時(shí)統(tǒng)計(jì)指標(biāo):
- 秒級(jí)指標(biāo)
- 分鐘級(jí)指標(biāo)
- 線程數(shù)
DefaultNode 用于保存特定上下文中特定資源名稱的統(tǒng)計(jì)信息
EntranceNode 代表調(diào)用樹的入口
總之一句話,Node是用于保存統(tǒng)計(jì)信息的。那么,這些指標(biāo)數(shù)據(jù)是如何計(jì)數(shù)的呢?


Sentinel 使用滑動(dòng)窗口實(shí)時(shí)記錄和統(tǒng)計(jì)資源指標(biāo)。ArrayMetric背后的滑動(dòng)窗口基礎(chǔ)結(jié)構(gòu)是LeapArray。
下面重點(diǎn)看一下StatisticNode
StatisticNode是用于實(shí)時(shí)統(tǒng)計(jì)的處理器插槽。在進(jìn)入這個(gè)槽位時(shí),需要分別計(jì)算以下信息:
- ClusterNode :該資源ID的集群節(jié)點(diǎn)統(tǒng)計(jì)信息總和
- Origin node :來自不同調(diào)用者/起源的集群節(jié)點(diǎn)的統(tǒng)計(jì)信息
- DefaultNode :特定上下文中特定資源名稱的統(tǒng)計(jì)信息
- 最后,是所有入口的總和統(tǒng)計(jì)







private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
/**
* Get bucket item at provided timestamp.
*
* @param timeMillis a valid timestamp in milliseconds
* @return current bucket item at provided timestamp if the time is valid; null if time is invalid
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
現(xiàn)在,有2個(gè)窗口,每個(gè)窗口500ms,2個(gè)窗口總共1000ms
假設(shè),當(dāng)前時(shí)間戳是1200ms,那么 (1200 / 500) % 2 = 0, 1200 - 1200 % 500 = 1000
這個(gè)時(shí)候,如果0這個(gè)位置沒有窗口,則創(chuàng)建一個(gè)新的窗口,新窗口的窗口開始時(shí)間是1000ms
如果0這個(gè)位置有窗口,則繼續(xù)判斷舊窗口的窗口開始時(shí)間是否為1000ms,如果是,則表示窗口沒有過期,直接返回該窗口。如果舊窗口的開始時(shí)間小于1000ms,則表示舊窗口過期了,于是重置舊窗口的統(tǒng)計(jì)數(shù)據(jù),重新設(shè)置窗口開始時(shí)間(PS:相當(dāng)于將窗口向后移動(dòng))


窗口(桶)數(shù)據(jù)保存在MetricBucket中




總結(jié)一下:
1、每個(gè)線程過來之后,創(chuàng)建上下文,然后依次經(jīng)過各個(gè)功能插槽
2、每個(gè)資源都有自己的處理器鏈,也就是說多次訪問同一個(gè)資源時(shí),用的同一套處理器鏈(插槽)
3、Node相當(dāng)于是一個(gè)載體,用于保存資源的實(shí)時(shí)統(tǒng)計(jì)信息
4、第一次進(jìn)入插槽后,創(chuàng)建一個(gè)新Node,后面再補(bǔ)充Node的信息;第二次進(jìn)入的時(shí)候,由于上下文的名稱都是一樣的,所以不會(huì)再創(chuàng)建Node,而是用之前的Node,也就是還是在之前的基礎(chǔ)上記錄統(tǒng)計(jì)信息。可以這樣理解,每個(gè)DefaultNode就對(duì)應(yīng)一個(gè)特定的資源。
5、StatisticNode中保存三種類型的指標(biāo)數(shù)據(jù):每秒的指標(biāo)數(shù)據(jù),每分鐘的指標(biāo)數(shù)據(jù),線程數(shù)。
6、指標(biāo)數(shù)據(jù)統(tǒng)計(jì)采用滑動(dòng)窗口,利用當(dāng)前時(shí)間戳和窗口長(zhǎng)度計(jì)算數(shù)據(jù)應(yīng)該落在哪個(gè)窗口數(shù)組區(qū)間,通過窗口開始時(shí)間判斷窗口是否過期。實(shí)際數(shù)據(jù)保存在MetricBucket中
最后,千言萬語匯聚成這張?jiān)韴D

NodeSelectorSlot構(gòu)造調(diào)用鏈路,ClusterBuilderSlot構(gòu)造統(tǒng)計(jì)節(jié)點(diǎn),StatisticSlot利用滑動(dòng)窗口進(jìn)行指標(biāo)統(tǒng)計(jì),然后是流量控制
參考文檔
https://sentinelguard.io/zh-cn/docs/quick-start.html
https://sentinelguard.io/zh-cn/docs/basic-implementation.html
https://sentinelguard.io/zh-cn/docs/dashboard.html
https://blog.csdn.net/xiaolyuh123/article/details/107937353
http://www.rzrgm.cn/magexi/p/13124870.html
http://www.rzrgm.cn/mrxiaobai-wen/p/14212637.html
http://www.rzrgm.cn/taromilk/p/11750962.html
http://www.rzrgm.cn/taromilk/p/11751000.html
http://www.rzrgm.cn/wekenyblog/p/17519276.html

浙公網(wǎng)安備 33010602011771號(hào)