写在前面
在操作数据库时,经常需要将多个操作作为一个原子操作,也就是说,这些操作要么全部成功,要么,其中一个有问题就全部回滚。
在数据库层面,整个流程就是:
start transaction
do something
do some other thing
commit (or rollback)
那么,JDBC实现这个过程的例子如下:
Connection conn = DriverManager.getConnection(...);
try {
con.setAutoCommit(false);
Statement stmt = con.createStatement();
//1 or more queries or updates
con.commit();
} catch(Exception e) {
con.rollback();
} finally {
con.close();
}
而在一个项目中,可能有多处需要执行事务操作。如果在每一处都写一样的语句,先关闭自动提交,手动提交事务,有异常时回滚事务,关闭连接。那么重复代码就会越来越多。而这不正是AOP可以解决的场景吗!
Spring提供了注解@Transactional,只要在方法上标注这个注解,那么就认为这个方法是在一个事务中进行的,就像这样:
@Transactional(propagation = Propagation.REQUIRED)
public Boolean executeUpdate(ProcessContext context,String sql) {
return executeUpdate(context, sql, null);
}
那么问题来了:
为什么加了这个注解,方法就有了事务的能力呢?底层原理是什么?
在这个方法内部又调用了其他方法操作数据库,那这其他方法有事务的能力吗?
来一探究竟。
1. @Transactional,相应的后置处理器,及around方法
在运行时,被标为@Transactional方法所在的类是一个代理类,这是因为用到了动态代理的技术。既然是动态代理,那么可以肯定,是在Spring容器初始化这个bean的时候,调用这个bean的后置处理器,将bean初始化为一个代理对象。其中,事务的能力就是一个around通知方法,当调用目标方法时,会被拦截先执行这个增强方法。
口说无凭,Debug模式开启,来验证一下。在beanPostProcessor处打个断点。
@Override
public Object applyBeanPostProcessorsAfterInitialization(Object existingBean, String beanName)
throws BeansException {
Object result = existingBean;
for (BeanPostProcessor beanProcessor : getBeanPostProcessors()) {
result = beanProcessor.postProcessAfterInitialization(result, beanName);
if (result == null) {
return result;
}
}
return result;
}
初始化目标bean时,遍历相关的后置处理器,发现有一个后置处理器叫作InfrastructureAdvisorAutoProxyCreator,这个后置处理器的postProcessAfterInitialization方法会创建jdk或者cglib动态代理类。此时,这个bean就变成了一个被增强了的代理对象。
当调用标有@Transactional注解的目标方法时,会发现进入如下增强方法。其实这就是一个常见的around类型的通知。
它先调用createTransactionIfNecessary来开启事务。
接下来,invocation.proceedWithInvocation()会去执行目标方法的方法拦截器链,并最终调用目标方法。如果没有其它方法拦截器的话,就直接执行目标方法。在这里就是直接调用目标方法去做数据库的增删改查。
如果数据库操作过程中有异常,completeTransactionAfterThrowing方法就会处理异常,并回滚事务。
若一切顺利,则会调用commitTransactionAfterReturning来提交事务。
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)
throws Throwable {
// If the transaction attribute is null, the method is non-transactional.
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
// Standard transaction demarcation with getTransaction and commit/rollback calls.
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
try {
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
}
catch (Throwable ex) {
// target invocation exception
completeTransactionAfterThrowing(txInfo, ex);
throw ex;
}
finally {
cleanupTransactionInfo(txInfo);
}
commitTransactionAfterReturning(txInfo);
return retVal;
}
else {
//此处代码省略
}
}
2. determineTransactionManager
来看一下上面调用到的determineTransactionManager(txAttr)方法。部分代码如下:
String qualifier = txAttr.getQualifier();
if (StringUtils.hasText(qualifier)) {
return determineQualifiedTransactionManager(qualifier);
}
else if (StringUtils.hasText(this.transactionManagerBeanName)) {
return determineQualifiedTransactionManager(this.transactionManagerBeanName);
}
它会从注解的属性中拿它的qualifier。这个qualifier是什么呢?查看Transactional注解的定义,发现它就是value属性,如果没有特别指定的话,默认值是空字符串。
/**
* A <em>qualifier</em> value for the specified transaction.
* <p>May be used to determine the target transaction manager,
* matching the qualifier value (or the bean name) of a specific
* {@link org.springframework.transaction.PlatformTransactionManager}
* bean definition.
* @since 4.2
* @see #value
*/
@AliasFor("value")
String transactionManager() default "";
拿到qualifier之后,如果它是空字符串,就会调用determineQualifiedTransactionManager(this.transactionManagerBeanName)来获取相应的TransactionManager。
private PlatformTransactionManager determineQualifiedTransactionManager(String qualifier) {
PlatformTransactionManager txManager = this.transactionManagerCache.get(qualifier);
if (txManager == null) {
txManager = BeanFactoryAnnotationUtils.qualifiedBeanOfType(
this.beanFactory, PlatformTransactionManager.class, qualifier);
this.transactionManagerCache.putIfAbsent(qualifier, txManager);
}
return txManager;
}
这个transactionManagerBeanName是什么呢?它就是在config里指定要用的transaction manager。Spring容器会初始化txManager bean来负责事务。
<bean id="txManager" class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
<property name="dataSource" ref="myDataSource" />
</bean>
<tx:annotation-driven transaction-manager="txManager" />
这里所用的是DataSourceTransactionManager,它是JDBC提供的事务管理器,实现了Spring事务的一些接口。
Spring并不直接管理事务,而是提供了多种事务管理器,他们将事务管理的职责委托给Hibernate等持久化机制所提供的相关平台框架的事务来实现。Spring事务管理器的接口是org.springframework.transaction.PlatformTransactionManager,通过这个接口,Spring为各个平台如JDBC、Hibernate等都提供了对应的事务管理器,但是具体的实现就是各个平台自己的事情了。
3. open connection - createTransactionIfNecessary
再来看看TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
- 它最终会调用DataSourceTransactionManager的doGetTransaction方法。
@Override
protected Object doGetTransaction() {
DataSourceTransactionObject txObject = new DataSourceTransactionObject();
txObject.setSavepointAllowed(isNestedTransactionAllowed());
ConnectionHolder conHolder =
(ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);
txObject.setConnectionHolder(conHolder, false);
return txObject;
}
其中TransactionSynchronizationManager.getResource会根据dataSource去缓存resources中找是否已有创建好的connection。resources被定义为一个ThreadLocal,能够支持线程级别相同connection的复用。
private static final ThreadLocal<Map<Object, Object>> resources =
new NamedThreadLocal<Map<Object, Object>>("Transactional resources");
- 如果没有已有的transaction,那么就可能需要调用doBegin(transaction, definition)开启一个新的事务。
其中会用con.setAutoCommit(false)将autoCommit设为false。由于JDBC事务默认是开启的,并且默认是自动提交。这里将自动提交设为false,那么就开启了手动事务,需要在最后手动commit,将整个事务提交。
4. invoke目标方法
再回到invokeWithinTransaction方法,这里通过拦截器链最终调用到目标方法进行数据库操作。
// This is an around advice: Invoke the next interceptor in the chain.
// This will normally result in a target object being invoked.
retVal = invocation.proceedWithInvocation();
5. commit or rollback
- commitTransactionAfterReturning
对于JDBC事务,会最终调用DataSourceTransactionManager的doCommit方法。
@Override
protected void doCommit(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Committing JDBC transaction on Connection [" + con + "]");
}
try {
con.commit();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not commit JDBC transaction", ex);
}
}
- completeTransactionAfterThrowing
最终调用DataSourceTransactionManager的doRollback方法
@Override
protected void doRollback(DefaultTransactionStatus status) {
DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction();
Connection con = txObject.getConnectionHolder().getConnection();
if (status.isDebug()) {
logger.debug("Rolling back JDBC transaction on Connection [" + con + "]");
}
try {
con.rollback();
}
catch (SQLException ex) {
throw new TransactionSystemException("Could not roll back JDBC transaction", ex);
}
}
6. 传播行为
Spring事务中有个概念叫作传播行为。事务传播行为(propagation behavior)指的就是当一个事务方法被另一个事务方法调用时,这个事务方法应该如何进行。
引用一张表格:
Spring事务传播行为
可以通过如下方式来指定要采用的传播行为:
@Transactional(propagation = Propagation.REQUIRED)
来看一下具体实现。当指定了不同的传播行为时,Spring的getTransaction方法进行了不同的处理。
- 当前已有事务时
getTransaction的如下语句:
if (isExistingTransaction(transaction)) {
// Existing transaction found -> check propagation behavior to find out how to behave.
return handleExistingTransaction(definition, transaction, debugEnabled);
}
handleExistingTransaction会根据不同的传播行为,决定具体操作(是否要挂起当前事务开启新的事务,或者在当前事务中继续进行操作,等等)
- 开启新的事务时
getTransaction也会根据不同的传播行为,决定下一步操作。
// No existing transaction found -> check propagation behavior to find out how to proceed.
if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) {
throw new IllegalTransactionStateException(
"No existing transaction found for transaction marked with propagation 'mandatory'");
}
else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW ||
definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) {
SuspendedResourcesHolder suspendedResources = suspend(null);
if (debugEnabled) {
logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition);
}
try {
boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER);
DefaultTransactionStatus status = newTransactionStatus(
definition, transaction, true, newSynchronization, debugEnabled, suspendedResources);
doBegin(transaction, definition);
prepareSynchronization(status, definition);
return status;
}
catch (RuntimeException ex) {
resume(null, suspendedResources);
throw ex;
}
catch (Error err) {
resume(null, suspendedResources);
throw err;
}
}
7. 隔离级别
事务的操作有四种隔离级别,隔离级别时数据库层面的概念。
当多个线程都开启事务操作数据库中的数据时,数据库系统要能进行隔离操作,以保证各个线程获取数据的准确性, 所以, 对于不同的事务,采用不同的隔离级别 会 有不同的结果。
而Spring事务也支持指定相应的隔离级别,在执行事务时,数据库层面就会根据指定的隔离级别进行操作。
@Transactional(isolation = Isolation.READ_COMMITTED)
如下是Spring事务隔离级别的定义,它与传播行为一起都是在TransactionDefinition接口中定义的。
/**
* Use the default isolation level of the underlying datastore.
* All other levels correspond to the JDBC isolation levels.
* @see java.sql.Connection
*/
int ISOLATION_DEFAULT = -1;
/**
* Indicates that dirty reads, non-repeatable reads and phantom reads
* can occur.
* <p>This level allows a row changed by one transaction to be read by another
* transaction before any changes in that row have been committed (a "dirty read").
* If any of the changes are rolled back, the second transaction will have
* retrieved an invalid row.
* @see java.sql.Connection#TRANSACTION_READ_UNCOMMITTED
*/
int ISOLATION_READ_UNCOMMITTED = Connection.TRANSACTION_READ_UNCOMMITTED;
/**
* Indicates that dirty reads are prevented; non-repeatable reads and
* phantom reads can occur.
* <p>This level only prohibits a transaction from reading a row
* with uncommitted changes in it.
* @see java.sql.Connection#TRANSACTION_READ_COMMITTED
*/
int ISOLATION_READ_COMMITTED = Connection.TRANSACTION_READ_COMMITTED;
/**
* Indicates that dirty reads and non-repeatable reads are prevented;
* phantom reads can occur.
* <p>This level prohibits a transaction from reading a row with uncommitted changes
* in it, and it also prohibits the situation where one transaction reads a row,
* a second transaction alters the row, and the first transaction re-reads the row,
* getting different values the second time (a "non-repeatable read").
* @see java.sql.Connection#TRANSACTION_REPEATABLE_READ
*/
int ISOLATION_REPEATABLE_READ = Connection.TRANSACTION_REPEATABLE_READ;
/**
* Indicates that dirty reads, non-repeatable reads and phantom reads
* are prevented.
* <p>This level includes the prohibitions in {@link #ISOLATION_REPEATABLE_READ}
* and further prohibits the situation where one transaction reads all rows that
* satisfy a {@code WHERE} condition, a second transaction inserts a row
* that satisfies that {@code WHERE} condition, and the first transaction
* re-reads for the same condition, retrieving the additional "phantom" row
* in the second read.
* @see java.sql.Connection#TRANSACTION_SERIALIZABLE
*/
int ISOLATION_SERIALIZABLE = Connection.TRANSACTION_SERIALIZABLE;











网友评论