Dubbo 源碼分析 - 集群容錯之 Directory
1. 簡介
前面文章分析了服務(wù)的導(dǎo)出與引用過程,從本篇文章開始,我將開始分析 Dubbo 集群容錯方面的源碼。這部分源碼包含四個部分,分別是服務(wù)目錄 Directory、服務(wù)路由 Router、集群 Cluster 和負(fù)載均衡 LoadBalance。這幾個部分的源碼邏輯比較獨立,我會分四篇文章進行分析。本篇文章作為集群容錯的開篇文章,將和大家一起分析服務(wù)目錄相關(guān)的源碼。在進行深入分析之前,我們先來了解一下服務(wù)目錄是什么。服務(wù)目錄中存儲了一些和服務(wù)提供者有關(guān)的信息,通過服務(wù)目錄,服務(wù)消費者可獲取到服務(wù)提供者的信息,比如 ip、端口、服務(wù)協(xié)議等。通過這些信息,服務(wù)消費者就可通過 Netty 等客戶端進行遠(yuǎn)程調(diào)用。在一個服務(wù)集群中,服務(wù)提供者數(shù)量并不是一成不變的,如果集群中新增了一臺機器,相應(yīng)地在服務(wù)目錄中就要新增一條服務(wù)提供者記錄。或者,如果服務(wù)提供者的配置修改了,服務(wù)目錄中的記錄也要做相應(yīng)的更新。如果這樣說,服務(wù)目錄和注冊中心的功能不就雷同了嗎。確實如此,這里這么說是為了方便大家理解。實際上服務(wù)目錄在獲取注冊中心的服務(wù)配置信息后,會為每條配置信息生成一個 Invoker 對象,并把這個 Invoker 對象存儲起來,這個 Invoker 才是服務(wù)目錄最終持有的對象。Invoker 有什么用呢?看名字就知道了,這是一個具有遠(yuǎn)程調(diào)用功能的對象。講到這大家應(yīng)該知道了什么是服務(wù)目錄了,它可以看做是 Invoker 集合,且這個集合中的元素會隨注冊中心的變化而進行動態(tài)調(diào)整。
好了,關(guān)于服務(wù)目錄這里就先介紹這些,大家先有個大致印象即可。接下來我們通過繼承體系圖來了解一下服務(wù)目錄的家族成員都有哪些。
2. 繼承體系
服務(wù)目錄目前內(nèi)置的實現(xiàn)有兩個,分別為 StaticDirectory 和 RegistryDirectory,它們均是 AbstractDirectory 的子類。AbstractDirectory 實現(xiàn)了 Directory 接口,這個接口包含了一個重要的方法定義,即 list(Invocation),用于列舉 Invoker。下面我們來看一下他們的繼承體系圖。

如上,Directory 繼承自 Node 接口,Node 這個接口繼承者比較多,像 Registry、Monitor、Invoker 等繼承了這個接口。這個接口包含了一個獲取配置信息的方法 getUrl,實現(xiàn)該接口的類可以向外提供配置信息。另外,大家注意看 RegistryDirectory 實現(xiàn)了 NotifyListener 接口,當(dāng)注冊中心節(jié)點信息發(fā)生變化后,RegistryDirectory 可以通過此接口方法得到變更信息,并根據(jù)變更信息動態(tài)調(diào)整內(nèi)部 Invoker 列表。
現(xiàn)在大家對服務(wù)目錄的繼承體系應(yīng)該比較清楚了,下面我們深入到源碼中,探索服務(wù)目錄是如何實現(xiàn)的。
3. 源碼分析
本章我將分析 AbstractDirectory 和它兩個子類的源碼。這里之所以要分析 AbstractDirectory,而不是直接分析子類是有一定原因的。AbstractDirectory 封裝了 Invoker 列舉流程,具體的列舉邏輯則由子類實現(xiàn),這是典型的模板模式。所以,接下來我們先來看一下 AbstractDirectory 的源碼。
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed...");
}
// 調(diào)用 doList 方法列舉 Invoker,這里的 doList 是模板方法,由子類實現(xiàn)
List<Invoker<T>> invokers = doList(invocation);
// 獲取路由器
List<Router> localRouters = this.routers;
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {
try {
// 獲取 runtime 參數(shù),并根據(jù)參數(shù)決定是否進行路由
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
// 進行服務(wù)路由
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: ...");
}
}
}
return invokers;
}
// 模板方法,由子類實現(xiàn)
protected abstract List<Invoker<T>> doList(Invocation invocation) throws RpcException;
上面就是 AbstractDirectory 的 list 方法源碼,這個方法封裝了 Invoker 的列舉過程。如下:
- 調(diào)用 doList 獲取 Invoker 列表
- 根據(jù) Router 的 getUrl 返回值為空與否,以及 runtime 參數(shù)決定是否進行服務(wù)路由
以上步驟中,doList 是模板方法,需由子類實現(xiàn)。Router 的 runtime 參數(shù)這里簡單說明一下,這個參數(shù)決定了是否在每次調(diào)用服務(wù)時都執(zhí)行路由規(guī)則。如果 runtime 為 true,那么每次調(diào)用服務(wù)前,都需要進行服務(wù)路由。這個對性能造成影響,慎重配置。關(guān)于該參數(shù)更詳細(xì)的說明,請參考官方文檔。
介紹完 AbstractDirectory,接下來我們開始分析子類的源碼。
3.1 StaticDirectory
StaticDirectory 即靜態(tài)服務(wù)目錄,顧名思義,它內(nèi)部存放的 Invoker 是不會變動的。所以,理論上它和不可變 List 的功能很相似。下面我們來看一下這個類的實現(xiàn)。
public class StaticDirectory<T> extends AbstractDirectory<T> {
// Invoker 列表
private final List<Invoker<T>> invokers;
// 省略構(gòu)造方法
@Override
public Class<T> getInterface() {
// 獲取接口類
return invokers.get(0).getInterface();
}
// 檢測服務(wù)目錄是否可用
@Override
public boolean isAvailable() {
if (isDestroyed()) {
return false;
}
for (Invoker<T> invoker : invokers) {
if (invoker.isAvailable()) {
// 只要有一個 Invoker 是可用的,就任務(wù)當(dāng)前目錄是可用的
return true;
}
}
return false;
}
@Override
public void destroy() {
if (isDestroyed()) {
return;
}
// 調(diào)用父類銷毀邏輯
super.destroy();
// 遍歷 Invoker 列表,并執(zhí)行相應(yīng)的銷毀邏輯
for (Invoker<T> invoker : invokers) {
invoker.destroy();
}
invokers.clear();
}
@Override
protected List<Invoker<T>> doList(Invocation invocation) throws RpcException {
// 列舉 Inovker,也就是直接返回 invokers 成員變量
return invokers;
}
}
以上就是 StaticDirectory 的代碼邏輯,很簡單,大家都能看懂,我就不多說了。下面來看看 RegistryDirectory,這個類的邏輯比較復(fù)雜。
3.2 RegistryDirectory
RegistryDirectory 是一種動態(tài)服務(wù)目錄,它實現(xiàn)了 NotifyListener 接口。當(dāng)注冊中心服務(wù)配置發(fā)生變化后,RegistryDirectory 可收到與當(dāng)前服務(wù)相關(guān)的變化。收到變更通知后,RegistryDirectory 可根據(jù)配置變更信息刷新 Invoker 列表。RegistryDirectory 中有幾個比較重要的邏輯,第一是 Invoker 的列舉邏輯,第二是接受服務(wù)配置變更的邏輯,第三是 Invoker 的刷新邏輯。接下來,我將按順序?qū)@三塊邏輯。
3.2.1 列舉 Invoker
Invoker 列舉邏輯封裝在 doList 方法中,這是個模板方法,前面已經(jīng)介紹過了。那這里就不過多啰嗦了,我們直入主題吧。
public List<Invoker<T>> doList(Invocation invocation) {
if (forbidden) {
// 服務(wù)提供者關(guān)閉或禁用了服務(wù),此時拋出 No provider 異常
throw new RpcException(RpcException.FORBIDDEN_EXCEPTION,
"No provider available from registry ...");
}
List<Invoker<T>> invokers = null;
// 獲取 Invoker 本地緩存
Map<String, List<Invoker<T>>> localMethodInvokerMap = this.methodInvokerMap;
if (localMethodInvokerMap != null && localMethodInvokerMap.size() > 0) {
// 獲取方法名和參數(shù)列表
String methodName = RpcUtils.getMethodName(invocation);
Object[] args = RpcUtils.getArguments(invocation);
// 檢測參數(shù)列表的第一個參數(shù)是否為 String 或 enum 類型
if (args != null && args.length > 0 && args[0] != null
&& (args[0] instanceof String || args[0].getClass().isEnum())) {
// 通過 方法名 + 第一個參數(shù)名稱 查詢 Invoker 列表,具體的使用場景暫時沒想到
invokers = localMethodInvokerMap.get(methodName + "." + args[0]);
}
if (invokers == null) {
// 通過方法名獲取 Invoker 列表
invokers = localMethodInvokerMap.get(methodName);
}
if (invokers == null) {
// 通過星號 * 獲取 Invoker 列表
invokers = localMethodInvokerMap.get(Constants.ANY_VALUE);
}
if (invokers == null) {
Iterator<List<Invoker<T>>> iterator = localMethodInvokerMap.values().iterator();
if (iterator.hasNext()) {
// 通過迭代器獲取 Invoker 列表
invokers = iterator.next();
}
}
}
// 返回 Invoker 列表
return invokers == null ? new ArrayList<Invoker<T>>(0) : invokers;
}
以上代碼進行多次嘗試,以期從 localMethodInvokerMap 中獲取到 Invoker 列表。一般情況下,普通的調(diào)用可通過方法名獲取到對應(yīng)的 Invoker 列表,泛化調(diào)用可通過 * 獲取到 Invoker 列表。按現(xiàn)有的邏輯,不管什么情況下,* 到 Invoker 列表的映射關(guān)系 <*, invokers> 總是存在的,也就意味著 localMethodInvokerMap.get(Constants.ANY_VALUE) 總是有值返回。除非這個值是 null,才會通過通過迭代器獲取 Invoker 列表。至于什么情況下為空,我暫時未完全搞清楚,我猜測是被路由規(guī)則(用戶可基于 Router 接口實現(xiàn)自定義路由器)處理后,可能會得到一個 null。目前僅是猜測,未做驗證。
本節(jié)的邏輯主要是從 localMethodInvokerMap 中獲取 Invoker,localMethodInvokerMap 源自 RegistryDirectory 類的成員變量 methodInvokerMap。doList 方法可以看做是對 methodInvokerMap 變量的讀操作,至于對 methodInvokerMap 變量的寫操作,這個將在后續(xù)進行分析。
3.2.2 接收服務(wù)變更通知
RegistryDirectory 是一個動態(tài)服務(wù)目錄,它需要接受注冊中心配置進行動態(tài)調(diào)整。因此 RegistryDirectory 實現(xiàn)了 NotifyListener 接口,通過這個接口獲取注冊中心變更通知。下面我們來看一下具體的邏輯。
public synchronized void notify(List<URL> urls) {
// 定義三個集合,分別用于存放服務(wù)提供者 url,路由 url,配置器 url
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
// 獲取 category 參數(shù)
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
// 根據(jù) category 參數(shù)將 url 分別放到不同的列表中
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
// 添加路由器 url
routerUrls.add(url);
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
// 添加配置器 url
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
// 添加服務(wù)提供者 url
invokerUrls.add(url);
} else {
// 忽略不支持的 category
logger.warn("Unsupported category ...");
}
}
if (configuratorUrls != null && !configuratorUrls.isEmpty()) {
// 將 url 轉(zhuǎn)成 Configurator
this.configurators = toConfigurators(configuratorUrls);
}
if (routerUrls != null && !routerUrls.isEmpty()) {
// 將 url 轉(zhuǎn)成 Router
List<Router> routers = toRouters(routerUrls);
if (routers != null) {
setRouters(routers);
}
}
List<Configurator> localConfigurators = this.configurators;
this.overrideDirectoryUrl = directoryUrl;
if (localConfigurators != null && !localConfigurators.isEmpty()) {
for (Configurator configurator : localConfigurators) {
// 配置 overrideDirectoryUrl
this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl);
}
}
// 刷新 Invoker 列表
refreshInvoker(invokerUrls);
}
如上,notify 方法首先是根據(jù) url 的 category 參數(shù)對 url 進行分門別類存儲,然后通過 toRouters 和 toConfigurators 將 url 列表轉(zhuǎn)成 Router 和 Configurator 列表。最后調(diào)用 refreshInvoker 方法刷新 Invoker 列表。這里的 toRouters 和 toConfigurators 方法邏輯不復(fù)雜,大家自行分析。接下來,我們把重點放在 refreshInvoker 方法上。
3.2.3 刷新 Invoker 列表
接著上一節(jié)繼續(xù)分析,refreshInvoker 方法是保證 RegistryDirectory 隨注冊中心變化而變化的關(guān)鍵所在。這一塊邏輯比較多,接下來一一進行分析。
private void refreshInvoker(List<URL> invokerUrls) {
// invokerUrls 僅有一個元素,且 url 協(xié)議頭為 empty,此時表示禁用所有服務(wù)
if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null
&& Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
// 設(shè)置 forbidden 為 true
this.forbidden = true;
this.methodInvokerMap = null;
// 銷毀所有 Invoker
destroyAllInvokers();
} else {
this.forbidden = false;
Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap;
if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
// 添加緩存 url 到 invokerUrls 中
invokerUrls.addAll(this.cachedInvokerUrls);
} else {
this.cachedInvokerUrls = new HashSet<URL>();
// 緩存 invokerUrls
this.cachedInvokerUrls.addAll(invokerUrls);
}
if (invokerUrls.isEmpty()) {
return;
}
// 將 url 轉(zhuǎn)成 Invoker
Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);
// 將 newUrlInvokerMap 轉(zhuǎn)成方法名到 Invoker 列表的映射
Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
// 轉(zhuǎn)換出錯,直接打印異常,并返回
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
logger.error(new IllegalStateException("urls to invokers error ..."));
return;
}
// 合并多個組的 Invoker
this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap;
// 保存為本地緩存
this.urlInvokerMap = newUrlInvokerMap;
try {
// 銷毀無用 Invoker
destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap);
} catch (Exception e) {
logger.warn("destroyUnusedInvokers error. ", e);
}
}
}
上面方法的代碼不是很多,但是邏輯卻不少。首先時根據(jù)入?yún)?invokerUrls 的數(shù)量和協(xié)議頭判斷是否禁用所有的服務(wù),如果禁用,則將 forbidden 設(shè)為 true,并銷毀所有的 Invoker。若不禁用,則將 url 轉(zhuǎn)成 Invoker,得到 <url, Invoker> 的映射關(guān)系。然后進一步進行轉(zhuǎn)換,得到 <methodName, Invoker 列表>。之后進行多組 Invoker 合并操作,并將合并結(jié)果賦值給 methodInvokerMap。methodInvokerMap 變量在 doList 方法中會被用到,doList 會對該變量進行讀操作,在這里是寫操作。當(dāng)新的 Invoker 列表生成后,還要一個重要的工作要做,就是銷毀無用的 Invoker,避免服務(wù)消費者調(diào)用已下線的服務(wù)的服務(wù)。
接下里,我將對上面涉及到的調(diào)用進行分析。按照順序,這里先來分析 url 到 Invoker 的轉(zhuǎn)換過程。
private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
Set<String> keys = new HashSet<String>();
// 獲取服務(wù)消費端配置的協(xié)議
String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY);
for (URL providerUrl : urls) {
if (queryProtocols != null && queryProtocols.length() > 0) {
boolean accept = false;
String[] acceptProtocols = queryProtocols.split(",");
// 檢測服務(wù)提供者協(xié)議是否被服務(wù)消費者所支持
for (String acceptProtocol : acceptProtocols) {
if (providerUrl.getProtocol().equals(acceptProtocol)) {
accept = true;
break;
}
}
if (!accept) {
// 若服務(wù)消費者協(xié)議頭不被消費者所支持,則忽略當(dāng)前 providerUrl
continue;
}
}
// 忽略 empty 協(xié)議
if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
continue;
}
// 通過 SPI 檢測服務(wù)端協(xié)議是否被消費端支持
if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
logger.error(new IllegalStateException("Unsupported protocol..."));
continue;
}
// 合并 url
URL url = mergeUrl(providerUrl);
String key = url.toFullString();
if (keys.contains(key)) {
// 忽略重復(fù) url
continue;
}
keys.add(key);
// 本地 Invoker 緩存列表
Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap;
Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
// 緩存未命中
if (invoker == null) {
try {
boolean enabled = true;
if (url.hasParameter(Constants.DISABLED_KEY)) {
// 獲取 disable 配置,并修改 enable 變量
enabled = !url.getParameter(Constants.DISABLED_KEY, false);
} else {
enabled = url.getParameter(Constants.ENABLED_KEY, true);
}
if (enabled) {
// 調(diào)用 refer 獲取 Invoker
invoker = new InvokerDelegate<T>(protocol.refer(serviceType, url), url, providerUrl);
}
} catch (Throwable t) {
logger.error("Failed to refer invoker for interface...");
}
if (invoker != null) {
// 緩存 Invoker 實例
newUrlInvokerMap.put(key, invoker);
}
} else {
// 緩存命中,將 invoker 存儲到 newUrlInvokerMap 中
newUrlInvokerMap.put(key, invoker);
}
}
keys.clear();
return newUrlInvokerMap;
}
toInvokers 方法一開始會對服務(wù)提供者 url 進行檢測,若服務(wù)消費端的配置不支持服務(wù)端的協(xié)議,或服務(wù)端 url 協(xié)議頭為 empty 時,toInvokers 均會忽略服務(wù)提供方 url。必要的檢測做完后,緊接著是合并 url,然后訪問緩存,嘗試獲取與 url 對應(yīng)的 invoker。如果緩存命中,直接將 Invoker 存入 newUrlInvokerMap 中即可。如果未命中,則需要新建 Invoker。Invoker 是通過 Protocol 的 refer 方法創(chuàng)建的,這個我在上一篇文章中已經(jīng)分析過了,這里就不贅述了。
toInvokers 方法返回的是 <url, Invoker> 映射關(guān)系表,接下來還要對這個結(jié)果進行進一步處理,得到方法名到 Invoker 列表的映射關(guān)系。這個過程由 toMethodInvokers 方法完成,如下:
private Map<String, List<Invoker<T>>> toMethodInvokers(Map<String, Invoker<T>> invokersMap) {
// 方法名 -> Invoker 列表
Map<String, List<Invoker<T>>> newMethodInvokerMap = new HashMap<String, List<Invoker<T>>>();
List<Invoker<T>> invokersList = new ArrayList<Invoker<T>>();
if (invokersMap != null && invokersMap.size() > 0) {
for (Invoker<T> invoker : invokersMap.values()) {
// 獲取 methods 參數(shù)
String parameter = invoker.getUrl().getParameter(Constants.METHODS_KEY);
if (parameter != null && parameter.length() > 0) {
// 切分 methods 參數(shù)值,得到方法名數(shù)組
String[] methods = Constants.COMMA_SPLIT_PATTERN.split(parameter);
if (methods != null && methods.length > 0) {
for (String method : methods) {
// 方法名不為 *
if (method != null && method.length() > 0
&& !Constants.ANY_VALUE.equals(method)) {
// 根據(jù)方法名獲取 Invoker 列表
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null) {
methodInvokers = new ArrayList<Invoker<T>>();
newMethodInvokerMap.put(method, methodInvokers);
}
// 存儲 Invoker 到列表中
methodInvokers.add(invoker);
}
}
}
}
invokersList.add(invoker);
}
}
// 進行服務(wù)級別路由,參考:https://github.com/apache/incubator-dubbo/pull/749
List<Invoker<T>> newInvokersList = route(invokersList, null);
// 存儲 <*, newInvokersList> 映射關(guān)系
newMethodInvokerMap.put(Constants.ANY_VALUE, newInvokersList);
if (serviceMethods != null && serviceMethods.length > 0) {
for (String method : serviceMethods) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
if (methodInvokers == null || methodInvokers.isEmpty()) {
methodInvokers = newInvokersList;
}
// 進行方法級別路由
newMethodInvokerMap.put(method, route(methodInvokers, method));
}
}
// 排序,轉(zhuǎn)成不可變列表
for (String method : new HashSet<String>(newMethodInvokerMap.keySet())) {
List<Invoker<T>> methodInvokers = newMethodInvokerMap.get(method);
Collections.sort(methodInvokers, InvokerComparator.getComparator());
newMethodInvokerMap.put(method, Collections.unmodifiableList(methodInvokers));
}
return Collections.unmodifiableMap(newMethodInvokerMap);
}
上面方法主要做了三件事情, 第一是對入?yún)⑦M行遍歷,然后獲取 methods 參數(shù),并切分成數(shù)組。隨后以方法名為鍵,Invoker 列表為值,將映射關(guān)系存儲到 newMethodInvokerMap 中。第二是分別基于類和方法對 Invoker 列表進行路由操作。第三是對 Invoker 列表進行排序,并轉(zhuǎn)成不可變列表。關(guān)于 toMethodInvokers 方法就先分析到這,我們繼續(xù)向下分析,這次要分析的多組服務(wù)的合并邏輯。
private Map<String, List<Invoker<T>>> toMergeMethodInvokerMap(Map<String, List<Invoker<T>>> methodMap) {
Map<String, List<Invoker<T>>> result = new HashMap<String, List<Invoker<T>>>();
// 遍歷入?yún)? for (Map.Entry<String, List<Invoker<T>>> entry : methodMap.entrySet()) {
String method = entry.getKey();
List<Invoker<T>> invokers = entry.getValue();
// group -> Invoker 列表
Map<String, List<Invoker<T>>> groupMap = new HashMap<String, List<Invoker<T>>>();
// 遍歷 Invoker 列表
for (Invoker<T> invoker : invokers) {
// 獲取分組配置
String group = invoker.getUrl().getParameter(Constants.GROUP_KEY, "");
List<Invoker<T>> groupInvokers = groupMap.get(group);
if (groupInvokers == null) {
groupInvokers = new ArrayList<Invoker<T>>();
// 緩存 <group, List<Invoker>> 到 groupMap 中
groupMap.put(group, groupInvokers);
}
// 存儲 invoker 到 groupInvokers
groupInvokers.add(invoker);
}
if (groupMap.size() == 1) {
// 如果 groupMap 中僅包含一組鍵值對,此時直接取出該鍵值對的值即可
result.put(method, groupMap.values().iterator().next());
// groupMap 中包含多組鍵值對,比如:
// {
// "dubbo": [invoker1, invoker2, invoker3, ...],
// "hello": [invoker4, invoker5, invoker6, ...]
// }
} else if (groupMap.size() > 1) {
List<Invoker<T>> groupInvokers = new ArrayList<Invoker<T>>();
for (List<Invoker<T>> groupList : groupMap.values()) {
// 通過集群類合并每個分組對應(yīng)的 Invoker 列表
groupInvokers.add(cluster.join(new StaticDirectory<T>(groupList)));
}
// 緩存結(jié)果
result.put(method, groupInvokers);
} else {
result.put(method, invokers);
}
}
return result;
}
上面方法首先是生成 group 到 Invoker 類比的映射關(guān)系表,若關(guān)系表中的映射關(guān)系數(shù)量大于1,表示有多組服務(wù)。此時通過集群類合并每組 Invoker,并將合并結(jié)果存儲到 groupInvokers 中。之后將方法名與 groupInvokers 存到到 result 中,并返回,整個邏輯結(jié)束。
接下來我們再來看一下 Invoker 列表刷新邏輯的最后一個動作 -- 刪除無用 Invoker。如下:
private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
}
List<String> deleted = null;
if (oldUrlInvokerMap != null) {
// 獲取新生成的 Invoker 列表
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
// 遍歷老的 <url, Invoker> 映射表
for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
// 檢測 newInvokers 中是否包含老的 Invoker
if (!newInvokers.contains(entry.getValue())) {
if (deleted == null) {
deleted = new ArrayList<String>();
}
// 若不包含,則將老的 Invoker 對應(yīng)的 url 存入 deleted 列表中
deleted.add(entry.getKey());
}
}
}
if (deleted != null) {
// 遍歷 deleted 集合,并到老的 <url, Invoker> 映射關(guān)系表查出 Invoker,銷毀之
for (String url : deleted) {
if (url != null) {
// 從 oldUrlInvokerMap 中移除 url 對應(yīng)的 Invoker
Invoker<T> invoker = oldUrlInvokerMap.remove(url);
if (invoker != null) {
try {
// 銷毀 Invoker
invoker.destroy();
} catch (Exception e) {
logger.warn("destroy invoker...");
}
}
}
}
}
}
destroyUnusedInvokers 方法的主要邏輯是通過 newUrlInvokerMap 找出待刪除 Invoker 對應(yīng)的 url,并將 url 存入到 deleted 列表中。然后再遍歷 deleted 列表,并從 oldUrlInvokerMap 中移除相應(yīng)的 Invoker,銷毀之。整個邏輯大致如此,不是很難理解。
到此關(guān)于 Invoker 列表的刷新邏輯就分析了,這里對整個過程進行簡單總結(jié)。如下:
- 檢測入?yún)⑹欠駜H包含一個 url,且 url 協(xié)議頭為 empty
- 若第一步檢測結(jié)果為 true,表示禁用所有服務(wù),此時銷毀所有的 Invoker
- 若第一步檢測結(jié)果為 false,此時將入?yún)⑥D(zhuǎn)為 Invoker 列表
- 對將上一步邏輯刪除的結(jié)果進行進一步處理,得到方法名到 Invoker 的映射關(guān)系表
- 合并多組 Invoker
- 銷毀無用 Invoker
Invoker 的刷新邏輯還是比較復(fù)雜的,大家在看的過程中多寫點 demo 進行調(diào)試。好了,本節(jié)就到這。
4. 總結(jié)
本篇文章對 Dubbo 服務(wù)目錄進行了較為詳細(xì)的分析,篇幅主要集中在 RegistryDirectory 的源碼分析上。分析下來,不由得感嘆,想讓本地服務(wù)目錄和注冊中心保持一致還是需要做很多事情的,并不簡單。服務(wù)目錄是 Dubbo 集群容錯的一部分,也是比較基礎(chǔ)的部分,所以大家務(wù)必搞懂。
好了,本篇文章就先到這了。感謝大家閱讀。
本文在知識共享許可協(xié)議 4.0 下發(fā)布,轉(zhuǎn)載需在明顯位置處注明出處
作者:田小波
本文同步發(fā)布在我的個人博客:http://www.tianxiaobo.com

本作品采用知識共享署名-非商業(yè)性使用-禁止演繹 4.0 國際許可協(xié)議進行許可。

浙公網(wǎng)安備 33010602011771號