Spring Source Code Analysis: Transaction Management

Author: 穹柏 | Published: 2020-07-14 00:38

Overview

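Spring transactions are built on top of AOP. While the Spring container initializes, a TransactionInterceptor instance is added to the advisor array of the proxy object of every bean that has methods annotated with @Transactional. When a transactional method is called, TransactionInterceptor.invoke runs and handles the transaction.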

The TransactionInterceptor is injected roughly as follows:

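1. First, all Advisors are initialized. In this step the classes behind the bean definitions that implement the Advisor interface are instantiated and added to an AdvisedSupport object, which is eventually injected into the proxy object.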

    AbstractAutoProxyCreator.postProcessBeforeInstantiation()

        AspectJAwareAdvisorAutoProxyCreator.shouldSkip()

            AspectJAwareAdvisorAutoProxyCreator.findCandidateAdvisors() 

2. Then the bean is proxied

    AbstractAutoProxyCreator.postProcessAfterInitialization()

        AbstractAutoProxyCreator.wrapIfNecessary()

            AbstractAutoProxyCreator.createProxy()   // chooses between a JDK dynamic proxy and CGLIB, then creates the proxy object
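
To make the idea of "a proxy whose interceptor runs transaction logic around the call" concrete, here is a minimal sketch that uses only the JDK's `java.lang.reflect.Proxy`. It is not Spring's code: `TxProxySketch`, `AccountService` and the `LOG` list are hypothetical names invented for illustration, and a real proxy would delegate to a chain of advisors rather than to one hard-coded handler.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, not Spring code: a JDK dynamic proxy whose handler
// plays the role that TransactionInterceptor plays in a Spring proxy.
class TxProxySketch {

    interface AccountService {
        void transfer(int amount);
    }

    // Records what happened, so the order of events can be inspected.
    static final List<String> LOG = new ArrayList<>();

    static AccountService proxy(AccountService target) {
        InvocationHandler handler = (proxyObject, method, args) -> {
            LOG.add("begin");
            try {
                Object result = method.invoke(target, args); // call the real bean
                LOG.add("commit");
                return result;
            } catch (InvocationTargetException e) {
                LOG.add("rollback");
                throw e.getCause(); // rethrow the target's own exception
            }
        };
        return (AccountService) Proxy.newProxyInstance(
                AccountService.class.getClassLoader(),
                new Class<?>[] {AccountService.class},
                handler);
    }

    public static void main(String[] args) {
        AccountService service = proxy(amount -> LOG.add("transfer " + amount));
        service.transfer(100);
        System.out.println(LOG); // [begin, transfer 100, commit]
    }
}
```

The caller only ever holds the proxy, which is why the transaction logic runs without the business code knowing about it.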

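Class diagram

[Figure: TransactionInterceptor class diagram]

The diagram shows that TransactionInterceptor ultimately implements the Advice interface, so in step 1 above it is instantiated and added to the proxy object as an interceptor.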

Transaction processing flow

When we call a target method of a bean annotated with @Transactional, the call first enters TransactionInterceptor.invoke, because TransactionInterceptor implements MethodInterceptor (see how Spring AOP works for details). It then calls TransactionAspectSupport.invokeWithinTransaction():

    @Nullable
    protected Object invokeWithinTransaction(Method method, @Nullable Class<?> targetClass,
            final InvocationCallback invocation) throws Throwable {

        // If the transaction attribute is null, the method is non-transactional.
        TransactionAttributeSource tas = getTransactionAttributeSource();
        final TransactionAttribute txAttr = (tas != null ? tas.getTransactionAttribute(method, targetClass) : null);
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            // Store everything the transaction needs in txInfo, including the isolation level, the data source, etc.
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // target invocation exception
                // Handle rollback on exception
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }
        // The else branch (CallbackPreferringPlatformTransactionManager) is omitted from this excerpt.
    }
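
The control flow of this method is easy to misread, so here is a small plain-Java model of it. It is only a sketch: `AroundAdviceSketch` and the `EVENTS` list are hypothetical, and each step just records its name instead of touching a real transaction. What it shows is the ordering: the cleanup in `finally` runs before the commit on success, and after the rollback on failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical model of the try/catch/finally shape of invokeWithinTransaction.
class AroundAdviceSketch {

    static final List<String> EVENTS = new ArrayList<>();

    static <T> T invokeWithinTransaction(Supplier<T> body) {
        EVENTS.add("create");            // stands in for createTransactionIfNecessary
        T result = null;
        try {
            result = body.get();         // stands in for proceedWithInvocation
        } catch (RuntimeException ex) {
            EVENTS.add("rollback");      // stands in for completeTransactionAfterThrowing
            throw ex;
        } finally {
            EVENTS.add("cleanup");       // stands in for cleanupTransactionInfo
        }
        EVENTS.add("commit");            // stands in for commitTransactionAfterReturning
        return result;
    }

    public static void main(String[] args) {
        invokeWithinTransaction(() -> "ok");
        System.out.println(EVENTS);      // [create, cleanup, commit]

        EVENTS.clear();
        try {
            invokeWithinTransaction(() -> { throw new IllegalStateException("boom"); });
        } catch (IllegalStateException expected) {
            System.out.println(EVENTS);  // [create, rollback, cleanup]
        }
    }
}
```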

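Note the call to createTransactionIfNecessary(tm, txAttr, joinpointIdentification). It collects all the information needed while the transaction runs, and internally it handles transaction propagation. For example, when propagation is set to REQUIRES_NEW, it suspends the current transaction. Suspending mainly means removing the current transaction's JDBC connection from the thread-bound resource map (a ThreadLocal map) and saving it in the TransactionStatus of the new transaction. The current transaction's TransactionInfo is then stored in the oldTransactionInfo field of the newly created TransactionInfo.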

protected TransactionInfo createTransactionIfNecessary(@Nullable PlatformTransactionManager tm,
            @Nullable TransactionAttribute txAttr, final String joinpointIdentification) {

        // If no name specified, apply method identification as transaction name.
        if (txAttr != null && txAttr.getName() == null) {
            txAttr = new DelegatingTransactionAttribute(txAttr) {
                @Override
                public String getName() {
                    return joinpointIdentification;
                }
            };
        }

        TransactionStatus status = null;
        if (txAttr != null) {
            if (tm != null) {
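                // Initialize the transaction status; commit and rollback later use this object
                // to decide what to do for the transaction's propagation behavior.
                // Different propagation behaviors are handled differently.
                // For REQUIRES_NEW with an existing transaction, the current transaction is suspended:
                // its JDBC connection information is saved in the TransactionStatus, and the
                // connection bound to the current thread is replaced by the new one.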
                status = tm.getTransaction(txAttr);
            }
        }
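        // Create the TransactionInfo for the current transaction and bind it to the current thread's ThreadLocal.
        // The previous TransactionInfo is kept in the oldTransactionInfo field of the new one.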
        return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status);
    }
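
The suspend-and-resume idea described above can be modelled without Spring. The sketch below is an assumption-laden illustration: `SuspendResumeSketch`, the `"dataSource"` key and the string "connections" are all hypothetical, and the real implementation binds actual connection holders and also suspends synchronizations. It only shows how a thread-bound resource is unbound for the inner transaction and bound again afterwards.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of suspending the thread-bound connection for REQUIRES_NEW.
class SuspendResumeSketch {

    // Per-thread resource map; a string stands in for a JDBC connection.
    private static final ThreadLocal<Map<String, String>> RESOURCES =
            ThreadLocal.withInitial(HashMap::new);

    private static final String KEY = "dataSource"; // hypothetical resource key

    static String currentConnection() {
        return RESOURCES.get().get(KEY);
    }

    static void bind(String connection) {
        RESOURCES.get().put(KEY, connection);
    }

    static void unbind() {
        RESOURCES.get().remove(KEY);
    }

    // Runs body with its own connection, then restores the suspended one.
    static void requiresNew(String newConnection, Runnable body) {
        String suspended = RESOURCES.get().remove(KEY); // suspend: unbind and remember
        bind(newConnection);
        try {
            body.run();
        } finally {
            unbind();                                   // inner transaction is done
            if (suspended != null) {
                bind(suspended);                        // resume the outer transaction
            }
        }
    }

    public static void main(String[] args) {
        bind("conn-outer");
        requiresNew("conn-inner", () -> System.out.println("inside: " + currentConnection()));
        System.out.println("after: " + currentConnection());
        unbind();
    }
}
```

Running `main` prints `inside: conn-inner` followed by `after: conn-outer`.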

Stepping into tm.getTransaction():

@Override
    public final TransactionStatus getTransaction(@Nullable TransactionDefinition definition) throws TransactionException {
        // Get the transaction resource object bound to the current thread
        Object transaction = doGetTransaction();

        // Cache debug flag to avoid repeated checks.
        boolean debugEnabled = logger.isDebugEnabled();

        if (definition == null) {
            // Use defaults if no transaction definition given.
            definition = new DefaultTransactionDefinition();
        }

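        // If a transaction already exists
        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            // Check the propagation behavior and act on it: REQUIRES_NEW and NOT_SUPPORTED suspend the
            // current transaction, NEVER throws an exception, NESTED creates a savepoint, and so on.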
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

        // Check definition settings for new transaction.
        if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) {
            throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout());
        }

        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                // Set the isolation level for the current session and turn off auto-commit
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException | Error ex) {
                resume(null, suspendedResources);
                throw ex;
            }
        }
        else {
            // Create "empty" transaction: no actual transaction, but potentially synchronization.
            if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) {
                logger.warn("Custom isolation level specified but no actual transaction initiated; " +
                        "isolation level will effectively be ignored: " + definition);
            }
            boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS);
            // Build the TransactionStatus object
            return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null);
        }
    }
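
The propagation decisions made in getTransaction() and handleExistingTransaction() can be summarized as a small decision function. This is a sketch of the rules only, with hypothetical names (`PropagationSketch`, `Action`); the real code also validates timeouts, creates status objects and begins the transaction. For NESTED, a savepoint is what savepoint-capable managers such as the JDBC one use.

```java
// Hypothetical summary of the propagation rules, not Spring's implementation.
class PropagationSketch {

    enum Propagation { REQUIRED, SUPPORTS, MANDATORY, REQUIRES_NEW, NOT_SUPPORTED, NEVER, NESTED }

    enum Action {
        JOIN_EXISTING,            // participate in the current transaction
        NEW_TRANSACTION,          // begin a new transaction
        SUSPEND_AND_NEW,          // suspend the current one, begin a new one
        SUSPEND_AND_RUN_WITHOUT,  // suspend the current one, run non-transactionally
        SAVEPOINT,                // nested transaction via a savepoint
        RUN_WITHOUT,              // "empty" transaction, no actual transaction
        FAIL                      // IllegalTransactionStateException
    }

    static Action decide(Propagation propagation, boolean transactionExists) {
        if (transactionExists) {
            switch (propagation) {
                case NEVER:         return Action.FAIL;
                case NOT_SUPPORTED: return Action.SUSPEND_AND_RUN_WITHOUT;
                case REQUIRES_NEW:  return Action.SUSPEND_AND_NEW;
                case NESTED:        return Action.SAVEPOINT;
                default:            return Action.JOIN_EXISTING; // REQUIRED, SUPPORTS, MANDATORY
            }
        }
        switch (propagation) {
            case MANDATORY:
                return Action.FAIL;
            case REQUIRED:
            case REQUIRES_NEW:
            case NESTED:
                return Action.NEW_TRANSACTION;
            default:
                return Action.RUN_WITHOUT; // SUPPORTS, NOT_SUPPORTED, NEVER
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + ": existing=" + decide(p, true) + ", none=" + decide(p, false));
        }
    }
}
```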

Once the target method has run, we are back in invokeWithinTransaction. Let's first look at what cleanupTransactionInfo() in the finally block does:

    protected void cleanupTransactionInfo(@Nullable TransactionInfo txInfo) {
        if (txInfo != null) {
            txInfo.restoreThreadLocalStatus();
        }
    }

    // Defined in the inner class TransactionAspectSupport.TransactionInfo
    private void restoreThreadLocalStatus() {
        // Use stack to restore old transaction TransactionInfo.
        // Will be null if none was set.
        transactionInfoHolder.set(this.oldTransactionInfo);
    }

    // Defined in TransactionAspectSupport
    private static final ThreadLocal<TransactionInfo> transactionInfoHolder =
            new NamedThreadLocal<>("Current aspect-driven transaction");

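Here we see that it puts the previous (outer) transaction's TransactionInfo, which was saved in oldTransactionInfo when the current one was created, back into the current thread's ThreadLocal (it is null if there was none). This matters mainly because TransactionAspectSupport.currentTransactionStatus() reads the status field of this thread-bound TransactionInfo.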
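
The oldTransactionInfo field effectively turns the ThreadLocal into a linked stack. The following sketch models only that mechanism; `TxInfoStackSketch`, `TxInfo` and the `name` field are hypothetical stand-ins, and the real TransactionInfo carries the transaction manager, attribute and status as well.

```java
// Hypothetical model of the ThreadLocal "stack" formed by oldTransactionInfo.
class TxInfoStackSketch {

    private static final ThreadLocal<TxInfo> HOLDER = new ThreadLocal<>();

    static final class TxInfo {
        final String name;
        private TxInfo oldTransactionInfo;

        TxInfo(String name) {
            this.name = name;
        }

        // Push: remember whatever was bound before, then bind this info.
        void bindToThread() {
            this.oldTransactionInfo = HOLDER.get();
            HOLDER.set(this);
        }

        // Pop: put the previous info back (null if there was none).
        void restoreThreadLocalStatus() {
            HOLDER.set(this.oldTransactionInfo);
        }
    }

    static String current() {
        TxInfo info = HOLDER.get();
        return info == null ? null : info.name;
    }

    public static void main(String[] args) {
        TxInfo outer = new TxInfo("outer");
        outer.bindToThread();
        TxInfo inner = new TxInfo("inner");
        inner.bindToThread();
        System.out.println(current());   // inner
        inner.restoreThreadLocalStatus();
        System.out.println(current());   // outer
        outer.restoreThreadLocalStatus();
        System.out.println(current());   // null
    }
}
```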

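Next, let's look at commitTransactionAfterReturning(). It calls commit() on the transaction manager with the TransactionStatus held in txInfo, and AbstractPlatformTransactionManager.commit() ends up in processCommit():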

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
        try {
            boolean beforeCompletionInvoked = false;

            try {
                boolean unexpectedRollback = false;
                prepareForCommit(status);
                triggerBeforeCommit(status);
                triggerBeforeCompletion(status);
                beforeCompletionInvoked = true;

                // Handle nested transactions (NESTED)
                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Releasing transaction savepoint");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    status.releaseHeldSavepoint();
                }
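                // This flag is initialized according to the propagation behavior; see how
                // AbstractPlatformTransactionManager.handleExistingTransaction and
                // AbstractPlatformTransactionManager.getTransaction initialize the TransactionStatus.
                // It is also what guarantees REQUIRED semantics: a participating inner method does not trigger a commit here.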
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction commit");
                    }
                    unexpectedRollback = status.isGlobalRollbackOnly();
                    doCommit(status);
                }
                else if (isFailEarlyOnGlobalRollbackOnly()) {
                    unexpectedRollback = status.isGlobalRollbackOnly();
                }

                // Throw UnexpectedRollbackException if we have a global rollback-only
                // marker but still didn't get a corresponding exception from commit.
                if (unexpectedRollback) {
                    throw new UnexpectedRollbackException(
                            "Transaction silently rolled back because it has been marked as rollback-only");
                }
            }
            catch (UnexpectedRollbackException ex) {
                // can only be caused by doCommit
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);
                throw ex;
            }
            catch (TransactionException ex) {
                // can only be caused by doCommit
                if (isRollbackOnCommitFailure()) {
                    doRollbackOnCommitException(status, ex);
                }
                else {
                    triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                }
                throw ex;
            }
            catch (RuntimeException | Error ex) {
                if (!beforeCompletionInvoked) {
                    triggerBeforeCompletion(status);
                }
                doRollbackOnCommitException(status, ex);
                throw ex;
            }

            // Trigger afterCommit callbacks, with an exception thrown there
            // propagated to callers but the transaction still considered as committed.
            try {
                triggerAfterCommit(status);
            }
            finally {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
            }

        }
        finally {
            cleanupAfterCompletion(status);
        }
    }
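
The effect of the isNewTransaction() check is that only the method that started the transaction commits it. The sketch below models just that, on the success path only and for a single thread; `CommitOwnershipSketch`, `required` and the `active` flag are hypothetical and greatly simplified compared with the real status objects.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model: with REQUIRED, only the transaction's originator commits.
class CommitOwnershipSketch {

    static final List<String> LOG = new ArrayList<>();

    // True while a "physical" transaction is open (single-threaded model).
    private static boolean active = false;

    static void required(String name, Runnable body) {
        boolean newTransaction = !active;  // plays the role of status.isNewTransaction()
        if (newTransaction) {
            active = true;
            LOG.add("begin by " + name);
        }
        body.run();
        if (newTransaction) {
            LOG.add("commit by " + name);  // the only place a commit happens
            active = false;
        } else {
            LOG.add(name + " returns without committing");
        }
    }

    public static void main(String[] args) {
        required("outer", () -> required("inner", () -> LOG.add("work")));
        System.out.println(LOG);
        // [begin by outer, work, inner returns without committing, commit by outer]
    }
}
```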

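That covers commit. Now for rollback, which starts in TransactionAspectSupport.completeTransactionAfterThrowing. If the transaction attribute says to roll back on the thrown exception (txAttr.rollbackOn(ex)), it calls rollback() on the transaction manager, which ends up in processRollback(); otherwise the transaction is still committed.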

private void processRollback(DefaultTransactionStatus status, boolean unexpected) {
        try {
            boolean unexpectedRollback = unexpected;

            try {
                triggerBeforeCompletion(status);

                if (status.hasSavepoint()) {
                    if (status.isDebug()) {
                        logger.debug("Rolling back transaction to savepoint");
                    }
                    status.rollbackToHeldSavepoint();
                }
                else if (status.isNewTransaction()) {
                    if (status.isDebug()) {
                        logger.debug("Initiating transaction rollback");
                    }
                    doRollback(status);
                }
                else {
                    // Participating in larger transaction
                    if (status.hasTransaction()) {
                        if (status.isLocalRollbackOnly() || isGlobalRollbackOnParticipationFailure()) {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - marking existing transaction as rollback-only");
                            }
                            doSetRollbackOnly(status);
                        }
                        else {
                            if (status.isDebug()) {
                                logger.debug("Participating transaction failed - letting transaction originator decide on rollback");
                            }
                        }
                    }
                    else {
                        logger.debug("Should roll back transaction but cannot - no transaction available");
                    }
                    // Unexpected rollback only matters here if we're asked to fail early
                    if (!isFailEarlyOnGlobalRollbackOnly()) {
                        unexpectedRollback = false;
                    }
                }
            }
            catch (RuntimeException | Error ex) {
                triggerAfterCompletion(status, TransactionSynchronization.STATUS_UNKNOWN);
                throw ex;
            }

            triggerAfterCompletion(status, TransactionSynchronization.STATUS_ROLLED_BACK);

            // Raise UnexpectedRollbackException if we had a global rollback-only marker
            if (unexpectedRollback) {
                throw new UnexpectedRollbackException(
                        "Transaction rolled back because it has been marked as rollback-only");
            }
        }
        finally {
            cleanupAfterCompletion(status);
        }
    }

The way propagation is handled here is much the same as in commit.
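
One consequence of the "participating in larger transaction" branch is worth spelling out: an inner REQUIRED method that fails only marks the shared transaction as rollback-only, and the originator rolls back later even if it swallowed the exception. The sketch below models this under simplifying assumptions (single thread, every RuntimeException triggers rollback); all names, including `UnexpectedRollback`, are hypothetical stand-ins rather than Spring classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of rollback-only marking by a participating transaction.
class RollbackOnlySketch {

    static final class UnexpectedRollback extends RuntimeException {
        UnexpectedRollback(String message) {
            super(message);
        }
    }

    static final List<String> LOG = new ArrayList<>();
    private static boolean active = false;
    private static boolean rollbackOnly = false;

    static void required(String name, Runnable body) {
        boolean newTransaction = !active;
        if (newTransaction) {
            active = true;
            rollbackOnly = false;
        }
        try {
            body.run();
        } catch (RuntimeException ex) {
            if (newTransaction) {
                LOG.add(name + ": rollback");
                active = false;
            } else {
                rollbackOnly = true;       // participant cannot roll back by itself
                LOG.add(name + ": mark rollback-only");
            }
            throw ex;
        }
        if (!newTransaction) {
            return;                        // participant: nothing to complete
        }
        active = false;
        if (rollbackOnly) {
            LOG.add(name + ": rollback");
            throw new UnexpectedRollback("marked as rollback-only");
        }
        LOG.add(name + ": commit");
    }

    public static void main(String[] args) {
        try {
            required("outer", () -> {
                try {
                    required("inner", () -> { throw new IllegalStateException("boom"); });
                } catch (IllegalStateException swallowed) {
                    // the outer method hides the failure from its caller
                }
            });
        } catch (UnexpectedRollback e) {
            LOG.add("caller sees UnexpectedRollback");
        }
        System.out.println(LOG);
        // [inner: mark rollback-only, outer: rollback, caller sees UnexpectedRollback]
    }
}
```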

Summary

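Spring's declarative transactions use TransactionInterceptor to intercept the target method, which gives one central place for beginning, committing and rolling back transactions, as well as for handling propagation and isolation.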

I feel this write-up is not yet thorough enough. I will keep revising it and try to lay out the whole design more clearly.
