探尋Dubbo集群容錯機制
一. timeout 與 retries
Dubbo的服務可以通過timeout配置超時時間,防止遠程調用失敗,該屬性的默認值為1000(ms),用戶可以在多個地方配置服務的超時時間:

圖中涉及的配置方式從上至下優先級越來越低,總體來說配置覆蓋遵循以下規律:consumer配置優先于provider配置,細粒度配置優先于整體配置。
我們在消費者中sleep一段時間人為制造超時效果:
@DubboService(timeout = 3000)
@Slf4j
public class UserServiceImpl implements IUserService{
@Override
public UserDTO queryById(Integer userId) {
if (userId == null) {
throw new RuntimeException("參數不能為空");
}
log.info("remote call queryById");
User user = userMapper.selectById(userId);
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return modelToDto(user);
}
}
當我們消費者調用該服務時會出現超時異常:
UserDTO userDTO = userService.queryById(1);

我們查看服務提供者的控制臺,可以發現此次超時調用一個觸發了3次:

在Dubbo中有一個參數retries,它表示消費者在調用遠程服務時,出現超時等RpcException時會去最多重試調用N次服務,默認情況下retries=2這也是為什么在出現超時調用后,服務端會調用三次。
二. 集群容錯機制
失敗重試機制,在官方文檔中稱為集群容錯。
2.1 集群容錯機制分類
Dubbo官方提供了多種失敗重試機制,缺省值是failover。
- Failover Cluster:失敗自動切換,當出現失敗,重試其它服務器。通常用于讀操作,但重試會帶來更長延遲??赏ㄟ^
retries="2"來設置重試次數(不含第一次)。 - Failfast Cluster:快速失敗,只發起一次調用,失敗立即報錯。通常用于非冪等性的寫操作,比如新增記錄。但是這里也會有問題的,如果是因為網絡問題,Provider的響應時間慢,Consumer以為調用失敗了,但是Provider卻調用成功了,涉及到分布式事務的問題。
- Failsafe Cluster:失敗安全,出現異常時,直接忽略。通常用于寫入審計日志等操作。
- Failback Cluster:失敗自動恢復。調用失敗后,consumer不會重試,而是把這個消息丟到provider的重試的線程池里面,定時的重試調用一定的次數,調用失敗寫日志
- Forking Cluster:并行調用多個服務器,只要一個成功即返回。通常用于實時性要求較高的讀操作,但需要浪費更多服務資源??赏ㄟ^
forks="2"來設置最大并行數。 - Broadcast Cluster:廣播調用所有提供者,逐個調用,任意一臺報錯則報錯。通常用于通知所有提供者更新緩存或日志等本地資源信息?,F在廣播調用中,可以通過
broadcast.fail.percent配置節點調用失敗的比例,當達到這個比例后,BroadcastClusterInvoker將不再調用其他節點,直接拋出異常。broadcast.fail.percent取值在 0~100 范圍內。默認情況下當全部調用失敗后,才會拋出異常。broadcast.fail.percent只是控制的當失敗后是否繼續調用其他節點,并不改變結果(任意一臺報錯則報錯)。broadcast.fail.percent參數 在dubbo2.7.10及以上版本生效。
2.1 集群模式的配置
注解配置
@reference(cluster = "broadcast", parameters = {"broadcast.fail.percent", "20"})
XML配置
<dubbo:service cluster="failsafe" />
<dubbo:reference cluster="failsafe" />
三. Failover Cluster
Dubbo默認使用的是Failover Cluster機制,如果不清楚這一機制那么寫出的代碼會有很多隱患。
服務的超時是開發人員不好控制的,如果某些時候因為機器負載波動導致一個寫操作服務發生超時重試,那么就會出現重復寫的問題,導致臟數據的產生。
Failover Cluster官方建議在讀環境下使用,但是奈何很多同學都沒有注意到這一點。

Failover Cluster的實現在org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker:
public class FailoverClusterInvoker<T> extends AbstractClusterInvoker<T> {
private static final Logger logger = LoggerFactory.getLogger(FailoverClusterInvoker.class);
public FailoverClusterInvoker(Directory<T> directory) {
super(directory);
}
@Override
@SuppressWarnings({"unchecked", "rawtypes"})
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
List<Invoker<T>> copyInvokers = invokers;
checkInvokers(copyInvokers , invocation);
String methodName = RpcUtils.getMethodName(invocation);
//最大調用次數 = 重試次數(retris) + 1
int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
//防御性編程,防止用戶將retris設置為小于0情況
if (len <= 0) {
len = 1;
}
// retry loop.
RpcException le = null; // last exception.
List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
Set<String> providers = new HashSet<String>(len);
// 調用N次,成功返回或provider拋出異常(不包括RpcException)后跳出勛魂
for (int i = 0; i < len; i++) {
//Reselect before retry to avoid a change of candidate `invokers`.
//NOTE: if `invokers` changed, then `invoked` also lose accuracy.
if (i > 0) {
checkWhetherDestroyed();
// 在進行重試前重新獲取最新的invoker集合,這樣做的好處是,如果在重試過程中某個服務掛了,通過list方法可以保證copyInvokes是最新的invoker列表
copyInvokers = list(invocation);
// check again
checkInvokers(copyInvokers, invocation);
}
// 通過負載均衡策略選擇一個Invoker
Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
// 維護已經調用過的invoker
invoked.add(invoker);
RpcContext.getContext().setInvokers((List) invoked);
try {
Result result = invoker.invoke(invocation);
// 重試調用成功后才會進入這個if打印日志
if (le != null && logger.isWarnEnabled()) {
logger.warn("Although retry the method " + methodName
+ " in the service " + getInterface().getName()
+ " was successful by the provider " + invoker.getUrl().getAddress()
+ ", but there have been failed providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost()
+ " using the dubbo version " + Version.getVersion() + ". Last error is: "
+ le.getMessage(), le);
}
return result;
} catch (RpcException e) {
// 如果是一個業務異常,則直接拋出異常。在Dubbo中繼承至RuntimeException的自定義異常都會包裝為RpcException,所以這里是為了判斷是否是業務報錯被封裝成了RpcException
// Dubbo這么設計是因為RuntimeException可以在接口未聲明該異常的情況下拋出,而缺少了接口的約束會導致Consumer應用中可能出現未定義該異常的問題,最終導致序列化失敗。
if (e.isBiz()) { // biz exception.
throw e;
}
// 若不是則繼續循環重試
le = e;
} catch (Throwable e) {
le = new RpcException(e.getMessage(), e);
} finally {
providers.add(invoker.getUrl().getAddress());
}
}
throw new RpcException(le.getCode(), "Failed to invoke the method "
+ methodName + " in the service " + getInterface().getName()
+ ". Tried " + len + " times of the providers " + providers
+ " (" + providers.size() + "/" + copyInvokers.size()
+ ") from the registry " + directory.getUrl().getAddress()
+ " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version "
+ Version.getVersion() + ". Last error is: "
+ le.getMessage(), le.getCause() != null ? le.getCause() : le);
}
}

浙公網安備 33010602011771號