tcc-transaction源碼詳解
更多優秀博文,請關注博主的個人博客:聽到微笑的博客
本文主要介紹TCC的原理,以及從代碼的角度上分析如何實現的;不涉及具體使用示例。本文通過分析tcc-transaction源碼帶大家了解TCC分布式事務的實現原理。
需要注意的是,本文所有代碼都基于master-1.7.x分支,不同版本的源碼會存在一定的差異。完整代碼注釋請參考:bigcoder84/tcc-transaction
一. 概述
1.1 項目模塊
本文對 tcc-transaction 源碼分析。主要涉及如下四個模塊:
tcc-transaction-core:tcc-transaction 底層實現。tcc-transaction-api:tcc-transaction 使用 API。tcc-transaction-spring:tcc-transaction 對 Spring 的支持。tcc-transaction-dubbo:tcc-transaction 對 Dubbo 的支持。
本文基于tcc-transaction 1.7.x版本源碼進行分析。
1.2 tcc-transaction中的概念
在詳細分析框架源碼之前,我們先熟悉 tcc-transaction 中所涉及到的名詞概念:
-
事務樹:多級分布式事務類似于樹狀結構
-
根事務:事務樹的根節點
-
分支事務:所有非根節點事務節點。在上圖中,A就是整個分布式事務體系中的根事務節點,在A中會調用B、C兩個下游服務,而B、C遠程服務代碼就是分布式事務的兩個分支事務。
-
當前事務:當前正在執行代碼所在的事務節點。
-
事務參與者:每一個事務參與者存在一個
confirm和一個cancel方法。每一個事務節點都是一個事務參與者,每一個事務節點會維持自身以及孩子節點事務的事務參與者信息。例如我們在調用A方法時,會創建一個事務,我們稱為TransactionA,并將A節點本身作為事務參與者放入TransactionA中,當A調用B、C兩個服務時,會生成兩個事務參與者放入TransactionA中,這樣當A的try操作執行完畢后,TransactionA中就存在A、B、C三個事務參與者了,當TransactionA進行confirm/cancel操作時,會同時對三個事務參與者執行confirm/cancel操作。 -
事務存儲器:存儲事務上下文信息,可以有多種實現(Redis、DB、Memory)
二. tcc-transaction 原理
在 TCC 里,一個業務活動可以有多個事務,每個業務操作歸屬于不同的事務,即一個事務可以包含多個業務操作。TCC-Transaction 將每個業務操作抽象成事務參與者,每個事務可以包含多個參與者。
參與者需要聲明 try / confirm / cancel 三個類型的方法,和 TCC 的操作一一對應。在程序里,通過 @Compensable 注解標記在 try 方法上,并填寫對應的 confirm / cancel 方法,示例代碼如下:
@Compensable(confirmMethod = "confirmRecord", cancelMethod = "cancelRecord", transactionContextEditor = MethodTransactionContextEditor.class)
public String record(CapitalTradeOrderDto tradeOrderDto) {}
public void confirmRecord(CapitalTradeOrderDto tradeOrderDto) {}
public void cancelRecord(CapitalTradeOrderDto tradeOrderDto) {}
TCC-Transaction 有兩個攔截器,通過對 @Compensable AOP 切面( 參與者 try 方法 )進行攔截,,透明化對參與者 confirm / cancel 方法調用,從而實現 TCC 。
以及擁有一個業務活動管理器(JOB),業務活動管理器控制業務活動的一致性,它登記業務活動中的操作,并在業務活動提交時確認所有的TCC型操作的confirm操作,在業務活動取消時調用所以TCC操作的cancel操作。
簡化流程如下圖:

第一個攔截器,可補償事務攔截器,實現如下功能:
- 在 Try 階段,創建事務。
- 在 Confirm / Cancel 階段,對事務提交或回滾。
第二個攔截器,資源協調者攔截器,實現如下功能:
- 在 Try 階段,添加參與者到事務中。當事務上下文不存在時,進行創建。
- 對事務進行傳播,在遠程調用服務的參與者時,會通過
TransactionContextEditor傳遞事務給遠程參與者。TransactionContextEditor有很多不同的實現。
實際攔截器對事務的處理會比上圖復雜一些,在后文詳細解析。
在 tcc-transaction 代碼實現上,組件分層如下圖:

本文按照如下順序分享:
- 「4. 事務攔截器」
- 「5. 事務管理器」
- 「6. 事務管理器」
內容是自下而上的方式分享,每個組件可以更加整體的被認識。當然這可能對你理解時產生一臉悶逼,所以推薦兩種閱讀方式:
三. 事務與參與者
在 TCC 里,一個事務(org.mengyun.tcctransaction.Transaction) 可以有多個參與者(org.mengyun.tcctransaction.Participant)參與業務活動。
3.1 事務
package org.mengyun.tcctransaction;
/**
* Created by changmingxie on 10/26/15.
*/
public class Transaction implements Serializable {
private static final long serialVersionUID = 7291423944314337931L;
/**
* 事務創建時間
*/
private final Date createTime = new Date();
/**
* 參與者集合
*/
private final List<Participant> participants = new ArrayList<Participant>();
/**
* 附帶屬性映射
*/
private final Map<String, Object> attachments = new ConcurrentHashMap<String, Object>();
/**
* 事務ID
*/
private TransactionXid xid;
/**
* 事務狀態
*/
private TransactionStatus status;
/**
* 事務類型
*/
private TransactionType transactionType;
/**
* 重試次數
*/
private volatile int retriedCount = 0;
/**
* 最后更新時間
*/
private Date lastUpdateTime = new Date();
/**
* 版本號
*/
private long version = 0;
/**
* 根事務ID
*/
private TransactionXid rootXid;
/**
* 添加參與者
* @param participant
*/
public void enlistParticipant(Participant participant) {
participants.add(participant);
}
/**
* 提交TCC事務
*/
public void commit() {
for (Participant participant : participants) {
if (!participant.getStatus().equals(ParticipantStatus.CONFIRM_SUCCESS)) {
participant.commit();
participant.setStatus(ParticipantStatus.CONFIRM_SUCCESS);
}
}
}
/**
* 回滾TCC事務
*/
public void rollback() {
for (Participant participant : participants) {
if (!participant.getStatus().equals(ParticipantStatus.CANCEL_SUCCESS)) {
participant.rollback();
participant.setStatus(ParticipantStatus.CANCEL_SUCCESS);
}
}
}
}
- xid,事務編號( TransactionXid ),用于唯一標識一個事務。使用 UUID 算法生成,保證唯一性。
org.mengyun.tcctransaction.api.TransactionXid實現javax.transaction.xa.Xid接口,實現代碼如下:
public class TransactionXid implements Xid, Serializable {
private static final long serialVersionUID = -6817267250789142043L;
private static byte[] CUSTOMIZED_TRANSACTION_ID = "UniqueIdentity".getBytes();
/**
* xid格式標識
*/
private int formatId = 1;
/**
* 全局事務編號
*/
private byte[] globalTransactionId;
/**
* 分支事務編號
*/
private byte[] branchQualifier;
}
- status,事務狀態( TransactionStatus )。
org.mengyun.tcctransaction.api.TransactionStatus實現代碼如下:
public enum TransactionStatus {
/**
* 嘗試中
*/
TRYING(1),
/**
* 確認中
*/
CONFIRMING(2),
/**
* 取消中狀態
*/
CANCELLING(3),
/**
* 嘗試成功
*/
TRY_SUCCESS(11),
/**
* 嘗試失敗
*/
TRY_FAILED(12);
}
- transactionType,事務類型( TransactionType )。
org.mengyun.tcctransaction.common.TransactionType實現代碼如下:
public enum TransactionType {
/**
* 根事務
*/
ROOT(1),
/**
* 分支事務
*/
BRANCH(2);
}
-
retriedCount,重試次數。在 TCC 過程中,可能參與者異常崩潰,這個時候會進行重試直到成功或超過最大次數。
-
version,版本號,用于樂觀鎖更新事務。在《事務存儲器》詳細解析。
-
attachments,附帶屬性映射。 在《Dubbo 支持》詳細解析。
-
提供
#enlistParticipant()方法,添加事務參與者。 -
提供
#commit()方法,調用參與者們提交事務。 -
提供
#rollback()方法,調用參與者回滾事務。
3.2 參與者
public class Participant implements Serializable {
private static final long serialVersionUID = 4127729421281425247L;
Class<? extends TransactionContextEditor> transactionContextEditorClass;
private TransactionXid rootXid;
/**
* 事務編號
*/
private TransactionXid xid;
/**
* 確認執行業務方法調用上下文
*/
private InvocationContext confirmInvocationContext;
/**
* 取消執行業務方法
*/
private InvocationContext cancelInvocationContext;
private int status = ParticipantStatus.TRYING.getId();
- xid,參與者事務編號。通過
TransactionXid.globalTransactionId屬性,關聯上其所屬的事務。當參與者進行遠程調用時,遠程的分支事務的事務編號等于該參與者的事務編號。通過事務編號的關聯,TCC Confirm / Cancel 階段,使用參與者的事務編號和遠程的分支事務進行關聯,從而實現事務的提交和回滾,在「4.2 傳播發起分支事務」 + 「5.2 可補償事務攔截器」可以看到具體實現。 - confirmInvocationContext,確認執行業務方法調用上下文( InvocationContext )。
org.mengyun.tcctransaction.InvocationContext實現代碼如下:
public class InvocationContext implements Serializable {
private static final long serialVersionUID = -7969140711432461165L;
/**
* 類
*/
private Class targetClass;
/**
* 方法名
*/
private String methodName;
/**
* 參數列表
*/
private Class[] parameterTypes;
/**
* 參數數組
*/
private Object[] args;
}
-
InvocationContext,執行方法調用上下文,記錄類、方法名、參數類型數組、參數數組。通過這些屬性,可以執行提交 / 回滾事務。在
org.mengyun.tcctransaction.Terminator會看到具體的代碼實現。本質上,TCC 通過多個參與者的 try / confirm / cancel 方法,實現事務的最終一致性。 -
cancelInvocationContext,取消執行業務方法調用上下文(InvocationContext)。
-
transactionContextEditorClass,事務上下文編輯,在「5.1 Compensable」詳細解析。
-
提交
#commit()方法,提交參與者自己的事務。 -
提交
#rollback()方法,回滾參與者自己的事務。
四. 事務管理器
org.mengyun.tcctransaction.TransactionManager,事務管理器,提供事務的獲取、發起、提交、回滾,參與者的新增等等方法。
4.1 發起根事務
提供 begin() 方法,發起根事務。該方法在調用方法類型為 MethodType.ROOT 并且 事務處于 Try 階段被調用。MethodType 在「5.2 可補償事務攔截器」詳細解析。
實現代碼如下:
/**
* 發起根事務
* @return
*/
public Transaction begin() {
Transaction transaction = new Transaction(TransactionType.ROOT);
// 注冊事務
registerTransaction(transaction);
return transaction;
}
-
調用 Transaction 構造方法,創建根事務。實現代碼如下:
/** * 創建指定類型的根事務 * @param transactionType */ public Transaction(TransactionType transactionType) { this(null, transactionType); } public Transaction(Object uniqueIdentity, TransactionType transactionType) { this.xid = new TransactionXid(uniqueIdentity); this.status = TransactionStatus.TRYING; this.transactionType = transactionType; if (transactionType.equals(TransactionType.ROOT)) { this.rootXid = xid; } } -
調用
#registerTransaction(...)方法,注冊事務到當前線程事務隊列。實現代碼如下:private static final ThreadLocal<Deque<Transaction>> CURRENT = new ThreadLocal<Deque<Transaction>>(); /** * 注冊事務到當前線程事務隊列 * @param transaction */ private void registerTransaction(Transaction transaction) { if (CURRENT.get() == null) { CURRENT.set(new LinkedList<Transaction>()); } CURRENT.get().push(transaction); }- 可能有同學會比較好奇,為什么使用隊列存儲當前線程事務?TCC-Transaction 支持多個的事務獨立存在,后創建的事務先提交,類似 Spring 的
org.springframework.transaction.annotation.Propagation.REQUIRES_NEW。在下文,很快我們就會看到 TCC-Transaction 自己的org.mengyun.tcctransaction.api.Propagation。
- 可能有同學會比較好奇,為什么使用隊列存儲當前線程事務?TCC-Transaction 支持多個的事務獨立存在,后創建的事務先提交,類似 Spring 的
4.2 創建分支事務
調用 #propagationNewBegin(...) 方法,創建分支事務。該方法在調用方法類型為 MethodType.PROVIDER 并且 事務處于 Try 階段被調用。MethodType 在「5.2 可補償事務攔截器」詳細解析。
實現代碼如下:
/**
* 發起分支事務。該方法在調用方法類型為 ParticipantRole.PROVIDER 并且 事務處于 Try 階段被調用
* @param transactionContext
* @return
*/
public Transaction propagationNewBegin(TransactionContext transactionContext) {
// 創建 分支事務
Transaction transaction = new Transaction(transactionContext);
//注冊 事務
registerTransaction(transaction);
return transaction;
}
-
調用 Transaction 構造方法,創建分支事務。實現代碼如下:
/** * 創建分支事務 * @param transactionContext */ public Transaction(TransactionContext transactionContext) { this.xid = transactionContext.getXid(); this.rootXid = transactionContext.getRootXid(); this.status = TransactionStatus.TRYING; this.transactionType = TransactionType.BRANCH; } -
調用
TransactionRepository#crete()方法,存儲事務。為什么要存儲分支事務,在「5.3 資源協調者攔截器」詳細解析。 -
調用
#registerTransaction(...)方法,注冊事務到當前線程事務隊列。
4.3 獲取分支事務
調用 #propagationExistBegin(...) 方法,根據Xid獲取分支事務。該方法在調用方法類型為 MethodType.PROVIDER 并且 事務處于 Confirm / Cancel 階段被調用。MethodType 在「5.2 可補償事務攔截器」詳細解析。
/**
* 獲取分支事務。該方法在調用方法類型為 ParticipantRole.PROVIDER 并且 事務處于 Confirm / Cancel 階段被調用
* @param transactionContext
* @return
* @throws NoExistedTransactionException
*/
public Transaction propagationExistBegin(TransactionContext transactionContext) throws NoExistedTransactionException {
// 查詢事務
Transaction transaction = transactionRepository.findByXid(transactionContext.getXid());
if (transaction != null) {
registerTransaction(transaction);
return transaction;
} else {
throw new NoExistedTransactionException();
}
}
- 調用
TransactionRepository#findByXid()方法,查詢事務。 - 調用
#registerTransaction(...)方法,注冊事務到當前線程事務隊列。 - 為什么此處是分支事務呢?結合
#propagationNewBegin(...)思考下。
4.4 提交事務
調用 #commit(...) 方法,提交事務。該方法在事務處于 Confirm / Cancel 階段被調用。
public void commit(boolean asyncCommit) {
// 獲取事務
final Transaction transaction = getCurrentTransaction();
// 設置事務狀態為confirm
transaction.changeStatus(TransactionStatus.CONFIRMING);
// 更新事務
transactionRepository.update(transaction);
if (asyncCommit) {
try {
Long statTime = System.currentTimeMillis();
asyncTerminatorExecutorService.submit(new Runnable() {
@Override
public void run() {
commitTransaction(transaction);
}
});
logger.debug("async submit cost time:" + (System.currentTimeMillis() - statTime));
} catch (Throwable commitException) {
logger.warn("compensable transaction async submit confirm failed, recovery job will try to confirm later.", commitException.getCause());
//throw new ConfirmingException(commitException);
}
} else {
// 提交事務
commitTransaction(transaction);
}
}
-
asyncCommit:表示是否異步confirm,由業務開發者在
@Compensable注解中指定。 -
調用
#getCurrentTransaction()方法, 獲取事務。實現代碼如下:/** * 獲取當前線程 事務隊列的隊頭事務 * tips: registerTransaction是將事務注冊到隊列頭部 * @return */ public Transaction getCurrentTransaction() { if (isTransactionActive()) { return CURRENT.get().peek(); } return null; }- 為什么獲得隊列頭部元素呢?該元素即是上文調用
#registerTransaction(...)注冊到隊列頭部。
- 為什么獲得隊列頭部元素呢?該元素即是上文調用
-
調用
Transaction#changeStatus(...)方法, 設置事務狀態為 CONFIRMING。 -
調用
TransactionRepository#update(...)方法, 更新事務。 -
調用
Transaction#commit(...)方法, 提交事務。 -
調用
TransactionRepository#delete(...)方法,刪除事務。
4.5 回滾事務
調用 #rollback(...) 方法,取消事務,和 #commit() 方法基本類似。該方法在事務處于 Confirm / Cancel 階段被調用。
public void rollback(boolean asyncRollback) {
// 獲取事務
final Transaction transaction = getCurrentTransaction();
// 設置事務狀態為 CANCELLING
transaction.changeStatus(TransactionStatus.CANCELLING);
// 更新事務記錄
transactionRepository.update(transaction);
if (asyncRollback) {
// 是否是異步操作
try {
asyncTerminatorExecutorService.submit(new Runnable() {
@Override
public void run() {
rollbackTransaction(transaction);
}
});
} catch (Throwable rollbackException) {
logger.warn("compensable transaction async rollback failed, recovery job will try to rollback later.", rollbackException);
throw new CancellingException(rollbackException);
}
} else {
// 回滾事務
rollbackTransaction(transaction);
}
}
4.6 添加事務到事務管理器
調用 #enlistParticipant(...) 方法,添加參與者到事務。該方法在事務處于 Try 階段被調用,在「5.3 資源協調者攔截器」有詳細解析。
/**
* 添加參與者到事務
* @param participant
*/
public void enlistParticipant(Participant participant) {
Transaction transaction = this.getCurrentTransaction();
//添加參與者
transaction.enlistParticipant(participant);
if (transaction.getVersion() == 0l) {
// transaction.getVersion() 為零,這意味著之前從未持久化,需要調用 create 才能持久化。
transactionRepository.create(transaction);
} else {
// 更新transaction
transactionRepository.update(transaction);
}
}
五. 事務攔截器
tcc-transaction 基于 org.mengyun.tcctransaction.api.@Compensable + org.aspectj.lang.annotation.@Aspect 注解 AOP 切面實現業務方法的 TCC 事務聲明攔截,同 Spring 的 org.springframework.transaction.annotation.@Transactional 的實現。
TCC-Transaction 有兩個攔截器:
org.mengyun.tcctransaction.interceptor.CompensableTransactionInterceptor,可補償事務攔截器。org.mengyun.tcctransaction.interceptor.ResourceCoordinatorInterceptor,資源協調者攔截器。
在分享攔截器的實現之前,我們先一起看看 @Compensable 注解。
5.1 Compensable
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface Compensable {
/**
* 事務傳播級別
* @return
*/
public Propagation propagation() default Propagation.REQUIRED;
/**
* confirm執行的方法名稱
* @return
*/
public String confirmMethod() default "";
/**
* cancel執行的方法名稱
* @return
*/
public String cancelMethod() default "";
/**
* 是否異步執行confirm
* @return
*/
public boolean asyncConfirm() default false;
/**
* 是否異步執行cancel
* @return
*/
public boolean asyncCancel() default false;
/**
* 事務上下文編輯器 默認實現是:DefaultTransactionContextEditor
* @return
*/
public Class<? extends TransactionContextEditor> transactionContextEditor() default NullableTransactionContextEditor.class;
}
-
propagation,傳播級別( Propagation ),默認 Propagation.REQUIRED。和 Spring 的 Propagation 除了缺少幾個屬性,基本一致。實現代碼如下:
public enum Propagation { /** * 支持當前事務,如果當前沒有事務,就新建一個事務。 */ REQUIRED(0), /** * 支持當前事務,如果當前沒有事務,就以非事務方式執行。 */ SUPPORTS(1), /** * 支持當前事務,如果當前沒有事務,就拋出異常。 */ MANDATORY(2), /** * 新建事務,如果當前存在事務,把當前事務掛起。 */ REQUIRES_NEW(3); private final int value; private Propagation(int value) { this.value = value; } public int value() { return this.value; } } -
confirmMethod,確認執行業務方法名。
-
cancelMethod,取消執行業務方法名。
-
TransactionContextEditor,事務上下文編輯器(TransactionContextEditor),用于設置和獲得事務上下文( TransactionContext),在「5.3 資源協調者攔截器」可以看到被調用,此處只看它的代碼實現。
org.mengyun.tcctransaction.api.TransactionContextEditor接口代碼如下:public interface TransactionContextEditor { /** * 從參數中獲得事務上下文 * @param target 對象 * @param method 方法 * @param args 參數 * @return */ public TransactionContext get(Object target, Method method, Object[] args); /** * 設置事務上下文到參數中 * @param transactionContext * @param target * @param method * @param args */ public void set(TransactionContext transactionContext, Object target, Method method, Object[] args); } -
NullableTransactionContextEditor:默認事務上下文編輯器,即無事務上下文編輯器實現。(1.2.x版本的tcc-transaction默認的事務上下文編輯器是
DefaultTransactionContextEditor,也就是1.7.x版本的ParameterTransactionContextEditor),當無需進行事務傳播時,可以使用該事務上下文編輯器。public class NullableTransactionContextEditor implements TransactionContextEditor { @Override public TransactionContext get(Object target, Method method, Object[] args) { return null; } @Override public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) { } } -
ParameterTransactionContextEditor:
/** * 使用參數傳遞事務上下文 */ public class ParameterTransactionContextEditor implements TransactionContextEditor { /** * 獲得事務上下文在方法參數里的位置 * @param parameterTypes * @return */ public static int getTransactionContextParamPosition(Class<?>[] parameterTypes) { int position = -1; for (int i = 0; i < parameterTypes.length; i++) { if (parameterTypes[i].equals(org.mengyun.tcctransaction.api.TransactionContext.class)) { position = i; break; } } return position; } public static boolean hasTransactionContextParameter(Class<?>[] parameterTypes) { return getTransactionContextParamPosition(parameterTypes) >= 0; } public static TransactionContext getTransactionContextFromArgs(Object[] args) { TransactionContext transactionContext = null; for (Object arg : args) { if (arg != null && org.mengyun.tcctransaction.api.TransactionContext.class.isAssignableFrom(arg.getClass())) { transactionContext = (org.mengyun.tcctransaction.api.TransactionContext) arg; } } return transactionContext; } @Override public TransactionContext get(Object target, Method method, Object[] args) { int position = getTransactionContextParamPosition(method.getParameterTypes()); if (position >= 0) { return (TransactionContext) args[position]; } else { throw new RuntimeException("No TransactionContext parameter exist while get TransactionContext with ParameterTransactionContextEditor!"); } } @Override public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) { int position = getTransactionContextParamPosition(method.getParameterTypes()); if (position >= 0) { args[position] = transactionContext; } else { throw new RuntimeException("No TransactionContext parameter exist while set TransactionContext with ParameterTransactionContextEditor!"); } } } -
DubboTransactionContextEditor,Dubbo 事務上下文編輯器實現,通過 Dubbo 隱式傳參方式獲得事務上下文,在《Dubbo 支持》詳細解析。
5.2 事務攔截器
先一起來看下事務攔截器對應的切面 org.mengyun.tcctransaction.interceptor.CompensableTransactionAspect,實現代碼如下:
@Aspect
public abstract class CompensableTransactionAspect {
private CompensableTransactionInterceptor compensableTransactionInterceptor;
public void setCompensableTransactionInterceptor(CompensableTransactionInterceptor compensableTransactionInterceptor) {
this.compensableTransactionInterceptor = compensableTransactionInterceptor;
}
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable)")
public void compensableTransactionPointcut() {
}
@Around("compensableTransactionPointcut()")
public Object interceptCompensableMethod(ProceedingJoinPoint pjp) throws Throwable {
// 獲取目標方法
Method method = ((MethodSignature) pjp.getSignature()).getMethod();
// 獲取目標方法的Compensable注解
Compensable compensable = method.getAnnotation(Compensable.class);
Class<? extends TransactionContextEditor> transactionContextEditor = NullableTransactionContextEditor.class;
if (compensable != null) {
transactionContextEditor = compensable.transactionContextEditor();
}
if (transactionContextEditor.equals(NullableTransactionContextEditor.class)
&& ParameterTransactionContextEditor.hasTransactionContextParameter(method.getParameterTypes())) {
transactionContextEditor = ParameterTransactionContextEditor.class;
}
return compensableTransactionInterceptor.interceptCompensableMethod(new AspectJTransactionMethodJoinPoint(pjp, compensable, transactionContextEditor));
}
public abstract int getOrder();
}
在tcc-transaction-spring模塊中ConfigurableTransactionAspect繼承了CompensableTransactionAspect,在原先的基礎上提供了可配置化的功能,需要注意的是該類 實現了org.springframework.core.Ordered#getOrder接口,這樣就可以保證在整個AOP切面鏈中,“事務攔截器”位于“資源協調者攔截器”外層。
@Aspect
public class ConfigurableTransactionAspect extends CompensableTransactionAspect implements Ordered {
@Autowired
private TransactionConfigurator transactionConfigurator;
@PostConstruct
public void init() {
TransactionManager transactionManager = transactionConfigurator.getTransactionManager();
CompensableTransactionInterceptor compensableTransactionInterceptor = new CompensableTransactionInterceptor();
compensableTransactionInterceptor.setTransactionManager(transactionManager);
this.setCompensableTransactionInterceptor(compensableTransactionInterceptor);
}
/**
* 指定切面順序,AOP可以看做一個同心圓,圓心就是我們被代理的方法,當切面類getOrder方法返回值越小,則切面越在外層
* @return
*/
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {
this.transactionConfigurator = transactionConfigurator;
}
}
-
通過
org.aspectj.lang.annotation.@Pointcut+org.aspectj.lang.annotation.@Around注解,配置對 @Compensable 注解的方法進行攔截,調用CompensableTransactionInterceptor#interceptCompensableMethod(...)方法進行處理。public Object interceptCompensableMethod(TransactionMethodJoinPoint pjp) throws Throwable { // 獲得當前線程所在事務 Transaction transaction = transactionManager.getCurrentTransaction(); // 創建可補償方法上下文 CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp, transaction); // if method is @Compensable and no transaction context and no transaction, then root // else if method is @Compensable and has transaction context and no transaction ,then provider switch (compensableMethodContext.getParticipantRole()) { case ROOT: // 根事務 return rootMethodProceed(compensableMethodContext); case PROVIDER: // 分支事務 return providerMethodProceed(compensableMethodContext); default: return compensableMethodContext.proceed(); } }-
調用CompensableMethodContext構造器,創建可補償方法上下文,代碼如下:
public class CompensableMethodContext { /** * 方法切入點,用于執行目標業務方法 */ TransactionMethodJoinPoint pjp = null; private Transaction transaction = null; TransactionContext transactionContext = null; Compensable compensable = null; public CompensableMethodContext(TransactionMethodJoinPoint pjp, Transaction transaction) { this.pjp = pjp; this.transaction = transaction; this.compensable = pjp.getCompensable(); // 獲取開發者指定的TransactionContextEditor實例,然后調用其get方法,獲取事務上下文 this.transactionContext = FactoryBuilder.factoryOf(pjp.getTransactionContextEditorClass()).getInstance().get(pjp.getTarget(), pjp.getMethod(), pjp.getArgs()); } } -
調用
CompensableMethodContext#getParticipantRole方法獲取事務參與者的角色:/** * * 如果方法被@Compensable 注釋,則表示需要tcc 事務,如果沒有活動事務,則需要require new。 * 如果方法不是@Compensable 注釋,而是帶有 TransactionContext 參數。 * 如果有活動交易,這意味著需要參與者 tcc 交易。如果 transactionContext 為 null,則它將事務登記為 CONSUMER 角色, * else 表示有另一個方法作為 Consumer 已經登記了事務,這個方法不需要登記。 * @return */ public ParticipantRole getParticipantRole() { //方法是@Compensable 注釋的。當前沒有活動事務 且 沒有活動事務上下文,那么該方法需要將事務登記為 根事務 if (compensable != null && transaction == null && transactionContext == null) { return ParticipantRole.ROOT; } // 方法是@Compensable 注釋的。當前沒有活動事務,但有活動事務上下文。這意味著有一個活躍的交易,需要更新交易并將交易登記為 PROVIDER 角色。 if (compensable != null && transaction == null && transactionContext != null) { return ParticipantRole.PROVIDER; } //方法是@Compensable 注釋的,并且有活動事務,但沒有事務上下文。那么該方法需要將事務登記為消費者角色,它之前的角色可能是ROOT if (compensable != null && transaction != null && transactionContext == null) { return ParticipantRole.CONSUMER; } // 方法是@Compensable 注解,有活動事務,也有事務上下文。那么該方法需要將事務登記為 CONSUMER 角色,它之前的角色可能是 PROVIDER。 if (compensable != null && transaction != null && transactionContext != null) { return ParticipantRole.CONSUMER; } // 方法沒有@Compensable 注釋,而是帶有TransactionContext 參數。如果當前有一個活動事務并且事務上下文為空,然后需要使用 CONSUMER 角色登記事務。 // 當Dubbo調用遠程RPC服務時,transactionContext if (compensable == null && transaction != null && transactionContext == null) { return ParticipantRole.CONSUMER; } return ParticipantRole.NORMAL; }
-
-
方法類型為 ParticipantRole.ROOT 時,創建根事務
/** * 發起根事務 * @param compensableMethodContext * @return * @throws Throwable */ private Object rootMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable { Object returnValue = null; Transaction transaction = null; boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm(); boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel(); try { // 創建根事務 transaction = transactionManager.begin(compensableMethodContext.getUniqueIdentity()); try { // 執行方法原邏輯 returnValue = compensableMethodContext.proceed(); } catch (Throwable tryingException) { // 回滾事務 transactionManager.rollback(asyncCancel); throw tryingException; } // 提交事務 transactionManager.commit(asyncConfirm); } finally { // 將事務從當前線程事務隊列移除 transactionManager.cleanAfterCompletion(transaction); } return returnValue; }-
調用
TransactionManager#begin()方法,創建根事務,TCC Try 階段開始。 -
調用
ProceedingJoinPoint#proceed()方法,執行方法原邏輯(即Try邏輯)。 -
當原邏輯執行異常時,TCC Try 階段失敗,調用
TransactionManager#rollback(...)方法,TCC Cancel 階段,回滾事務。 -
當原邏輯執行成功時,TCC Try 階段成功,調用
TransactionManager#commit(...)方法,TCC Confirm 階段,提交事務。 -
調用
TransactionManager#cleanAfterCompletion(...)方法,將事務從當前線程事務隊列移除,避免線程沖突。實現代碼如下:// TransactionManager.java public void cleanAfterCompletion(Transaction transaction) { if (isTransactionActive() && transaction != null) { Transaction currentTransaction = getCurrentTransaction(); if (currentTransaction == transaction) { CURRENT.get().pop(); if (CURRENT.get().size() == 0) { CURRENT.remove(); } } else { throw new SystemException("Illegal transaction when clean after completion"); } } }
-
-
方法類型為 ParticipantRole.PROVIDER 時,發起分支事務
private Object providerMethodProceed(CompensableMethodContext compensableMethodContext) throws Throwable { Transaction transaction = null; boolean asyncConfirm = compensableMethodContext.getAnnotation().asyncConfirm(); boolean asyncCancel = compensableMethodContext.getAnnotation().asyncCancel(); try { switch (TransactionStatus.valueOf(compensableMethodContext.getTransactionContext().getStatus())) { case TRYING: // 發起分支事務 transaction = transactionManager.propagationNewBegin(compensableMethodContext.getTransactionContext()); Object result = null; try { // 執行業務代碼 result = compensableMethodContext.proceed(); // 將狀態變為 TRY_SUCCESS transactionManager.changeStatus(TransactionStatus.TRY_SUCCESS, true); } catch (Throwable e) { transactionManager.changeStatus(TransactionStatus.TRY_FAILED); throw e; } return result; case CONFIRMING: try { // 獲取分支事務 transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext()); // 提交事務 transactionManager.commit(asyncConfirm); } catch (NoExistedTransactionException excepton) { //the transaction has been commit,ignore it. logger.info("no existed transaction found at CONFIRMING stage, will ignore and confirm automatically. transaction:" + JSON.toJSONString(transaction)); } break; case CANCELLING: try { int transactionStatusFromConsumer = compensableMethodContext.getTransactionContext().getParticipantStatus(); transaction = transactionManager.propagationExistBegin(compensableMethodContext.getTransactionContext()); // Only if transaction's status is at TRY_SUCCESS、TRY_FAILED、CANCELLING stage we can call rollback. // If transactionStatusFromConsumer is TRY_SUCCESS, no mate current transaction is TRYING or not, also can rollback. // transaction's status is TRYING while transactionStatusFromConsumer is TRY_SUCCESS may happen when transaction's changeStatus is async. //只有事務狀態在TRY_SUCCESS、TRY_FAILED、CANCELING階段才可以調用rollback。 if (transaction.getStatus().equals(TransactionStatus.TRY_SUCCESS) || transaction.getStatus().equals(TransactionStatus.TRY_FAILED) || transaction.getStatus().equals(TransactionStatus.CANCELLING) || transactionStatusFromConsumer == ParticipantStatus.TRY_SUCCESS.getId()) { // 回滾事務 transactionManager.rollback(asyncCancel); } else { //in this case, transaction's Status is TRYING and transactionStatusFromConsumer is TRY_FAILED // this may happen if timeout exception throws during rpc call. throw new IllegalTransactionStatusException("Branch transaction status is TRYING, cannot rollback directly, waiting for recovery job to rollback."); } } catch (NoExistedTransactionException exception) { //the transaction has been rollback,ignore it. logger.info("no existed transaction found at CANCELLING stage, will ignore and cancel automatically. transaction:" + JSON.toJSONString(transaction)); } break; } } finally { // 將事務從當前線程事務隊列移除 transactionManager.cleanAfterCompletion(transaction); } // 如果是CONFIRMING 或 CANCEL 則返回空值 Method method = compensableMethodContext.getMethod(); return ReflectionUtils.getNullValue(method.getReturnType()); }-
Trying階段為什么要創建分支事務?在根事務進行 Confirm / Cancel 時,調用根事務上的參與者們提交或回滾事務時,進行RPC調用遠程參與者,遠程事務參與者根據傳播的事務ID,查詢到當前事務上下文,然后進行事務的提交或回滾。
-
當事務處于 TransactionStatus.TRYING 時,調用
TransactionManager#propagationExistBegin(...)方法,創建分支事務。分支事務創建完成后,調用ProceedingJoinPoint#proceed()方法,執行方法原邏輯( 即 Try 邏輯 )。 -
當事務處于 TransactionStatus.CONFIRMING 時,調用
TransactionManager#commit()方法,提交事務。 -
當事務處于 TransactionStatus.CANCELLING 時,調用
TransactionManager#rollback()方法,提交事務。 -
調用
TransactionManager#cleanAfterCompletion(...)方法,將事務從當前線程事務隊列移除,避免線程沖突。 -
當事務處于 TransactionStatus.CONFIRMING / TransactionStatus.CANCELLING 時,調用
ReflectionUtils#getNullValue(...)方法,返回空值。為什么返回空值?Confirm / Cancel 相關方法,是通過 AOP 切面調用,只調用,不處理返回值,但是又不能沒有返回值,因此直接返回空。實現代碼如下:public static Object getNullValue(Class type) { if (boolean.class.equals(type)) { return false; } else if (byte.class.equals(type)) { return 0; } else if (short.class.equals(type)) { return 0; } else if (int.class.equals(type)) { return 0; } else if (long.class.equals(type)) { return 0; } else if (float.class.equals(type)) { return 0; } else if (double.class.equals(type)) { return 0; } else if (char.class.equals(type)) { return ' '; } return null; }
-
-
方法類型為 ParticipantRole.CONSUMER時,不發起新的事務,直接執行業務方法。
-
方法類型為 ParticipantRole.Normal 時,不進行事務處理。
5.3 資源協調者攔截器
先一起來看下資源協調者攔截器 對應的切面 org.mengyun.tcctransaction.interceptor.ResourceCoordinatorAspect,實現代碼如下:
/**
* 資源協調者攔截器
* Created by changmingxie on 11/8/15.
*/
@Aspect
public abstract class ResourceCoordinatorAspect {
private ResourceCoordinatorInterceptor resourceCoordinatorInterceptor;
@Pointcut("@annotation(org.mengyun.tcctransaction.api.Compensable) || execution(* *(org.mengyun.tcctransaction.api.TransactionContext,..))")
public void transactionResourcePointcut() {
}
@Around("transactionResourcePointcut()")
public Object interceptTransactionResourceMethodWithCompensableAnnotation(ProceedingJoinPoint pjp) throws Throwable {
Method method = ((MethodSignature) pjp.getSignature()).getMethod();
Compensable compensable = method.getAnnotation(Compensable.class);
Class<? extends TransactionContextEditor> transactionContextEditor = NullableTransactionContextEditor.class;
if (compensable != null) {
// 獲取事務上下文編輯器
transactionContextEditor = compensable.transactionContextEditor();
}
if (transactionContextEditor.equals(NullableTransactionContextEditor.class)
&& ParameterTransactionContextEditor.hasTransactionContextParameter(method.getParameterTypes())) {
transactionContextEditor = ParameterTransactionContextEditor.class;
}
return interceptTransactionContextMethod(new AspectJTransactionMethodJoinPoint(pjp, compensable, transactionContextEditor));
}
public Object interceptTransactionContextMethod(TransactionMethodJoinPoint pjp) throws Throwable {
return resourceCoordinatorInterceptor.interceptTransactionContextMethod(pjp);
}
public void setResourceCoordinatorInterceptor(ResourceCoordinatorInterceptor resourceCoordinatorInterceptor) {
this.resourceCoordinatorInterceptor = resourceCoordinatorInterceptor;
}
public abstract int getOrder();
}
-
通過
org.aspectj.lang.annotation.@Pointcut+org.aspectj.lang.annotation.@Around注解,配置對 @Compensable 注解的方法進行攔截,調用ResourceCoordinatorInterceptor#interceptTransactionContextMethod(...)方法進行處理。 -
ResourceCoordinatorInterceptor 實現代碼如下:
public Object interceptTransactionContextMethod(TransactionMethodJoinPoint pjp) throws Throwable { Transaction transaction = transactionManager.getCurrentTransaction(); if (transaction != null && transaction.getStatus().equals(TransactionStatus.TRYING)) { // 創建事務參與者,并將事務參與者添加至當前Transaction中 Participant participant = enlistParticipant(pjp); if (participant != null) { Object result = null; try { // 執行原方法邏輯 result = pjp.proceed(pjp.getArgs()); participant.setStatus(ParticipantStatus.TRY_SUCCESS); } catch (Throwable e) { participant.setStatus(ParticipantStatus.TRY_FAILED); throw e; } return result; } } return pjp.proceed(pjp.getArgs()); }-
當事務處于 TransactionStatus.TRYING 時,調用
#enlistParticipant(...)方法,添加事務參與者。private Participant enlistParticipant(TransactionMethodJoinPoint pjp) { Transaction transaction = transactionManager.getCurrentTransaction(); CompensableMethodContext compensableMethodContext = new CompensableMethodContext(pjp, transaction); if (compensableMethodContext.getParticipantRole().equals(ParticipantRole.NORMAL)) { return null; } // 獲得 確認執行業務方法 和 取消執行業務方法 String confirmMethodName = compensableMethodContext.getConfirmMethodName(); String cancelMethodName = compensableMethodContext.getCancelMethodName(); // 獲取事務上下文編輯器 Class<? extends TransactionContextEditor> transactionContextEditorClass = compensableMethodContext.getTransactionContextEditorClass(); // 創建事務ID,這個事務ID是下游分支事務的事務ID,下游服務在創建分支事務時將沿用該ID。這樣在數據結構上,所有事務節點都能關聯起來。 TransactionXid xid = new TransactionXid(transaction.getXid().getGlobalTransactionId()); if (compensableMethodContext.getTransactionContext() == null) { //實例化 事務上下文編輯器,并設置事務上下文,這里面最重要的就是需要將rootXid、和分支事務xid傳播到下游 FactoryBuilder.factoryOf(transactionContextEditorClass).getInstance().set(new TransactionContext(transaction.getRootXid(), xid, TransactionStatus.TRYING.getId(), ParticipantStatus.TRYING.getId()), pjp.getTarget(), pjp.getMethod(), pjp.getArgs()); } //獲得聲明 @Compensable 方法的實際類 Class targetClass = ReflectionUtils.getDeclaringType(pjp.getTarget().getClass(), compensableMethodContext.getMethod().getName(), compensableMethodContext.getMethod().getParameterTypes()); // 創建 確認執行方法調用上下文 和 取消執行方法調用上下文 InvocationContext confirmInvocation = new InvocationContext(targetClass, confirmMethodName, compensableMethodContext.getMethod().getParameterTypes(), pjp.getArgs()); InvocationContext cancelInvocation = new InvocationContext(targetClass, cancelMethodName, compensableMethodContext.getMethod().getParameterTypes(), pjp.getArgs()); // 創建事務參與者 Participant participant = new Participant( transaction.getRootXid(), xid, confirmInvocation, cancelInvocation, transactionContextEditorClass); // 添加 事務參與者 到 事務 transactionManager.enlistParticipant(participant); return participant; }- 調用
#getCurrentTransaction()方法, 獲取事務。 - 調用 TransactionXid 構造方法,創建分支事務編號。
- 調用 InvocationContext 構造方法,分別創建確認執行方法調用上下文和取消執行方法調用上下文。
- 調用
TransactionManager#enlistParticipant(...)方法,添加事務參與者到事務。
- 調用
-
-
調用
ProceedingJoinPoint#proceed(...)方法,執行方法原邏輯。
六. 事務存儲器
在 TCC 的過程中,根據應用內存中的事務信息完成整個事務流程。但是實際業務場景中,將事務信息只放在應用內存中是遠遠不夠可靠的。例如:
- 應用進程異常崩潰,未完成的事務信息將丟失。
- 應用進程集群,當提供遠程服務調用時,事務信息需要集群內共享。
- 發起事務的應用需要重啟部署新版本,因為各種原因,有未完成的事務。
因此,TCC-Transaction 將事務信息添加到內存中的同時,會使用外部存儲進行持久化。目前提供四種外部存儲:
- JdbcTransactionRepository,JDBC 事務存儲器
- RedisTransactionRepository,Redis 事務存儲器
- ZooKeeperTransactionRepository,Zookeeper 事務存儲器
- FileSystemTransactionRepository,File 事務存儲器
6.1 序列化
在「3. 事務與參與者」,可以看到 Transaction 是一個比較復雜的對象,內嵌 Participant 數組,而 Participant 本身也是復雜的對象,內嵌了更多的其他對象,因此,存儲器在持久化 Transaction 時,需要序列化后才能存儲。
org.mengyun.tcctransaction.serializer.ObjectSerializer,對象序列化接口。實現代碼如下:
public interface ObjectSerializer<T> {
byte[] serialize(T t);
T deserialize(byte[] bytes);
}
目前提供 JdkSerializationSerializer、KryoTransactionSerializer、JacksonTransactionSerializer 等實現方式。開發者可以在配置TransactionRepository時指定序列化器:
@Bean("transactionRepository")
public TransactionRepository memoryStoreTransactionRepository2(JedisPool jedisPool) {
RedisTransactionRepository repository = new RedisTransactionRepository();
repository.setDomain("TCC:DUBBO:INVENTORY:");
repository.setRootDomain("TCC:DUBBO:ORDER");
repository.setSerializer(new JacksonTransactionSerializer());
repository.setJedisPool(jedisPool);
return repository;
}
6.2 存儲器
org.mengyun.tcctransaction.TransactionRepository,事務存儲器接口。實現代碼如下:
public interface TransactionRepository {
/**
* 新增事務
*
* @param transaction 事務
* @return 新增數量
*/
int create(Transaction transaction);
/**
* 更新事務
*
* @param transaction 事務
* @return 更新數量
*/
int update(Transaction transaction);
/**
* 刪除事務
*
* @param transaction 事務
* @return 刪除數量
*/
int delete(Transaction transaction);
/**
* 獲取事務
*
* @param xid 事務編號
* @return 事務
*/
Transaction findByXid(TransactionXid xid);
/**
* 獲取超過指定時間的事務集合
*
* @param date 指定時間
* @return 事務集合
*/
List<Transaction> findAllUnmodifiedSince(Date date);
}
不同的存儲器通過實現該接口,提供事務的增刪改查功能。
七. 事務恢復
TCC 恢復。主要涉及如下二個 package 路徑下的類:
org.mengyun.tcctransaction.recovery-
- RecoverFrequency,事務恢復配置接口
- TransactionRecovery,事務恢復邏輯
org.mengyun.tcctransaction.spring.recover:-
- DefaultRecoverFrequency,默認事務恢復配置實現
- RecoverScheduledJob,事務恢復定時任務
事務信息被持久化到外部的存儲器中。事務存儲是事務恢復的基礎。通過讀取外部存儲器中的異常事務,定時任務會按照一定頻率對事務進行重試,直到事務完成或超過最大重試次數。
7.1 事務重試配置
public interface RecoverFrequency {
/**
* 單個事務恢復最大重試次數。超過最大重試次數后,打出錯誤日志。
*/
int getMaxRetryCount();
int getFetchPageSize();
/**
* 恢復間隔時間
* @return
*/
int getRecoverDuration();
/**
* cron 表達式
* @return
*/
String getCronExpression();
int getConcurrentRecoveryThreadCount();
}
#getMaxRetryCount(),單個事務恢復最大重試次數。超過最大重試次數后,目前僅打出錯誤日志,下文會看到實現。#getRecoverDuration(),單個事務恢復重試的間隔時間,單位:秒。#getCronExpression(),定時任務 cron 表達式。#getDelayCancelExceptions(),延遲取消異常集合。
org.mengyun.tcctransaction.recovery.DefaultRecoverFrequency默認事務恢復配置實現,實現代碼如下:
public class DefaultRecoverFrequency implements RecoverFrequency {
public static final RecoverFrequency INSTANCE = new DefaultRecoverFrequency();
private int maxRetryCount = 30;
private int recoverDuration = 30; //30 seconds
/**
* 每15s執行一次
*/
private String cronExpression = "0/15 * * * * ? ";
private int fetchPageSize = 500;
private int concurrentRecoveryThreadCount = Runtime.getRuntime().availableProcessors() * 2;
}
maxRetryCount,單個事務恢復最大重試次數 為 30。recoverDuration,單個事務恢復重試的間隔時間為 30 秒。cronExpression,定時任務 cron 表達式為"0/15 * * * * ?",每15s執行一次。
7.2 事務重試JOB
org.mengyun.tcctransaction.spring.recover.RecoverScheduledJob,事務恢復定時任務,基于 Quartz 實現調度,不斷不斷不斷執行事務恢復。實現代碼如下:
/**
* 事務重試JOB,事務恢復定時任務,基于 Quartz 實現調度,不斷執行事務恢復
* Created by changming.xie on 6/2/16.
*/
public class RecoverScheduledJob {
private TransactionRecovery transactionRecovery;
private Scheduler scheduler;
private String jobName;
private String triggerName;
private String cronExpression;
private int delayStartSeconds;
public void init() {
try {
// JOB執行QuartzRecoveryTask#execute方法
JobDetail jobDetail = JobBuilder.newJob(QuartzRecoveryTask.class).withIdentity(jobName).build();
jobDetail.getJobDataMap().put(QuartzRecoveryTask.RECOVERY_INSTANCE_KEY, transactionRecovery);
CronTrigger cronTrigger = TriggerBuilder.newTrigger().withIdentity(triggerName)
.withSchedule(CronScheduleBuilder.cronSchedule(cronExpression)
.withMisfireHandlingInstructionDoNothing()).build();
// 啟動任務調度
scheduler.scheduleJob(jobDetail, cronTrigger);
scheduler.startDelayed(delayStartSeconds);
} catch (Exception e) {
throw new SystemException(e);
}
}
}
創建的定時任務最終執行的是org.mengyun.tcctransaction.recovery.QuartzRecoveryTask方法:
public class QuartzRecoveryTask implements Job {
public final static String RECOVERY_INSTANCE_KEY = "transactionRecovery";
@Override
public void execute(JobExecutionContext context) throws JobExecutionException {
TransactionRecovery transactionRecovery = (TransactionRecovery) context.getMergedJobDataMap().get(RECOVERY_INSTANCE_KEY);
transactionRecovery.startRecover();
}
}
如果應用集群部署,會不會相同事務被多個定時任務同時重試?
答案是不會,事務在重試時會樂觀鎖更新,同時只有一個應用節點能更新成功。
官方解釋:多機部署下,所有機器都宕機,從異常中恢復時,所有的機器豈不是都可以查詢到所有的需要恢復的服務?
當然極端情況下,Socket 調用超時時間大于事務重試間隔,第一個節點在重試某個事務,一直未執行完成,第二個節點已經可以重試。
ps:建議,Socket 調用超時時間小于事務重試間隔。
是否定時任務和應用服務器解耦?
螞蟻金服的分布式事務服務 DTS 采用 client-server 模式:
- xts-client :負責事務的創建、提交、回滾、記錄。
- xts-server :負責異常事務的恢復。
FROM 《螞蟻金融云 DTS 文檔》
分布式事務服務 (Distributed Transaction Service, DTS) 是一個分布式事務框架,用來保障在大規模分布式環境下事務的最終一致性。DTS 從架構上分為 xts-client 和 xts-server 兩部分,前者是一個嵌入客戶端應用的 JAR 包,主要負責事務數據的寫入和處理;后者是一個獨立的系統,主要負責異常事務的恢復。
7.3 異常事務恢復
上述JOB最終執行的是org.mengyun.tcctransaction.recovery.TransactionRecovery#startRecover()方法:
/**
* 恢復異常事務
*/
public void startRecover() {
// 初始化
ensureRecoveryInitialized();
// 獲取事務存儲器
TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();
if (transactionRepository instanceof SentinelTransactionRepository) {
SentinelTransactionRepository sentinelTransactionRepository = (SentinelTransactionRepository) transactionRepository;
if (!sentinelTransactionRepository.getSentinelController().degrade()) {
startRecover(sentinelTransactionRepository.getWorkTransactionRepository());
}
// 開始恢復
startRecover(sentinelTransactionRepository.getDegradedTransactionRepository());
} else {
startRecover(transactionRepository);
}
}
startRecover的重載方法,通過創建恢復任務,然后將任務列表扔到線程池中并發執行:
public void startRecover(TransactionRepository transactionRepository) {
// 如果存儲在內存,則使用默認的Lock;如果不是則獲取默認的鎖
Lock recoveryLock = transactionRepository instanceof LocalStorable ? RecoveryLock.DEFAULT_LOCK : transactionConfigurator.getRecoveryLock();
// 加鎖
if (recoveryLock.tryLock()) {
try {
String offset = null;
int totalCount = 0;
do {
// 獲取異常事務列表
Page<Transaction> page = loadErrorTransactionsByPage(transactionRepository, offset);
if (page.getData().size() > 0) {
// 并發恢復異常事務(線程池)
concurrentRecoveryErrorTransactions(transactionRepository, page.getData());
offset = page.getNextOffset();
totalCount += page.getData().size();
} else {
break;
}
} while (true);
logger.debug(String.format("total recovery count %d from repository:%s", totalCount, transactionRepository.getClass().getName()));
} catch (Throwable e) {
logger.error(String.format("recovery failed from repository:%s.", transactionRepository.getClass().getName()), e);
} finally {
recoveryLock.unlock();
}
}
}
/**
* 恢復異常事務集合
* @param transactionRepository
* @param transactions
* @throws InterruptedException
* @throws ExecutionException
*/
private void concurrentRecoveryErrorTransactions(TransactionRepository transactionRepository, List<Transaction> transactions) throws InterruptedException, ExecutionException {
initLogStatistics();
List<RecoverTask> tasks = new ArrayList<>();
for (Transaction transaction : transactions) {
tasks.add(new RecoverTask(transactionRepository, transaction));
}
List<Future<Void>> futures = recoveryExecutorService.invokeAll(tasks, CONCURRENT_RECOVERY_TIMEOUT, TimeUnit.SECONDS);
for (Future future : futures) {
future.get();
}
}
恢復任務最終執行的是org.mengyun.tcctransaction.recovery.TransactionRecovery#recoverErrorTransaction:
private void recoverErrorTransaction(TransactionRepository transactionRepository, Transaction transaction) {
if (transaction.getRetriedCount() > transactionConfigurator.getRecoverFrequency().getMaxRetryCount()) {
//當單個事務超過最大重試次數時,不再重試,只打印異常。
logSync.lock();
try {
if (triggerMaxRetryPrintCount.get() < logMaxPrintCount) {
logger.error(String.format(
"recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s",
transaction.getXid(),
transaction.getStatus().getId(),
transaction.getRetriedCount(),
JSON.toJSONString(transaction)));
triggerMaxRetryPrintCount.incrementAndGet();
} else if (triggerMaxRetryPrintCount.get() == logMaxPrintCount) {
logger.error("Too many transaction's retried count max then MaxRetryCount during one page transactions recover process , will not print errors again!");
}
} finally {
logSync.unlock();
}
return;
}
try {
if (transaction.getTransactionType().equals(TransactionType.ROOT)) {
// 如果是根事務
switch (transaction.getStatus()) {
case CONFIRMING:
commitTransaction(transactionRepository, transaction);
break;
case CANCELLING:
rollbackTransaction(transactionRepository, transaction);
break;
default:
//the transaction status is TRYING, ignore it.
break;
}
} else {
// 如果是分支事務
//transaction type is BRANCH
switch (transaction.getStatus()) {
case CONFIRMING:
commitTransaction(transactionRepository, transaction);
break;
case CANCELLING:
case TRY_FAILED:
rollbackTransaction(transactionRepository, transaction);
break;
case TRY_SUCCESS:
if(transactionRepository.getRootDomain() == null) {
break;
}
//check the root transaction
Transaction rootTransaction = transactionRepository.findByRootXid(transaction.getRootXid());
if (rootTransaction == null) {
// In this case means the root transaction is already rollback.
// Need cancel this branch transaction.
rollbackTransaction(transactionRepository, transaction);
} else {
switch (rootTransaction.getStatus()) {
case CONFIRMING:
commitTransaction(transactionRepository, transaction);
break;
case CANCELLING:
rollbackTransaction(transactionRepository, transaction);
break;
default:
break;
}
}
break;
default:
// the transaction status is TRYING, ignore it.
break;
}
}
} catch (Throwable throwable) {
if (throwable instanceof TransactionOptimisticLockException
|| ExceptionUtils.getRootCause(throwable) instanceof TransactionOptimisticLockException) {
logger.warn(String.format(
"optimisticLockException happened while recover. txid:%s, status:%d,retried count:%d",
transaction.getXid(),
transaction.getStatus().getId(),
transaction.getRetriedCount()));
} else {
logSync.lock();
try {
if (recoveryFailedPrintCount.get() < logMaxPrintCount) {
logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s",
transaction.getXid(),
transaction.getStatus().getId(),
transaction.getRetriedCount(),
JSON.toJSONString(transaction)), throwable);
recoveryFailedPrintCount.incrementAndGet();
} else if (recoveryFailedPrintCount.get() == logMaxPrintCount) {
logger.error("Too many transaction's recover error during one page transactions recover process , will not print errors again!");
}
} finally {
logSync.unlock();
}
}
}
}
- 當單個事務超過最大重試次數時,不再重試,只打印異常,此時需要人工介入解決??梢越尤?ELK 收集日志監控報警。
- 當分支事務超過最大可重試時間時,不再重試??赡苡型瑢W和我一開始理解的是相同的,實際分支事務對應的應用服務器也可以重試分支事務,不是必須根事務發起重試,從而一起重試分支事務。這點要注意下。
- 當事務處于 TransactionStatus.CONFIRMING 狀態時,提交事務,邏輯和
TransactionManager#commit()類似。 - 當事務處于 TransactionStatus.CONFIRMING 狀態,或者事務類型為根事務,回滾事務,邏輯和
TransactionManager#rollback()類似。這里加判斷的事務類型為根事務,用于處理延遲回滾異常的事務的回滾。
八. Dubbo實現
8.1 DubboTransactionContextEditor
[5.3 資源協調者攔截器] 在切面方法調用之前會嘗試創建事務參與者,此時DubboTransactionContextEditor會通過Dubbo隱式傳參的方式,將事務信息傳遞給下游服務:
public class DubboTransactionContextEditor implements TransactionContextEditor {
@Override
public TransactionContext get(Object target, Method method, Object[] args) {
String context = RpcContext.getContext().getAttachment(TransactionContextConstants.TRANSACTION_CONTEXT);
if (StringUtils.isNotEmpty(context)) {
return JSON.parseObject(context, TransactionContext.class);
}
return null;
}
@Override
public void set(TransactionContext transactionContext, Object target, Method method, Object[] args) {
RpcContext.getContext().setAttachment(TransactionContextConstants.TRANSACTION_CONTEXT, JSON.toJSONString(transactionContext));
}
}
8.2 CompensableTransactionFilter
在tcc-transaction與Dubbo整合后,當前事務代碼在調用遠程RPC服務之前,會創建一個事務參與者(內部包含confirm、cancel上下文信息)并放入當前事務中保存,這樣就可以在當前事務回滾時通知下游分支事務進行回滾操作。而創建Dubbo事務參與者的邏輯就是通過Dubbo Filter機制去實現(Dubbo Filter具體參見:調用攔截擴展 | Apache Dubbo)。
DubboTransactionContextEditor中,會對Dubbo RPC方法進行攔截,攔截后通過ResourceCoordinatorAspect#interceptTransactionContextMethod方法創建事務參與者,并傳播分支事務ID:
package org.mengyun.tcctransaction.dubbo.filter;
/**
* Dubbo Filter機制,在調用遠程調用Dubbo下游服務之前會進入
*/
@Activate(group = {Constants.CONSUMER})
public class CompensableTransactionFilter implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
Method method = null;
try {
method = invoker.getInterface().getMethod(invocation.getMethodName(), invocation.getParameterTypes());
if (ParameterTransactionContextEditor.hasTransactionContextParameter(invocation.getParameterTypes())) {
// in this case, will handler by ResourceCoordinatorAspect
return invoker.invoke(invocation);
}
EnableTcc enableTcc = method.getAnnotation(EnableTcc.class);
if (enableTcc != null) {
DubboInvokeProceedingJoinPoint pjp = new DubboInvokeProceedingJoinPoint(invoker, invocation, null, DubboTransactionContextEditor.class);
return (Result) FactoryBuilder.factoryOf(ResourceCoordinatorAspect.class).getInstance().interceptTransactionContextMethod(pjp);
} else {
return invoker.invoke(invocation);
}
} catch (Throwable e) {
throw new SystemException(e);
}
}
}
九. 總結
事務攔截器和事務恢復JOB,共同保證TCC事務最終一致性:

TCC依賴于事務記錄,在開始TCC事務前標記創建此記錄,這樣在服務宕機等意外情況下,還能通過JOB保證事務狀態最終恢復一致性。因為存在失敗重試的邏輯,所以cancel、commit方法必須實現冪等。其實在分布式開發中,凡是涉及到寫操作的地方都應該實現冪等。
事務攔截器和資源協調者攔截器以及Dubbo Filter協作生成完整事務樹:

其中“事務攔截器”主要負責生成根事務/分支事務,而“資源協調者攔截器”主要負責為事務生成事務參與者,在資源協調者切面中以及在調用Dubbo RPC服務之前都會調用“資源協調者攔截器”為事務生成事務參與者,并持久化事務信息。當事務進行confirm/camcel操作時,會根據事務ID,查詢到當前事務的所有事務參與者信息,并統一進行confirm/cancel操作。
詳細的代碼流程如下:

tips:本文所有圖例中藍色代表事務攔截器的代碼邏輯、綠色代表資源協調者攔截器的代碼邏輯、橙色代表Dubbo Filter代碼邏輯、紫色代表業務活動管理器代碼邏輯(事務恢復JOB)。
事務攔截器主要完成根事務/分支事務的創建,以及事務confirm/cancel的邏輯。資源協調者攔截器主要用于創建事務參與者并將事務參與者放入事務記錄中。這樣上游事務通知當前事務進行confirm/cancel時,可以查出對應的事務上下文,恢復執行現場。
本文參考至:
TCC-Transaction原理 - 掘金 (juejin.cn)


浙公網安備 33010602011771號