Spring事務(wù)實現(xiàn)原理
1、引言
spring的spring-tx模塊提供了對事務(wù)管理支持,使用spring事務(wù)可以讓我們從復(fù)雜的事務(wù)處理中得到解脫,無需要去處理獲得連接、關(guān)閉連接、事務(wù)提交和回滾等這些操作。
spring事務(wù)有編程式事務(wù)和聲明式事務(wù)兩種實現(xiàn)方式。編程式事務(wù)是通過編寫代碼來管理事務(wù)的提交、回滾、以及事務(wù)的邊界。這意味著開發(fā)者需要在代碼中顯式地調(diào)用事務(wù)的開始、提交和回滾。聲明式事務(wù)是通過配置來管理事務(wù),您可以使用注解或XML配置來定義事務(wù)的邊界和屬性,而無需顯式編寫事務(wù)管理的代碼。
下面我們逐步分析spring源代碼,理解spring事務(wù)的實現(xiàn)原理。
2、編程式事務(wù)
2.1 使用示例
// transactionManager是某一個具體的PlatformTransactionManager實現(xiàn)類的對象
private PlatformTransactionManager transactionManager;
// 定義事務(wù)屬性
DefaultTransactionDefinition def = new DefaultTransactionDefinition();
// 獲取事務(wù)
TransactionStatus status = transactionManager.getTransaction(def);
try {
// 執(zhí)行數(shù)據(jù)庫操作
// ...
// 提交事務(wù)
transactionManager.commit(status);
} catch (Exception ex) {
// 回滾事務(wù)
transactionManager.rollback(status);
}
在使用編程式事務(wù)處理的過程中,利用 DefaultTransactionDefinition 對象來持有事務(wù)處理屬性。同時,在創(chuàng)建事務(wù)的過程中得到一個 TransactionStatus 對象,然后通過直接調(diào)用 transactionManager 對象 的 commit() 和 rollback()方法 來完成事務(wù)處理。
2.2 PlatformTransactionManager核心接口

PlatformTransactionManager是Spring事務(wù)管理的核心接口,通過 PlatformTransactionManager 接口設(shè)計了一系列與事務(wù)處理息息相關(guān)的接口方法,如 getTransaction()、commit()、rollback() 這些和事務(wù)處理相關(guān)的統(tǒng)一接口。對于這些接口的實現(xiàn),很大一部分是由 AbstractTransactionManager 抽象類來完成的。
AbstractPlatformManager 封裝了 Spring 事務(wù)處理中通用的處理部分,比如事務(wù)的創(chuàng)建、提交、回滾,事務(wù)狀態(tài)和信息的處理,與線程的綁定等,有了這些通用處理的支持,對于具體的事務(wù)管理器而言,它們只需要處理和具體數(shù)據(jù)源相關(guān)的組件設(shè)置就可以了,比如在DataSourceTransactionManager中,就只需要配置好和DataSource事務(wù)處理相關(guān)的接口以及相關(guān)的設(shè)置。
2.3 事務(wù)的創(chuàng)建
PlatformTransactionManager的getTransaction()方法,封裝了底層事務(wù)的創(chuàng)建,并生成一個 TransactionStatus對象。AbstractPlatformTransactionManager提供了創(chuàng)建事務(wù)的模板,這個模板會被具體的事務(wù)處理器所使用。從下面的代碼中可以看到,AbstractPlatformTransactionManager會根據(jù)事務(wù)屬性配置和當(dāng)前進(jìn)程綁定的事務(wù)信息,對事務(wù)是否需要創(chuàng)建,怎樣創(chuàng)建 進(jìn)行一些通用的處理,然后把事務(wù)創(chuàng)建的底層工作交給具體的事務(wù)處理器完成,如:DataSourceTransactionManager、HibernateTransactionManager。
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition)
throws TransactionException {
TransactionDefinition def = (definition != null ? definition : TransactionDefinition.withDefaults());
Object transaction = doGetTransaction();
boolean debugEnabled = logger.isDebugEnabled();
if (isExistingTransaction(transaction)) {
return handleExistingTransaction(def, transaction, debugEnabled);
}
if (def.getTimeout() TransactionDefinition.TIMEOUT_DEFAULT) {
throw new InvalidTimeoutException("Invalid transaction timeout", def.getTimeout());
}
if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
def.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + def.getName() + "]: " + def);
}
try {
return startTransaction(def, transaction, false, debugEnabled, suspendedResources);
}
catch (RuntimeException | Error ex) {
resume(null, suspendedResources);
throw ex;
}
}
else {
if (def.getIsolationLevel() .isWarnEnabled()) {
logger.warn("Custom isolation level specified but no actual transaction initiated; " +
"isolation level will effectively be ignored: " + def);
}
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(def, null, true, newSynchronization, debugEnabled, null);
}
}
private TransactionStatus startTransaction(TransactionDefinition definition, Object transaction, boolean debugEnabled, @Nullable SuspendedResourcesHolder suspendedResources) {
boolean newSynchronization = this.getTransactionSynchronization() != SYNCHRONIZATION_NEVER;
DefaultTransactionStatus status = this.newTransactionStatus(definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
this.doBegin(transaction, definition);
this.prepareSynchronization(status, definition);
return status;
}
事務(wù)創(chuàng)建的結(jié)果是生成一個TransactionStatus對象,通過這個對象來保存事務(wù)處理需要的基本信息,TransactionStatus的創(chuàng)建過程如下:
protected DefaultTransactionStatus newTransactionStatus(TransactionDefinition definition, @Nullable Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, @Nullable Object suspendedResources) {
boolean actualNewSynchronization !TransactionSynchronizationManager.isSynchronizationActive();
return new DefaultTransactionStatus(transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources);
}
以上是創(chuàng)建一個全新事務(wù)的過程,還有另一種情況是:在創(chuàng)建當(dāng)前事務(wù)時,線程中已經(jīng)有事務(wù)存在了。這種情況會涉及事務(wù)傳播行為的處理。spring中七種事務(wù)傳播行為如下:
| 事務(wù)傳播行為類型 | 說明 |
| PROPAGATION_REQUIRED | 如果當(dāng)前沒有事務(wù),就新建一個事務(wù),如果已經(jīng)存在一個事務(wù)中,加入到這個事務(wù)中。這是最常見的選擇。 |
| PROPAGATION_SUPPORTS | 支持當(dāng)前事務(wù),如果當(dāng)前沒有事務(wù),就以非事務(wù)方式執(zhí)行。 |
| PROPAGATION_MANDATORY | 使用當(dāng)前的事務(wù),如果當(dāng)前沒有事務(wù),就拋出異常。 |
| PROPAGATION_REQUIRES_NEW | 新建事務(wù),如果當(dāng)前存在事務(wù),把當(dāng)前事務(wù)掛起。 |
| PROPAGATION_NOT_SUPPORTED | 以非事務(wù)方式執(zhí)行操作,如果當(dāng)前存在事務(wù),就把當(dāng)前事務(wù)掛起。 |
| PROPAGATION_NEVER | 以非事務(wù)方式執(zhí)行,如果當(dāng)前存在事務(wù),則拋出異常。 |
| PROPAGATION_NESTED | 如果當(dāng)前存在事務(wù),則在嵌套事務(wù)內(nèi)執(zhí)行。如果當(dāng)前沒有事務(wù),則執(zhí)行與PROPAGATION_REQUIRED類似的操作。 |
如果檢測到已存在事務(wù),handleExistingTransaction()方法將根據(jù)不同的事務(wù)傳播行為類型執(zhí)行相應(yīng)邏輯。
PROPAGATION_NEVER
即當(dāng)前方法需要在非事務(wù)的環(huán)境下執(zhí)行,如果有事務(wù)存在,那么拋出異常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) {
throw new IllegalTransactionStateException(
"Existing transaction found for transaction marked with propagation 'never'");
}
PROPAGATION_NOT_SUPPORTED
與前者的區(qū)別在于,如果有事務(wù)存在,那么將事務(wù)掛起,而不是拋出異常。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) {
Object suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
return prepareTransactionStatus(
definition, null, false, newSynchronization, debugEnabled, suspendedResources);
}
PROPAGATION_REQUIRES_NEW
新建事務(wù),如果當(dāng)前存在事務(wù),把當(dāng)前事務(wù)掛起。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) {
SuspendedResourcesHolder suspendedResources = suspend(transaction);
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
PROPAGATION_NESTED
開始一個 "嵌套的" 事務(wù), 它是已經(jīng)存在事務(wù)的一個真正的子事務(wù). 嵌套事務(wù)開始執(zhí)行時, 它將取得一個 savepoint. 如果這個嵌套事務(wù)失敗, 我們將回滾到此 savepoint. 嵌套事務(wù)是外部事務(wù)的一部分, 只有外部事務(wù)結(jié)束后它才會被提交。
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
if (useSavepointForNestedTransaction()) {
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, false, false, true, debugEnabled, null);
this.transactionExecutionListeners.forEach(listener .beforeBegin(status));
try {
status.createAndHoldSavepoint();
}
catch (RuntimeException .afterBegin(status, ex));
throw ex;
}
this.transactionExecutionListeners.forEach(listener .afterBegin(status, null));
return status;
}
else {
return startTransaction(definition, transaction, true, debugEnabled, null);
}
}
2.4 事務(wù)掛起
事務(wù)掛起在AbstractTransactionManager.suspend()中處理,該方法內(nèi)部將調(diào)用具體事務(wù)管理器的doSuspend()方法。以DataSourceTransactionManager為例,將ConnectionHolder設(shè)為null,因為一個ConnectionHolder對象就代表了一個數(shù)據(jù)庫連接,將ConnectionHolder設(shè)為null就意味著我們下次要使用連接時,將重新從連接池獲取。
protected Object doSuspend(Object transaction) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction;
txObject.setConnectionHolder(null);
return TransactionSynchronizationManager.unbindResource(obtainDataSource());
}
unbindResource()方法最終會調(diào)用TransactionSynchronizationManager.doUnbindResource()方法,該方法將移除當(dāng)前線程與事務(wù)對象的綁定。
private static Object doUnbindResource(Object actualKey) {
Map<Object, Object> map = resources.get();
if (map == null) {
return null;
}
Object value = map.remove(actualKey);
if (map.isEmpty()) {
resources.remove();
}
if (value instanceof ResourceHolder resourceHolder .isVoid()) {
value = null;
}
return value;
}
而被掛起的事務(wù)的各種狀態(tài)最終會保存在TransactionStatus對象中。
2.5 事務(wù)提交&回滾
主要是對jdbc的封裝、源碼邏輯較清晰,不展開細(xì)說。
3、聲明式事務(wù)
其底層建立在 AOP 的基礎(chǔ)之上,對方法前后進(jìn)行攔截,然后在目標(biāo)方法開始之前創(chuàng)建或者加入一個事務(wù),在執(zhí)行完目標(biāo)方法之后根據(jù)執(zhí)行情況提交或者回滾事務(wù)。通過聲明式事物,無需在業(yè)務(wù)邏輯代碼中摻雜事務(wù)管理的代碼,只需在配置文件中做相關(guān)的事務(wù)規(guī)則聲明(或通過等價的基于標(biāo)注的方式),便可以將事務(wù)規(guī)則應(yīng)用到業(yè)務(wù)邏輯中。
3.1 使用示例
配置:
<bean id="dataSource" class="com.zaxxer.hikari.HikariDataSource">
<property name="driverClassName" value="${jdbc.driverClassName}" />
<property name="url" value="${jdbc.url}" />
<property name="username" value="${jdbc.username}" />
<property name="password" value="${jdbc.password}" />
</bean>
<bean id="transactionManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="dataSource"/>
</bean>
<tx:annotation-driven transaction-manager="transactionManager"/>
代碼:
@Transactional
public void addOrder() {
// 執(zhí)行數(shù)據(jù)庫操作
}
3.2 自定義標(biāo)簽解析
先從配置文件開始入手,找到處理annotation-driven標(biāo)簽的類TxNamespaceHandler。TxNamespaceHandler實現(xiàn)了NamespaceHandler接口,定義了如何解析和處理自定義XML標(biāo)簽。
@Override
public void init() {
registerBeanDefinitionParser("advice", new TxAdviceBeanDefinitionParser());
registerBeanDefinitionParser("annotation-driven", new AnnotationDrivenBeanDefinitionParser());
registerBeanDefinitionParser("jta-transaction-manager", new JtaTransactionManagerBeanDefinitionParser());
}
AnnotationDrivenBeanDefinitionParser里的parse()方法,對XML標(biāo)簽annotation-driven進(jìn)行解析。
@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
registerTransactionalEventListenerFactory(parserContext);
String mode = element.getAttribute("mode");
if ("aspectj".equals(mode)) {
// mode="aspectj"
registerTransactionAspect(element, parserContext);
if (ClassUtils.isPresent("jakarta.transaction.Transactional", getClass().getClassLoader())) {
registerJtaTransactionAspect(element, parserContext);
}
}
else {
// mode="proxy"
AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
}
return null;
}
以默認(rèn)mode配置為例,執(zhí)行configureAutoProxyCreator()方法,將在Spring容器中注冊了3個bean:
BeanFactoryTransactionAttributeSourceAdvisor、TransactionInterceptor、AnnotationTransactionAttributeSource。同時會將TransactionInterceptor的BeanName傳入到Advisor中,然后將AnnotationTransactionAttributeSource這個Bean注入到Advisor中。之后動態(tài)代理的時候會使用這個Advisor去尋找每個Bean是否需要動態(tài)代理。
// Create the TransactionAttributeSourceAdvisor definition.
RootBeanDefinition advisorDef = new RootBeanDefinition(BeanFactoryTransactionAttributeSourceAdvisor.class);
advisorDef.setSource(eleSource);
advisorDef.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
advisorDef.getPropertyValues().add("transactionAttributeSource", new RuntimeBeanReference(sourceName));
advisorDef.getPropertyValues().add("adviceBeanName", interceptorName);
if (element.hasAttribute("order")) {
advisorDef.getPropertyValues().add("order", element.getAttribute("order"));
}
parserContext.getRegistry().registerBeanDefinition(txAdvisorBeanName, advisorDef);
CompositeComponentDefinition compositeDef = new CompositeComponentDefinition(element.getTagName(), eleSource);
compositeDef.addNestedComponent(new BeanComponentDefinition(sourceDef, sourceName));
compositeDef.addNestedComponent(new BeanComponentDefinition(interceptorDef, interceptorName));
compositeDef.addNestedComponent(new BeanComponentDefinition(advisorDef, txAdvisorBeanName));
parserContext.registerComponent(compositeDef);
3.3 Advisor
回顧AOP用法,Advisor可用于定義一個切面,它包含切點(Pointcut)和通知(Advice),用于在特定的連接點上執(zhí)行特定的操作。spring事務(wù)實現(xiàn)了一個Advisor: BeanFactoryTransactionAttributeSourceAdvisor。
public class BeanFactoryTransactionAttributeSourceAdvisor extends AbstractBeanFactoryPointcutAdvisor {
private final TransactionAttributeSourcePointcut pointcut = new TransactionAttributeSourcePointcut();
public void setTransactionAttributeSource(TransactionAttributeSource transactionAttributeSource) {
this.pointcut.setTransactionAttributeSource(transactionAttributeSource);
}
public void setClassFilter(ClassFilter classFilter) {
this.pointcut.setClassFilter(classFilter);
}
@Override
public Pointcut getPointcut() {
return this.pointcut;
}
}
BeanFactoryTransactionAttributeSourceAdvisor其實是一個PointcutAdvisor,是否匹配到切入點取決于Pointcut。Pointcut的核心在于其ClassFilter和MethodMatcher。
ClassFilter:
TransactionAttributeSourcePointcut內(nèi)部私有類 TransactionAttributeSourceClassFilter,實現(xiàn)了Spring框架中的ClassFilter接口。在matches方法中,它首先檢查傳入的類clazz 否為TransactionalProxy、TransactionManager或PersistenceExceptionTranslator的子類,如果不是,則獲取當(dāng)前的 TransactionAttributeSource 并檢查其是否允許該類作為候選類。
private class TransactionAttributeSourceClassFilter implements ClassFilter {
@Override
public boolean matches(Class<?> clazz) {
if (TransactionalProxy.class.isAssignableFrom(clazz) ||
TransactionManager.class.isAssignableFrom(clazz) ||
PersistenceExceptionTranslator.class.isAssignableFrom(clazz)) {
return false;
}
return (transactionAttributeSource == null || transactionAttributeSource.isCandidateClass(clazz));
}
}
MethodMatcher:
TransactionAttributeSourcePointcut.matches:
@Override
public boolean matches(Method method, Class<?> targetClass) {
return (this.transactionAttributeSource == null ||
this.transactionAttributeSource.getTransactionAttribute(method, targetClass) != null);
}
getTransactionAttribute()方法最終會調(diào)用至AbstractFallbackTransactionAttributeSource.computeTransactionAttribute()方法,該方法將先去方法上查找是否有相應(yīng)的事務(wù)注解(比如@Transactional),如果沒有,那么再去類上查找。
protected TransactionAttribute computeTransactionAttribute(Method method, @Nullable Class<?> targetClass) {
// Don't allow non-public methods, as configured.
if (allowPublicMethodsOnly() !Modifier.isPublic(method.getModifiers())) {
return null;
}
// The method may be on an interface, but we need attributes from the target class.
// If the target class is null, the method will be unchanged.
Method specificMethod = AopUtils.getMostSpecificMethod(method, targetClass);
// First try is the method in the target class.
TransactionAttribute txAttr = findTransactionAttribute(specificMethod);
if (txAttr != null) {
return txAttr;
}
// Second try is the transaction attribute on the target class.
txAttr = findTransactionAttribute(specificMethod.getDeclaringClass());
if (txAttr ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
if (specificMethod != method) {
// Fallback is to look at the original method.
txAttr = findTransactionAttribute(method);
if (txAttr != null) {
return txAttr;
}
// Last fallback is the class of the original method.
txAttr = findTransactionAttribute(method.getDeclaringClass());
if (txAttr ClassUtils.isUserLevelMethod(method)) {
return txAttr;
}
}
return null;
}
3.4 TransactionInterceptor
TransactionInterceptor是spring事務(wù)提供的AOP攔截器,實現(xiàn)了AOP Alliance的MethodInterceptor接口,是一種通知(advice)。其可以用于在方法調(diào)用前后進(jìn)行事務(wù)管理。
@Override
@Nullable
public Object invoke(MethodInvocation invocation) throws Throwable {
// Work out the target class: may be {@code null}.
// The TransactionAttributeSource should be passed the target class
// as well as the method, which may be from an interface.
Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
// Adapt to TransactionAspectSupport's invokeWithinTransaction...
return invokeWithinTransaction(invocation.getMethod(), targetClass, new CoroutinesInvocationCallback() {
@Override
@Nullable
public Object proceedWithInvocation() throws Throwable {
return invocation.proceed();
}
@Override
public Object getTarget() {
return invocation.getThis();
}
@Override
public Object[] getArguments() {
return invocation.getArguments();
}
});
}
invokeWithinTransaction()方法會根據(jù)目標(biāo)方法上的事務(wù)配置,來決定是開啟新事務(wù)、加入已有事務(wù),還是直接執(zhí)行邏輯(如果沒有事務(wù))。其代碼簡化如下(僅保留PlatformTransactionManager部分):
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass, final InvocationCallback invocation) {
// If the transaction attribute is null, the method is non-transactional.
final TransactionAttribute txAttr = getTransactionAttributeSource()
.getTransactionAttribute(method, targetClass);
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
ObjectretVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
} catch (Throwableex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throwex;
} finally {
cleanupTransactionInfo(txInfo);
}
commitTransactionAfterReturning(txInfo);
returnretVal;
}
}
作者:京東零售 范錫軍
來源:京東云開發(fā)者社區(qū) 轉(zhuǎn)載請注明來源
浙公網(wǎng)安備 33010602011771號