Overview
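Spring transactions are built on AOP. While the Spring container initializes, a TransactionInterceptor instance (wrapped in an advisor, BeanFactoryTransactionAttributeSourceAdvisor in the annotation-driven setup) is added to the advisor array of the proxy object of every bean that has methods annotated with @Transactional. When a transactional method is called, TransactionInterceptor.invoke runs and handles the transaction.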
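TransactionInterceptor is injected in the following steps:
1. First, all advisors are initialized. In this step the classes behind the bean definitions in the container that implement the Advisor interface are instantiated and added to an AdvisedSupport object, which is eventually injected into the proxy object.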
AbstractAutoProxyCreator.postProcessBeforeInstantiation()
AspectJAwareAdvisorAutoProxyCreator.shouldSkip()
AspectJAwareAdvisorAutoProxyCreator.findCandidateAdvisors()
2. The bean is proxied.
AbstractAutoProxyCreator.postProcessAfterInitialization()
AbstractAutoProxyCreator.wrapIfNecessary()
AbstractAutoProxyCreator.createProxy() // chooses between a JDK dynamic proxy and CGLIB here, then generates the proxy object
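To make the proxying step more concrete, here is a minimal sketch of the JDK dynamic proxy variant. It is not Spring code: AccountService, SimpleAccountService, TxProxySketch and the EVENTS list are hypothetical names, and the handler only records "begin", "commit" and "rollback" where a real interceptor would talk to a transaction manager.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical business interface; JDK proxies can only proxy interfaces.
interface AccountService {
    void transfer(int amount);
}

// Hypothetical target bean.
final class SimpleAccountService implements AccountService {
    @Override
    public void transfer(int amount) {
        if (amount < 0) {
            throw new IllegalArgumentException("negative amount");
        }
    }
}

final class TxProxySketch {
    // Records what the handler did, standing in for real begin/commit/rollback calls.
    static final List<String> EVENTS = new ArrayList<>();

    static AccountService wrap(AccountService target) {
        InvocationHandler handler = (proxy, method, args) -> {
            EVENTS.add("begin");
            try {
                Object result = method.invoke(target, args);
                EVENTS.add("commit");
                return result;
            } catch (InvocationTargetException ex) {
                // The target threw: roll back and rethrow the original exception.
                EVENTS.add("rollback");
                throw ex.getCause();
            }
        };
        return (AccountService) Proxy.newProxyInstance(
                AccountService.class.getClassLoader(),
                new Class<?>[] {AccountService.class},
                handler);
    }

    public static void main(String[] args) {
        AccountService service = wrap(new SimpleAccountService());
        service.transfer(10);
        try {
            service.transfer(-1);
        } catch (IllegalArgumentException expected) {
            // The caller still sees the original exception after the rollback.
        }
        System.out.println(EVENTS); // [begin, commit, begin, rollback]
    }
}
```

The caller only ever holds the proxy, which is why the transaction logic runs without the target class knowing about it.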
Class diagram

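The class diagram shows that TransactionInterceptor ultimately implements the Advice interface (through MethodInterceptor), so in step 1 above it is instantiated and added to the proxy object as an interceptor.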
Transaction processing flow
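When we call a target method of a bean annotated with @Transactional, execution first enters TransactionInterceptor.invoke, because TransactionInterceptor implements MethodInterceptor (see how Spring AOP works for details). That method then calls TransactionAspectSupport.invokeWithinTransaction():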
@Nullable
protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
        final InvocationCallback invocation) throws Throwable {
    // If the transaction attribute is null, the method is non-transactional.
    TransactionAttributeSource tas = getTransactionAttributeSource();
    final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
    final PlatformTransactionManager tm = determineTransactionManager(txAttr);
    final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
    if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
        // Standard transaction demarcation with getTransaction and commit/rollback calls.
        // Store everything the transaction needs in txInfo, such as the transaction
        // attributes (isolation level and so on) and the transaction manager.
        TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
        Object retVal = null;
        try {
            // This is an around advice: Invoke the next interceptor in the chain.
            // This will normally result in a target object being invoked.
            retVal = invocation.proceedWithInvocation();
        }
        catch (Throwable ex) {
            // target invocation exception
            // Roll back (or commit, depending on the rollback rules) after the exception.
            completeTransactionAfterThrowing(txInfo, ex);
            throw ex;
        }
        finally {
            cleanupTransactionInfo(txInfo);
        }
        commitTransactionAfterReturning(txInfo);
        return retVal;
    }
    // ... (the CallbackPreferringPlatformTransactionManager branch is omitted in this excerpt)
}
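The createTransactionIfNecessary(tm, txAttr, joinpointIdentification) method gathers all the information needed while the transaction runs, and internally it handles transaction propagation. For example, when propagation is set to REQUIRES_NEW it suspends the current transaction. Suspending mainly means removing the current transaction's JDBC connection from the thread's ThreadLocalMap and saving it in the TransactionStatus of the new transaction. The current transaction's TransactionInfo is then stored in the oldTransactionInfo field of the newly created TransactionInfo.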
protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
        @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {
    // If no name specified, apply method identification as transaction name.
    if (txAttr != null && txAttr.getName() == null) {
        txAttr = new DelegatingTransactionAttribute(txAttr) {
            @Override
            public String getName() {
                return joinpointIdentification;
            }
        };
    }
    TransactionStatus status = null;
    if (txAttr != null) {
        if (tm != null) {
            // Initialize the transaction status. Commit and rollback later use this object
            // to find out how the transaction was set up for its propagation behavior
            // and act accordingly.
            // With REQUIRES_NEW and an existing transaction, the current transaction is suspended:
            // its JDBC connection information is saved in the TransactionStatus, and the
            // connection bound in the thread's ThreadLocalMap is switched to the new one.
            status = tm.getTransaction(txAttr);
        }
    }
    // Initialize the TransactionInfo for this transaction and bind it to the current
    // thread's ThreadLocalMap; the previous TransactionInfo is stored in its
    // oldTransactionInfo field.
    return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
}
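The suspend-and-switch idea described for REQUIRES_NEW can be illustrated with a small sketch. This is not Spring's implementation: SuspendSketch, its Status class and the string "connections" are hypothetical stand-ins for the thread-bound connection holder and the status object that remembers what was suspended.

```java
final class SuspendSketch {
    // Stands in for the connection bound to the current thread.
    private static final ThreadLocal<String> BOUND_CONNECTION = new ThreadLocal<>();

    // Hypothetical status object: remembers the suspended connection so it can be resumed.
    static final class Status {
        final String suspendedConnection;

        Status(String suspendedConnection) {
            this.suspendedConnection = suspendedConnection;
        }
    }

    // Begin a REQUIRES_NEW-style transaction: unbind whatever is bound, bind the new connection.
    static Status beginRequiresNew(String newConnection) {
        String suspended = BOUND_CONNECTION.get(); // null when no transaction is active
        BOUND_CONNECTION.set(newConnection);
        return new Status(suspended);
    }

    // After commit or rollback: rebind the suspended connection, or clear the binding.
    static void complete(Status status) {
        if (status.suspendedConnection != null) {
            BOUND_CONNECTION.set(status.suspendedConnection);
        } else {
            BOUND_CONNECTION.remove();
        }
    }

    static String current() {
        return BOUND_CONNECTION.get();
    }

    public static void main(String[] args) {
        Status outer = beginRequiresNew("conn-1");
        Status inner = beginRequiresNew("conn-2"); // suspends conn-1
        System.out.println(current());             // conn-2
        complete(inner);
        System.out.println(current());             // conn-1
        complete(outer);
        System.out.println(current());             // null
    }
}
```

Because the suspended connection travels with the status of the inner transaction, the outer transaction is restored no matter whether the inner one commits or rolls back.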
Now step into tm.getTransaction():
@Override
public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
    // Get the transaction resource object bound to the current thread.
    Object transaction = doGetTransaction();
    // Cache debug flag to avoid repeated checks.
    boolean debugEnabled = logger.isDebugEnabled();
    if (definition == null) {
        // Use defaults if no transaction definition given.
        definition = new DefaultTransactionDefinition();
    }
    // A transaction already exists.
    if (isExistingTransaction(transaction)) {
        // Existing transaction found -> check propagation behavior to find out how to behave.
        // Check the propagation behavior and handle it: NEVER throws an exception,
        // NOT_SUPPORTED and REQUIRES_NEW suspend the current transaction,
        // NESTED creates a savepoint, and so on.
        return handleExistingTransaction(definition, transaction, debugEnabled);
    }
    // Check definition settings for new transaction.
    if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
        throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
    }
    // No existing transaction found -> check propagation behavior to find out how to proceed.
    if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
        throw new IllegalTransactionStateException(
                "No existing transaction found for transaction marked with propagation 'mandatory'");
    }
    else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
            definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
        SuspendedResourcesHolder suspendedResources = suspend(null);
        if (debugEnabled) {
            logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
        }
        try {
            boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
            DefaultTransactionStatus status = newTransactionStatus(
                    definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
            // Set the isolation level on the current connection and turn off auto-commit.
            doBegin(transaction, definition);
            prepareSynchronization(status, definition);
            return status;
        }
        catch (RuntimeException | Error ex) {
            resume(null, suspendedResources);
            throw ex;
        }
    }
    else {
        // Create "empty" transaction: no actual transaction, but potentially synchronization.
        if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
            logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                    "isolation level will effectively be ignored: " + definition);
        }
        boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
        // Build the TransactionStatus object.
        return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
    }
}
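The branching in getTransaction() and handleExistingTransaction() boils down to a decision over two inputs: the propagation behavior and whether a transaction already exists. The sketch below restates that decision as a plain function. PropagationSketch and its Outcome labels are hypothetical names used only for illustration, and the NESTED case assumes savepoint-based nesting is allowed by the transaction manager.

```java
final class PropagationSketch {
    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    // Hypothetical outcome labels, not Spring types.
    enum Outcome {
        JOIN_EXISTING, START_NEW, SUSPEND_AND_START_NEW, SUSPEND_AND_RUN_WITHOUT_TX,
        SAVEPOINT, RUN_WITHOUT_TX, ERROR
    }

    static Outcome decide(Propagation propagation, boolean transactionExists) {
        if (transactionExists) {
            switch (propagation) {
                case NEVER:
                    return Outcome.ERROR;
                case NOT_SUPPORTED:
                    return Outcome.SUSPEND_AND_RUN_WITHOUT_TX;
                case REQUIRES_NEW:
                    return Outcome.SUSPEND_AND_START_NEW;
                case NESTED:
                    return Outcome.SAVEPOINT; // assumes savepoint-based nesting is allowed
                default:
                    return Outcome.JOIN_EXISTING; // REQUIRED, SUPPORTS, MANDATORY
            }
        }
        switch (propagation) {
            case MANDATORY:
                return Outcome.ERROR;
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return Outcome.START_NEW;
            default:
                return Outcome.RUN_WITHOUT_TX; // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": none -> " + decide(p, false)
                    + ", existing -> " + decide(p, true));
        }
    }
}
```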
Finally, once the target method has run, we are back in invokeWithinTransaction. Let's first see what cleanupTransactionInfo() in the finally block does:
protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
    if (txInfo != null) {
        txInfo.restoreThreadLocalStatus();
    }
}

private void restoreThreadLocalStatus() {
    // Use stack to restore old transaction TransactionInfo.
    // Will be null if none was set.
    transactionInfoHolder.set(this.oldTransactionInfo);
}

private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
        new NamedThreadLocal<>("Current aspect-driven transaction");
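Here we can see that it puts the TransactionInfo saved earlier from the outer transaction (if there is one) back into the current thread's ThreadLocalMap. The main consumer is TransactionAspectSupport.currentTransactionStatus(), which returns the status of the TransactionInfo bound to the thread.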
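The oldTransactionInfo field effectively turns the thread-bound TransactionInfo into a stack implemented as a linked list. A minimal sketch of that pattern follows; TxInfoStackSketch, Info and HOLDER are hypothetical names, and only the bind/restore mechanics are modeled.

```java
final class TxInfoStackSketch {
    // Hypothetical stand-in for TransactionInfo: only the name and the link to the previous info.
    static final class Info {
        final String name;
        Info old;

        Info(String name) {
            this.name = name;
        }

        // Remember whatever is currently bound, then bind this info to the thread.
        void bindToThread() {
            this.old = HOLDER.get();
            HOLDER.set(this);
        }

        // Put the previously bound info back (null if there was none).
        void restoreThreadLocalStatus() {
            HOLDER.set(this.old);
        }
    }

    private static final ThreadLocal<Info> HOLDER = new ThreadLocal<>();

    static String currentName() {
        Info info = HOLDER.get();
        return info == null ? null : info.name;
    }

    public static void main(String[] args) {
        Info outer = new Info("outer");
        outer.bindToThread();
        Info inner = new Info("inner");
        inner.bindToThread();
        System.out.println(currentName()); // inner
        inner.restoreThreadLocalStatus();
        System.out.println(currentName()); // outer
        outer.restoreThreadLocalStatus();
        System.out.println(currentName()); // null
    }
}
```

Running the restore in a finally block, as invokeWithinTransaction does, keeps the stack balanced even when the target method throws.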
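Next, let's look at commitTransactionAfterReturning(). It calls commit on the transaction manager, and the real work happens in AbstractPlatformTransactionManager.processCommit: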
private void processCommit(DefaultTransactionStatus status) throws TransactionException {
    try {
        boolean beforeCompletionInvoked = false;
        try {
            boolean unexpectedRollback = false;
            prepareForCommit(status);
            triggerBeforeCommit(status);
            triggerBeforeCompletion(status);
            beforeCompletionInvoked = true;
            // Handle a nested transaction (NESTED).
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Releasing transaction savepoint");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                status.releaseHeldSavepoint();
            }
            // isNewTransaction is initialized according to the propagation behavior; see how
            // AbstractPlatformTransactionManager.handleExistingTransaction and
            // AbstractPlatformTransactionManager.getTransaction construct the TransactionStatus.
            // This is also what guarantees REQUIRED semantics: a participating inner method
            // does not trigger a commit here.
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction commit");
                }
                unexpectedRollback = status.isGlobalRollbackOnly();
                doCommit(status);
            }
            else if (isFailEarlyOnGlobalRollbackOnly()) {
                unexpectedRollback = status.isGlobalRollbackOnly();
            }
            // Throw UnexpectedRollbackException if we have a global rollback-only
            // marker but still didn't get a corresponding exception from commit.
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction silently rolled back because it has been marked as rollback-only");
            }
        }
        catch (UnexpectedRollbackException ex) {
            // can only be caused by doCommit
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
            throw ex;
        }
        catch (TransactionException ex) {
            // can only be caused by doCommit
            if (isRollbackOnCommitFailure()) {
                doRollbackOnCommitException(status, ex);
            }
            else {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            }
            throw ex;
        }
        catch (RuntimeException | Error ex) {
            if (!beforeCompletionInvoked) {
                triggerBeforeCompletion(status);
            }
            doRollbackOnCommitException(status, ex);
            throw ex;
        }
        // Trigger afterCommit callbacks, with an exception thrown there
        // propagated to callers but the transaction still considered as committed.
        try {
            triggerAfterCommit(status);
        }
        finally {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
        }
    }
    finally {
        cleanupAfterCompletion(status);
    }
}
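That covers commit. Now for rollback: TransactionAspectSupport.completeTransactionAfterThrowing calls rollback on the transaction manager when the exception matches the rollback rules (txAttr.rollbackOn(ex)), and the work is done in AbstractPlatformTransactionManager.processRollback: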
private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
    try {
        boolean unexpectedRollback = unexpected;
        try {
            triggerBeforeCompletion(status);
            if (status.hasSavepoint()) {
                if (status.isDebug()) {
                    logger.debug("Rolling back transaction to savepoint");
                }
                status.rollbackToHeldSavepoint();
            }
            else if (status.isNewTransaction()) {
                if (status.isDebug()) {
                    logger.debug("Initiating transaction rollback");
                }
                doRollback(status);
            }
            else {
                // Participating in larger transaction
                if (status.hasTransaction()) {
                    if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                        }
                        doSetRollbackOnly(status);
                    }
                    else {
                        if (status.isDebug()) {
                            logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                        }
                    }
                }
                else {
                    logger.debug("Should roll back transaction but cannot - no transaction available");
                }
                // Unexpected rollback only matters here if we're asked to fail early
                if (!isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = false;
                }
            }
        }
        catch (RuntimeException | Error ex) {
            triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
            throw ex;
        }
        triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
        // Raise UnexpectedRollbackException if we had a global rollback-only marker
        if (unexpectedRollback) {
            throw new UnexpectedRollbackException(
                    "Transaction rolled back because it has been marked as rollback-only");
        }
    }
    finally {
        cleanupAfterCompletion(status);
    }
}
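The propagation handling here mirrors the one in commit: a transaction with a savepoint (NESTED) rolls back to that savepoint, a new transaction performs the actual rollback, and a participating transaction only marks the existing transaction as rollback-only.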
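The interplay between isNewTransaction and the rollback-only marker can be shown with a small model. This is a sketch under simplifying assumptions, not Spring's code: ParticipationSketch, SharedTx and Status are hypothetical, savepoints and synchronization callbacks are left out, and IllegalStateException stands in for UnexpectedRollbackException.

```java
import java.util.ArrayList;
import java.util.List;

final class ParticipationSketch {
    // State shared by every method taking part in the same physical transaction.
    static final class SharedTx {
        boolean rollbackOnly;
        final List<String> log = new ArrayList<>();
    }

    // Hypothetical per-method status: which transaction, and whether this method started it.
    static final class Status {
        final SharedTx tx;
        final boolean newTransaction;

        Status(SharedTx tx, boolean newTransaction) {
            this.tx = tx;
            this.newTransaction = newTransaction;
        }
    }

    static void commit(Status status) {
        if (!status.newTransaction) {
            return; // participant: the originator decides when to commit
        }
        if (status.tx.rollbackOnly) {
            status.tx.log.add("rollback");
            throw new IllegalStateException("transaction was marked rollback-only");
        }
        status.tx.log.add("commit");
    }

    static void rollback(Status status) {
        if (status.newTransaction) {
            status.tx.log.add("rollback");
        } else {
            status.tx.rollbackOnly = true; // participant: only mark the shared transaction
        }
    }

    public static void main(String[] args) {
        SharedTx tx = new SharedTx();
        Status outer = new Status(tx, true);
        Status inner = new Status(tx, false);
        rollback(inner); // the inner REQUIRED method failed
        try {
            commit(outer); // the outer method swallowed the exception and tries to commit
        } catch (IllegalStateException ex) {
            System.out.println(ex.getMessage());
        }
        System.out.println(tx.log); // [rollback]
    }
}
```

This is the situation behind the familiar "marked as rollback-only" error: an inner REQUIRED method fails, the outer method catches the exception, and the commit at the outer boundary turns into a rollback.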
Summary
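Spring declarative transactions use TransactionInterceptor to intercept target methods and handle transaction begin, commit and rollback, as well as propagation and isolation, in one place.
I don't think this write-up is quite there yet. I will keep revising it and try to lay out the whole design clearly.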