美文网首页
又见AOP: Spring事务 - @Transactional

又见AOP: Spring事务 - @Transactional

作者: overflowedstack | 来源:发表于2020-05-02 11:12 被阅读0次

写在前面

在操作数据库时,经常需要将多个操作作为一个原子操作,也就是说,这些操作要么全部成功,要么,其中一个有问题就全部回滚。
在数据库层面,整个流程就是:

start transaction
do something
do some other thing
commit (or rollback)

那么,JDBC实现这个过程的例子如下:

Connection conn = DriverManager.getConnection(...);
try {
  con.setAutoCommit(false);
  Statement stmt = con.createStatement();
  //1 or more queries or updates
  con.commit();
} catch(Exception e) {
  con.rollback();
} finally {
con.close();
}

而在一个项目中,可能有多处需要执行事务操作。如果在每一处都写一样的语句,先关闭自动提交,手动提交事务,有异常时回滚事务,关闭连接。那么重复代码就会越来越多。而这不正是AOP可以解决的场景吗!
Spring提供了注解@Transactional,只要在方法上标注这个注解,那么就认为这个方法是在一个事务中进行的,就像这样:

    @Transactional(propagation = Propagation.REQUIRED)
    public Boolean executeUpdate(ProcessContext context,String sql) {
        return executeUpdate(context, sql, null);
    }

那么问题来了:
为什么加了这个注解,方法就有了事务的能力呢?底层原理是什么?
在这个方法内部又调用了其他方法操作数据库,那这其他方法有事务的能力吗?

来一探究竟。

1. @Transactional,相应的后置处理器,及around方法

在运行时,被标为@Transactional方法所在的类是一个代理类,这是因为用到了动态代理的技术。既然是动态代理,那么可以肯定,是在Spring容器初始化这个bean的时候,调用这个bean的后置处理器,将bean初始化为一个代理对象。其中,事务的能力就是一个around通知方法,当调用目标方法时,会被拦截先执行这个增强方法。
口说无凭,Debug模式开启,来验证一下。在beanPostProcessor处打个断点。

    @Override
    public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName)
            throws BeansException {

        Object result = existingBean;
        for (BeanPostProcessor beanProcessor : getBeanPostProcessors()) {
            result = beanProcessor.postProcessAfterInitialization(result, beanName);
            if (result == null) {
                return result;
            }
        }
        return result;
    }

初始化目标bean时,遍历相关的后置处理器,发现有一个后置处理器叫作InfrastructureAdvisorAutoProxyCreator,这个后置处理器的postProcessAfterInitialization方法会创建jdk或者cglib动态代理类。此时,这个bean就变成了一个被增强了的代理对象。

当调用标有@Transactional注解的目标方法时,会发现进入如下增强方法。其实这就是一个常见的around类型的通知。
它先调用createTransactionIfNecessary来开启事务。
接下来,invocation.proceedWithInvocation()会去执行目标方法的方法拦截器链,并最终调用目标方法。如果没有其它方法拦截器的话,就直接执行目标方法。在这里就是直接调用目标方法去做数据库的增删改查。
如果数据库操作过程中有异常,completeTransactionAfterThrowing方法就会处理异常,并回滚事务。
若一切顺利,则会调用commitTransactionAfterReturning来提交事务。

    protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
            throws Throwable {

        // If the transaction attribute is null, the method is non-transactional.
        final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
        final PlatformTransactionManager tm = determineTransactionManager(txAttr);
        final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

        if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
            // Standard transaction demarcation with getTransaction and commit/rollback calls.
            TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
            Object retVal = null;
            try {
                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                retVal = invocation.proceedWithInvocation();
            }
            catch (Throwable ex) {
                // target invocation exception
                completeTransactionAfterThrowing(txInfo, ex);
                throw ex;
            }
            finally {
                cleanupTransactionInfo(txInfo);
            }
            commitTransactionAfterReturning(txInfo);
            return retVal;
        }

        else {
             //此处代码省略
          }
}

2. determineTransactionManager

来看一下上面调用到的determineTransactionManager(txAttr)方法。部分代码如下:

        String qualifier = txAttr.getQualifier();
        if (StringUtils.hasText(qualifier)) {
            return determineQualifiedTransactionManager(qualifier);
        }
        else if (StringUtils.hasText(this.transactionManagerBeanName)) {
            return determineQualifiedTransactionManager(this.transactionManagerBeanName);
        }

它会从注解的属性中拿它的qualifier。这个qualifier是什么呢?查看Transactional注解的定义,发现它就是value属性,如果没有特别指定的话,默认值是空字符串。

    /**
     * A <em>qualifier</em> value for the specified transaction.
     * <p>May be used to determine the target transaction manager,
     * matching the qualifier value (or the bean name) of a specific
     * {@link org.springframework.transaction.PlatformTransactionManager}
     * bean definition.
     * @since 4.2
     * @see #value
     */
    @AliasFor("value")
    String transactionManager() default "";

拿到qualifier之后,如果它是空字符串,就会调用determineQualifiedTransactionManager(this.transactionManagerBeanName)来获取相应的TransactionManager。

    private PlatformTransactionManager determineQualifiedTransactionManager(String qualifier) {
        PlatformTransactionManager txManager = this.transactionManagerCache.get(qualifier);
        if (txManager == null) {
            txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
                    this.beanFactory, PlatformTransactionManager.class, qualifier);
            this.transactionManagerCache.putIfAbsent(qualifier, txManager);
        }
        return txManager;
    }

这个transactionManagerBeanName是什么呢?它就是在config里指定要用的transaction manager。Spring容器会初始化txManager bean来负责事务。

    <bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="myDataSource" />
    </bean>
    <tx:annotation-driven transaction-manager="txManager" />

这里所用的是DataSourceTransactionManager,它是JDBC提供的事务管理器,实现了Spring事务的一些接口。
Spring并不直接管理事务,而是提供了多种事务管理器,他们将事务管理的职责委托给Hibernate等持久化机制所提供的相关平台框架的事务来实现。Spring事务管理器的接口是org.springframework.transaction.PlatformTransactionManager,通过这个接口,Spring为各个平台如JDBC、Hibernate等都提供了对应的事务管理器,但是具体的实现就是各个平台自己的事情了。

3. open connection - createTransactionIfNecessary

再来看看TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);

  • 它最终会调用DataSourceTransactionManager的doGetTransaction方法。
    @Override
    protected Object doGetTransaction() {
        DataSourceTransactionObject txObject = new DataSourceTransactionObject();
        txObject.setSavepointAllowed(isNestedTransactionAllowed());
        ConnectionHolder conHolder =
                (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
        txObject.setConnectionHolder(conHolder, false);
        return txObject;
    }

其中TransactionSynchronizationManager.getResource会根据dataSource去缓存resources中找是否已有创建好的connection。resources被定义为一个ThreadLocal,能够支持线程级别相同connection的复用。

    private static final ThreadLocal<Map<Object, Object>> resources =
            new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
  • 如果没有已有的transaction,那么就可能需要调用doBegin(transaction, definition)开启一个新的事务。
    其中会用con.setAutoCommit(false)将autoCommit设为false。由于JDBC事务默认是开启的,并且默认是自动提交。这里将自动提交设为false,那么就开启了手动事务,需要在最后手动commit,将整个事务提交。

4. invoke目标方法

再回到invokeWithinTransaction方法,这里通过拦截器链最终调用到目标方法进行数据库操作。

                // This is an around advice: Invoke the next interceptor in the chain.
                // This will normally result in a target object being invoked.
                retVal = invocation.proceedWithInvocation();

5. commit or rollback

  • commitTransactionAfterReturning
    对于JDBC事务,会最终调用DataSourceTransactionManager的doCommit方法。
    @Override
    protected void doCommit(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Committing JDBC transaction on Connection [" + con + "]");
        }
        try {
            con.commit();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not commit JDBC transaction", ex);
        }
    }
  • completeTransactionAfterThrowing
    最终调用DataSourceTransactionManager的doRollback方法
    @Override
    protected void doRollback(DefaultTransactionStatus status) {
        DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
        Connection con = txObject.getConnectionHolder().getConnection();
        if (status.isDebug()) {
            logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
        }
        try {
            con.rollback();
        }
        catch (SQLException ex) {
            throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
        }
    }

6. 传播行为

Spring事务中有个概念叫作传播行为。事务传播行为(propagation behavior)指的就是当一个事务方法被另一个事务方法调用时,这个事务方法应该如何进行。
引用一张表格:


Spring事务传播行为

可以通过如下方式来指定要采用的传播行为:

@Transactional(propagation = Propagation.REQUIRED)

来看一下具体实现。当指定了不同的传播行为时,Spring的getTransaction方法进行了不同的处理。

  • 当前已有事务时
    getTransaction的如下语句:
        if (isExistingTransaction(transaction)) {
            // Existing transaction found -> check propagation behavior to find out how to behave.
            return handleExistingTransaction(definition, transaction, debugEnabled);
        }

handleExistingTransaction会根据不同的传播行为,决定具体操作(是否要挂起当前事务开启新的事务,或者在当前事务中继续进行操作,等等)

  • 开启新的事务时
    getTransaction也会根据不同的传播行为,决定下一步操作。
        // No existing transaction found -> check propagation behavior to find out how to proceed.
        if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
            throw new IllegalTransactionStateException(
                    "No existing transaction found for transaction marked with propagation 'mandatory'");
        }
        else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
                definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
            SuspendedResourcesHolder suspendedResources = suspend(null);
            if (debugEnabled) {
                logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
            }
            try {
                boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
                DefaultTransactionStatus status = newTransactionStatus(
                        definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
                doBegin(transaction, definition);
                prepareSynchronization(status, definition);
                return status;
            }
            catch (RuntimeException ex) {
                resume(null, suspendedResources);
                throw ex;
            }
            catch (Error err) {
                resume(null, suspendedResources);
                throw err;
            }
        }

7. 隔离级别

事务的操作有四种隔离级别,隔离级别时数据库层面的概念。
当多个线程都开启事务操作数据库中的数据时,数据库系统要能进行隔离操作,以保证各个线程获取数据的准确性, 所以, 对于不同的事务,采用不同的隔离级别 会 有不同的结果。
而Spring事务也支持指定相应的隔离级别,在执行事务时,数据库层面就会根据指定的隔离级别进行操作。

@Transactional(isolation = Isolation.READ_COMMITTED)

如下是Spring事务隔离级别的定义,它与传播行为一起都是在TransactionDefinition接口中定义的。

    /**
     * Use the default isolation level of the underlying datastore.
     * All other levels correspond to the JDBC isolation levels.
     * @see java.sql.Connection
     */
    int ISOLATION_DEFAULT = -1;

    /**
     * Indicates that dirty reads, non-repeatable reads and phantom reads
     * can occur.
     * <p>This level allows a row changed by one transaction to be read by another
     * transaction before any changes in that row have been committed (a "dirty read").
     * If any of the changes are rolled back, the second transaction will have
     * retrieved an invalid row.
     * @see java.sql.Connection#TRANSACTION_READ_UNCOMMITTED
     */
    int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;

    /**
     * Indicates that dirty reads are prevented; non-repeatable reads and
     * phantom reads can occur.
     * <p>This level only prohibits a transaction from reading a row
     * with uncommitted changes in it.
     * @see java.sql.Connection#TRANSACTION_READ_COMMITTED
     */
    int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;

    /**
     * Indicates that dirty reads and non-repeatable reads are prevented;
     * phantom reads can occur.
     * <p>This level prohibits a transaction from reading a row with uncommitted changes
     * in it, and it also prohibits the situation where one transaction reads a row,
     * a second transaction alters the row, and the first transaction re-reads the row,
     * getting different values the second time (a "non-repeatable read").
     * @see java.sql.Connection#TRANSACTION_REPEATABLE_READ
     */
    int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;

    /**
     * Indicates that dirty reads, non-repeatable reads and phantom reads
     * are prevented.
     * <p>This level includes the prohibitions in {@link #ISOLATION_REPEATABLE_READ}
     * and further prohibits the situation where one transaction reads all rows that
     * satisfy a {@code WHERE} condition, a second transaction inserts a row
     * that satisfies that {@code WHERE} condition, and the first transaction
     * re-reads for the same condition, retrieving the additional "phantom" row
     * in the second read.
     * @see java.sql.Connection#TRANSACTION_SERIALIZABLE
     */
    int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;

相关文章

网友评论

      本文标题:又见AOP: Spring事务 - @Transactional

      本文链接:https://www.haomeiwen.com/subject/dbqumhtx.html