Spring事务管理代码剖析
2016-10-29 18:36
447 查看
InfrastructureAdvisorAutoProxyCreator是一个bean工厂后处理器,在实例化对象时,该对象优先实例化,然后在创建其他的bean对象,在创建其他bean对象时都会经过InfrastructureAdvisorAutoProxyCreator处理,InfrastructureAdvisorAutoProxyCreator对象可以检测被创建的对象是否有@Transactional注解,如果有就会检测哪些方法需要被代理,然后创建出CGLIB的代理对象。
下面是对@Transactional注解形式的事务管理的代码进行分析的。
当我们调用@Transactional注解的方法时,首先会进入CglibAopProxy.DynamicAdvisedInterceptor.intercept()方法,这个方法的源码是:
从上面的代码中可以看出,首先会获得被代理的目标对象target,然后根据被代理的方法和目标对象的Class对象获得对应的拦截器列表,最后创建从出CglibMethodInvocation对象,然后调用proceed方法,该方法会调用目标方法。下面首先分析getInterceptorsAndDynamicInterceptionAdvice方法:
在上面的代码中首先会创建出MethodCacheKey对象,该对象只是存储入参对象,下面是MethodCacheKey的源码:
创建出MethodCacheKey对象后,下面就需要执行:
这段代码会返回和入参对应的拦截器列表。下面分析advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice的代码,其中advisorChainFactory是DefaultAdvisorChainFactory的实例:
执行完上面的代码后,接下来执行:
对于这段代码首先分析CglibMethodInvocation:
从上面可以看出CglibMethodInvocation对象的创建过程还是比较简单的,下面分析proceed方法,该方法属于类ReflectiveMethodInvocation:
下面进入invoke方法中,也就是TransactionInterceptor中的invoke方法:
下面进入invokeWithinTransaction方法:
上面的方法中首先会调用determineTransactionManager方法,这个方法就是返回事务管理器:
在执行完上面的方法后,就执行createTransactionIfNecessary方法,这个是非常重要的方法,该方法会创建事务:
下面开始调用getTransaction方法:
下面分析doGetTransaction方法
下面分析TransactionSynchronizationManager.getResource(this.dataSource):
下面再来分析doGetResource方法:
下面是newTransactionStatus方法:
下面是doBegin方法,该方法是事务开始,入参是DataSourceTransactionObject和事务定义对象,下面方法中设置了超时时间,并从数据源中获取一个连接,将数据源和连接存储起来与本线程关联,设置连接手动提交:
下面分析prepareSynchronization,该方法就是在TransactionSynchronizationManager中设置一部分属性,这些属性是线程安全的:
下面是prepareTransactionInfo方法:
当存在多个@Transactional方法依次调用时,下面这个方法handleExistingTransaction会被调用:
当方法都调用完毕,执行后续清理时,首先调用方法commitTransactionAfterReturning:
调用事务管理器的commit方法:
真正执行提交操作的方法:
事务管理器是唯一的,如果有多个@Transactional方法被调用,每一个被调用的方法都会有一个对应的事务状态对象和事务属性对象。
下面这个方法执行事务挂起的操作,当使用某些传播行为时,会将一些事务挂起,下面这个方法就会被调用:
事务挂起的操作其实很简单,事务挂起就是将事务的各种信息从ThreadLocal对象中删除,将下一个事务的信息填入ThreadLocal对象中,运行完下一个事务后,再恢复上一个事务信息。
下面的方法是执行事务信息恢复的方法,在后一个事务挂起并执行完后,下面的方法会执行,以恢复当前事务状态:
每一个被@Transactional注解的方法都会创建一个TransactionInfo对象,该对象中持有上一个事务的TransactionInfo对象,这样可以用于恢复上一个事务状态。
传播行为中的NOT_SUPPORTED也会创建TransactionInfo对象、事务状态对象、事务定义对象。spring会单独创建一个新的连接用于NOT_SUPPORTED的事务中。但是这个连接spring不执行关于连接的任何属性的设置操作,属性的设置在自己的代码中进行设置,包括提交也是一样,spring不进行事务的提交。
TransactionSynchronizationManager这个管理器里面的方法基本都是静态的,里面建有多个静态的ThreadLocal属性,这些属性是一个线程对应一个属性值。一个线程使用了哪一个数据源的哪一个连接会在resources中存储。
TransactionSynchronizationManager在这个对象中还会设置当前线程是否有事务,事务隔离级别,事务是否只读,事务名字。事务的名字就是被注解的类名加方法名。
在方法中使用连接时都要从TransactionSynchronizationManager对象中获取。
下面是对@Transactional注解形式的事务管理的代码进行分析的。
当我们调用@Transactional注解的方法时,首先会进入CglibAopProxy.DynamicAdvisedInterceptor.intercept()方法,这个方法的源码是:
入参:proxy是被CGLIB代理后的对象,method被@Transactional注解的方法,args是调用被@Transactional注解的方法的入参,methodProxy是被代理的方法对象,需要通过该对象调用目标对象中的方法 public Object intercept(Object proxy, Method method, Object[] args, MethodProxy methodProxy) throws Throwable { Object oldProxy = null; boolean setProxyContext = false; Class<?> targetClass = null; Object target = null; try { if (this.advised.exposeProxy) { // Make invocation available if necessary. oldProxy = AopContext.setCurrentProxy(proxy); setProxyContext = true; } // May be null. Get as late as possible to minimize the time we // "own" the target, in case it comes from a pool... target = getTarget();// 获得被代理的对象 if (target != null) { targetClass = target.getClass(); } // 获得拦截器列表 //advised是ProxyFactory的实例对象,创建出代理对象是通过ProxyFactory完成的 List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass); Object retVal; // Check whether we only have one InvokerInterceptor: that is, // no real advice, but just reflective invocation of the target. if (chain.isEmpty() && Modifier.isPublic(method.getModifiers())) { // We can skip creating a MethodInvocation: just invoke the target directly. // Note that the final invoker must be an InvokerInterceptor, so we know // it does nothing but a reflective operation on the target, and no hot // swapping or fancy proxying. Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = methodProxy.invoke(target, argsToUse); } else { // We need to create a method invocation... retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy) .proceed(); } retVal = processReturnType(proxy, target, method, retVal); return retVal; } finally { if (target != null) { releaseTarget(target); } if (setProxyContext) { // Restore old proxy. AopContext.setCurrentProxy(oldProxy); } } }
从上面的代码中可以看出,首先会获得被代理的目标对象target,然后根据被代理的方法和目标对象的Class对象获得对应的拦截器列表,最后创建从出CglibMethodInvocation对象,然后调用proceed方法,该方法会调用目标方法。下面首先分析getInterceptorsAndDynamicInterceptionAdvice方法:
/** * Determine a list of {@link org.aopalliance.intercept.MethodInterceptor} objects for the given * method, based on this configuration. * @param method the proxied method * @param targetClass the target class * @return List of MethodInterceptors (may also include InterceptorAndDynamicMethodMatchers) */ 入参:method是需要代理的方法对象,targetClass是目标对象target的Class对象 public List<Object> getInterceptorsAndDynamicInterceptionAdvice(Method method, Class<?> targetClass) { MethodCacheKey cacheKey = new MethodCacheKey(method); List<Object> cached = this.methodCache.get(cacheKey);//methodCache中保存了需要目标方法的拦截器列表,第一次进入该方法时,methodCache的返回值是null,methodCache是一个map实例 if (cached == null) { cached = this.advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice(this, method, targetClass);// 获得拦截器列表,其实是增强对象列表 this.methodCache.put(cacheKey, cached);//将增强对象列表缓存起来,以便以后使用 } return cached; }
在上面的代码中首先会创建出MethodCacheKey对象,该对象只是存储入参对象,下面是MethodCacheKey的源码:
/** * Simple wrapper class around a Method. Used as the key when caching methods, for efficient * equals and hashCode comparisons. */ private static final class MethodCacheKey implements Comparab 4000 le<MethodCacheKey> { private final Method method; private final int hashCode; public MethodCacheKey(Method method) { this.method = method; this.hashCode = method.hashCode(); } @Override public boolean equals(Object other) { return (this == other || (other instanceof MethodCacheKey && this.method == ((MethodCacheKey) other).method)); } @Override public int hashCode() { return this.hashCode; } @Override public String toString() { return this.method.toString(); } @Override public int compareTo(MethodCacheKey other) { int result = this.method.getName().compareTo(other.method.getName()); if (result == 0) { result = this.method.toString().compareTo(other.method.toString()); } return result; } }
创建出MethodCacheKey对象后,下面就需要执行:
cached = this.advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice(this, method, targetClass);
这段代码会返回和入参对应的拦截器列表。下面分析advisorChainFactory.getInterceptorsAndDynamicInterceptionAdvice的代码,其中advisorChainFactory是DefaultAdvisorChainFactory的实例:
入参:config是ProxyFactory的实例,ProxyFactory中默认包括BeanFactoryTransactionAttributeSourceAdvisor切面对象 public List<Object> getInterceptorsAndDynamicInterceptionAdvice(Advised config, Method method, Class<?> targetClass) { // This is somewhat tricky... We have to process introductions first, // but we need to preserve order in the ultimate list. List<Object> interceptorList = new ArrayList<Object>(config.getAdvisors().length); Class<?> actualClass = (targetClass != null ? targetClass : method.getDeclaringClass());//获得目标对象的CLass对象 boolean hasIntroductions = hasMatchingIntroductions(config, actualClass);//判断config中是否有匹配目标对象的引介切面,如果有就返回true AdvisorAdapterRegistry registry = GlobalAdvisorAdapterRegistry.getInstance(); for (Advisor advisor : config.getAdvisors()) {//使用BeanFactoryTransactionAttributeSourceAdvisor对象进行下面的处理 if (advisor instanceof PointcutAdvisor) { // Add it conditionally. PointcutAdvisor pointcutAdvisor = (PointcutAdvisor) advisor; //判断是否对目标对象进行代理,如果matches判断入参对象存在事务属性,matches就返回true if (config.isPreFiltered() || pointcutAdvisor.getPointcut().getClassFilter().matches(actualClass)) { MethodInterceptor[] interceptors = registry.getInterceptors(advisor);// 通过切面获得拦截器,其实是获得增强对象列表,代码比较简单,就不再对getInterceptors方法进行解析 MethodMatcher mm = pointcutAdvisor.getPointcut().getMethodMatcher();//获得切点对应的方法匹配器 //下面的MethodMatchers.matches调用,是根据入参判断目标对象是否有事务属性, /**调用的方法源码是: public boolean matches(Method method, Class<?> targetClass) { if (TransactionalProxy.class.isAssignableFrom(targetClass)) { return false; } // AnnotationTransactionAttributeSource TransactionAttributeSource tas = getTransactionAttributeSource(); return (tas == null || tas.getTransactionAttribute(method, targetClass) != null); }**/ if (MethodMatchers.matches(mm, method, actualClass, hasIntroductions)) { //如果有事务属性 if (mm.isRuntime()) { // Creating a new object instance in the getInterceptors() method // isn't a problem as we normally cache created chains. for (MethodInterceptor interceptor : interceptors) { interceptorList.add(new InterceptorAndDynamicMethodMatcher(interceptor, mm)); } } else { interceptorList.addAll(Arrays.asList(interceptors));//将interceptors增强对象添加到interceptorList对象中 } } } } else if (advisor instanceof IntroductionAdvisor) { IntroductionAdvisor ia = (IntroductionAdvisor) advisor; if (config.isPreFiltered() || ia.getClassFilter().matches(actualClass)) { Interceptor[] interceptors = registry.getInterceptors(advisor); interceptorList.addAll(Arrays.asList(interceptors)); } } else { Interceptor[] interceptors = registry.getInterceptors(advisor); interceptorList.addAll(Arrays.asList(interceptors)); } } //返回增强对象列表 return interceptorList; }
执行完上面的代码后,接下来执行:
retVal = new CglibMethodInvocation(proxy, target, method, args, targetClass, chain, methodProxy).proceed();
对于这段代码首先分析CglibMethodInvocation:
public CglibMethodInvocation(Object proxy, Object target, Method method, Object[] arguments, Class<?> targetClass, List<Object> interceptorsAndDynamicMethodMatchers, MethodProxy methodProxy{ super(proxy, target, method, arguments, targetClass, interceptorsAndDynamicMethodMatchers); this.methodProxy = methodProxy; this.publicMethod = Modifier.isPublic(method.getModifiers()); } //上面super会调用下面的代码 protected ReflectiveMethodInvocation( Object proxy, Object target, Method method, Object[] arguments, Class<?> targetClass, List<Object> interceptorsAndDynamicMethodMatchers) { this.proxy = proxy; this.target = target; this.targetClass = targetClass; this.method = BridgeMethodResolver.findBridgedMethod(method);//目标方法 this.arguments = AopProxyUtils.adaptArgumentsIfNecessary(method, arguments);//目标方法的入参 this.interceptorsAndDynamicMethodMatchers = interceptorsAndDynamicMethodMatchers; }
从上面可以看出CglibMethodInvocation对象的创建过程还是比较简单的,下面分析proceed方法,该方法属于类ReflectiveMethodInvocation:
public Object proceed() throws Throwable { // We start with an index of -1 and increment early. if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) { return invokeJoinpoint(); } //获得事务拦截器对象TransactionInterceptor,该对象是在解析XML文件时就创建的 //interceptorsAndDynamicMethodMatchers就是之前创建的拦截器列表对象 Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex); //如果interceptorOrInterceptionAdvice 是TransactionInterceptor对象,那么if中的代码就不执行,直接执行else中代码 if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) { // Evaluate dynamic method matcher here: static part will already have // been evaluated and found to match. InterceptorAndDynamicMethodMatcher dm = (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice; if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) { return dm.interceptor.invoke(this); } else { // Dynamic matching failed. // Skip this interceptor and invoke the next in the chain. return proceed(); } } else { // It's an interceptor, so we just invoke it: The pointcut will have // been evaluated statically before this object was constructed. return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this); } }
下面进入invoke方法中,也就是TransactionInterceptor中的invoke方法:
public Object invoke(final MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);// 获得被代理的对象 // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { @Override public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } }); }
下面进入invokeWithinTransaction方法:
入参:method目标方法,targetClass目标类 protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation)throws Throwable { // If the transaction attribute is null, the method is non-transactional. // 获得注解的属性值 final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = determineTransactionManager(txAttr);// 找到事务管理器对象 final String joinpointIdentification = methodIdentification(method, targetClass);//返回值是"类名.方法名",比如Test.test if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { //如果使用的是DataSourceTransactionManager对象,就执行下面的方法 // Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation();//最终调用目标方法,至于中间调用的其他代码可以不用管 } catch (Throwable ex) { // target invocation exception completeTransactionAfterThrowing(txInfo, ex); throw ex; } finally { cleanupTransactionInfo(txInfo); } commitTransactionAfterReturning(txInfo); return retVal; } else { // It's a CallbackPreferringPlatformTransactionManager: pass a TransactionCallback in. try { Object result = ((CallbackPreferringPlatformTransactionManager) tm).execute(txAttr, new TransactionCallback<Object>() { @Override public Object doInTransaction(TransactionStatus status) { TransactionInfo txInfo = prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); try { return invocation.proceedWithInvocation(); } catch (Throwable ex) { if (txAttr.rollbackOn(ex)) { // A RuntimeException: will lead to a rollback. if (ex instanceof RuntimeException) { throw (RuntimeException) ex; } else { throw new ThrowableHolderException(ex); } } else { // A normal return value: will lead to a commit. return new ThrowableHolder(ex); } } finally { cleanupTransactionInfo(txInfo); } } }); // Check result: It might indicate a Throwable to rethrow. if (result instanceof ThrowableHolder) { throw ((ThrowableHolder) result).getThrowable(); } else { return result; } } catch (ThrowableHolderException ex) { throw ex.getCause(); } } }
上面的方法中首先会调用determineTransactionManager方法,这个方法就是返回事务管理器:
该方法用于找到事务管理对象,找事务管理对象就是根据名字直接在BeanFactory中查找 protected PlatformTransactionManager determineTransactionManager(TransactionAttribute txAttr) { // Do not attempt to lookup tx manager if no tx attributes are set if (txAttr == null || this.beanFactory == null) { return getTransactionManager(); } String qualifier = txAttr.getQualifier();// 使用的事务管理器名字,这个是在注解的属性中配置的,如果没有配置就是""空字符串 if (StringUtils.hasText(qualifier)) { return determineQualifiedTransactionManager(qualifier); } else if (StringUtils.hasText(this.transactionManagerBeanName)) { //如果使用的是默认的事务管理器,就走下面这个方法,其中transactionManagerBeanName="transactionManagerBeanName" //determineQualifiedTransactionManager是从BeanFactory中根据transactionManagerBeanName找到事务管理器对象 return determineQualifiedTransactionManager(this.transactionManagerBeanName);// 找到事务管理器 } else { PlatformTransactionManager defaultTransactionManager = getTransactionManager(); if (defaultTransactionManager == null) { defaultTransactionManager = this.transactionManagerCache.get(DEFAULT_TRANSACTION_MANAGER_KEY); if (defaultTransactionManager == null) { defaultTransactionManager = this.beanFactory.getBean(PlatformTransactionManager.class); this.transactionManagerCache .putIfAbsent(DEFAULT_TRANSACTION_MANAGER_KEY, defaultTransactionManager); } } return defaultTransactionManager; } }
在执行完上面的方法后,就执行createTransactionIfNecessary方法,这个是非常重要的方法,该方法会创建事务:
//入参分别是事务管理器,事务属性和"类名.方法名",TransactionAttribute类是TransactionDefinition的子类 // 创建事务 @SuppressWarnings("serial") protected TransactionInfo createTransactionIfNecessary(PlatformTransactionManager tm, TransactionAttribute txAttr,final String joinpointIdentification) { // If no name specified, apply method identification as transaction name. if (txAttr != null && txAttr.getName() == null) {// 获得事务定义对象 //下面这个是一个代理对象,比较简单,不再分析 txAttr = new DelegatingTransactionAttribute(txAttr) { @Override public String getName() { return joinpointIdentification; } }; } TransactionStatus status = null; if (txAttr != null) { if (tm != null) { status = tm.getTransaction(txAttr);// 创建出一个事务状态对象 } else { if (logger.isDebugEnabled()) { logger.debug("Skipping transactional joinpoint [" + joinpointIdentification + "] because no transaction manager has been configured"); } } } return prepareTransactionInfo(tm, txAttr, joinpointIdentification, status); }
下面开始调用getTransaction方法:
/** * This implementation handles propagation behavior. Delegates to * {@code doGetTransaction}, {@code isExistingTransaction} * and {@code doBegin}. * @see #doGetTransaction * @see #isExistingTransaction * @see #doBegin */ @Override public final TransactionStatus getTransaction(TransactionDefinition definition) throws TransactionException { //如果存在多个@Transactional注解的方法调用时,当调用后面几个@Transactional注解的方法时,transaction对象中持有的ConnectionHolder对象中持有一个数据库连接,这个数据库连接属于本线程 Object transaction = doGetTransaction();//获取事务对象,下面分析这个方法,方法返回值是DataSourceTransactionObject对象 // Cache debug flag to avoid repeated checks. boolean debugEnabled = logger.isDebugEnabled(); if (definition == null) { // Use defaults if no transaction definition given. definition = new DefaultTransactionDefinition(); } //检测当前线程是否存在事务,如果存在多个@Transactional注解的方法调用时,下面这个方法就会返回true if (isExistingTransaction(transaction)) { // Existing transaction found -> check propagation behavior to find out how to behave. return handleExistingTransaction(definition, transaction, debugEnabled); } // Check definition settings for new transaction. if (definition.getTimeout() < TransactionDefinition.TIMEOUT_DEFAULT) { throw new InvalidTimeoutException("Invalid transaction timeout", definition.getTimeout()); } // No existing transaction found -> check propagation behavior to find out how to proceed. if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_MANDATORY) { throw new IllegalTransactionStateException( "No existing transaction found for transaction marked with propagation 'mandatory'"); } else if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRED || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW || definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { //执行完doGetTransaction方法后直接执行下面的方法 SuspendedResourcesHolder suspendedResources = suspend(null);//挂起之前的事务 if (debugEnabled) { logger.debug("Creating new transaction with name [" + definition.getName() + "]: " + definition); } try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); // DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException ex) { resume(null, suspendedResources); throw ex; } catch (Error err) { resume(null, suspendedResources); throw err; } } else { // Create "empty" transaction: no actual transaction, but potentially synchronization. if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT && logger.isWarnEnabled()) { logger.warn("Custom isolation level specified but no actual transaction initiated; " + "isolation level will effectively be ignored: " + definition); } boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); //事务传播行为为NOT_SUPPORTED时执行下面的方法 return prepareTransactionStatus(definition, null, true, newSynchronization, debugEnabled, null); } }
下面分析doGetTransaction方法
@Override protected Object doGetTransaction() { DataSourceTransactionObject txObject = new DataSourceTransactionObject();//这个方法使用默认构造方法,不需要分析 txObject.setSavepointAllowed(isNestedTransactionAllowed()); ConnectionHolder conHolder = (ConnectionHolder) TransactionSynchronizationManager.getResource(this.dataSource);//这个步骤需要获取ConnectionHolder对象 txObject.setConnectionHolder(conHolder, false); return txObject; }
下面分析TransactionSynchronizationManager.getResource(this.dataSource):
public static Object getResource(Object key) { Object actualKey = TransactionSynchronizationUtils.unwrapResourceIfNecessary(key);//该方法就是获取我们在配置文件中配置的数据源对象 Object value = doGetResource(actualKey);//下面分析这个方法 if (value != null && logger.isTraceEnabled()) { logger.trace("Retrieved value [" + value + "] for key [" + actualKey + "] bound to thread [" + Thread.currentThread().getName() + "]"); } return value; }
下面再来分析doGetResource方法:
//方法入参是数据源对象 private static Object doGetResource(Object actualKey) { Map<Object, Object> map = resources.get();//resources是一个ThreadLocal对象的静态属性 if (map == null) { return null; } Object value = map.get(actualKey);//如果第一次进来,value值是null // Transparently remove ResourceHolder that was marked as void... if (value instanceof ResourceHolder && ((ResourceHolder) value).isVoid()) { map.remove(actualKey); // Remove entire ThreadLocal if empty... if (map.isEmpty()) { resources.remove(); } value = null; } return value; }
下面是newTransactionStatus方法:
/** * Create a TransactionStatus instance for the given arguments. */ protected DefaultTransactionStatus newTransactionStatus( TransactionDefinition definition, Object transaction, boolean newTransaction, boolean newSynchronization, boolean debug, Object suspendedResources) { boolean actualNewSynchronization = newSynchronization && !TransactionSynchronizationManager.isSynchronizationActive(); /* public DefaultTransactionStatus( Object transaction, boolean newTransaction, boolean newSynchronization, boolean readOnly, boolean debug, Object suspendedResources) { this.transaction = transaction; this.newTransaction = newTransaction; this.newSynchronization = newSynchronization; this.readOnly = readOnly; this.debug = debug; this.suspendedResources = suspendedResources; } */ return new DefaultTransactionStatus( transaction, newTransaction, actualNewSynchronization, definition.isReadOnly(), debug, suspendedResources); }
下面是doBegin方法,该方法是事务开始,入参是DataSourceTransactionObject和事务定义对象,下面方法中设置了超时时间,并从数据源中获取一个连接,将数据源和连接存储起来与本线程关联,设置连接手动提交:
@Override protected void doBegin(Object transaction, TransactionDefinition definition) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) transaction; Connection con = null; try { if (txObject.getConnectionHolder() == null || txObject.getConnectionHolder().isSynchronizedWithTransaction()) { Connection newCon = this.dataSource.getConnection();// 通过数据源拿到连接 if (logger.isDebugEnabled()) { logger.debug("Acquired Connection [" + newCon + "] for JDBC transaction"); } //ConnectionHolder可以简单的认为里面保存了一个数据库连接 txObject.setConnectionHolder(new ConnectionHolder(newCon), true); } txObject.getConnectionHolder().setSynchronizedWithTransaction(true); con = txObject.getConnectionHolder().getConnection(); Integer previousIsolationLevel = DataSourceUtils.prepareConnectionForTransaction(con, definition); txObject.setPreviousIsolationLevel(previousIsolationLevel); // Switch to manual commit if necessary. This is very expensive in some JDBC drivers, // so we don't want to do it unnecessarily (for example if we've explicitly // configured the connection pool to set it already). if (con.getAutoCommit()) { txObject.setMustRestoreAutoCommit(true); if (logger.isDebugEnabled()) { logger.debug("Switching JDBC Connection [" + con + "] to manual commit"); } con.setAutoCommit(false);// 事务不默认提交 } txObject.getConnectionHolder().setTransactionActive(true); int timeout = determineTimeout(definition); if (timeout != TransactionDefinition.TIMEOUT_DEFAULT) { txObject.getConnectionHolder().setTimeoutInSeconds(timeout); } // Bind the session holder to the // thread. //将数据源和数据源对应的连接存储在TransactionSynchronizationManager中 if (txObject.isNewConnectionHolder()) { TransactionSynchronizationManager.bindResource(getDataSource(), txObject.getConnectionHolder()); } } catch (Throwable ex) { if (txObject.isNewConnectionHolder()) { DataSourceUtils.releaseConnection(con, this.dataSource); txObject.setConnectionHolder(null, false); } throw new CannotCreateTransactionException("Could not open JDBC Connection for transaction", ex); } }
下面分析prepareSynchronization,该方法就是在TransactionSynchronizationManager中设置一部分属性,这些属性是线程安全的:
/** * Initialize transaction synchronization as appropriate. */ protected void prepareSynchronization(DefaultTransactionStatus status, TransactionDefinition definition) { //if判断为true if (status.isNewSynchronization()) { TransactionSynchronizationManager.setActualTransactionActive(status.hasTransaction()); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel( definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT ? definition.getIsolationLevel() : null); TransactionSynchronizationManager.setCurrentTransactionReadOnly(definition.isReadOnly()); TransactionSynchronizationManager.setCurrentTransactionName(definition.getName()); TransactionSynchronizationManager.initSynchronization(); } }
下面是prepareTransactionInfo方法:
protected TransactionInfo prepareTransactionInfo(PlatformTransactionManager tm, TransactionAttribute txAttr, String joinpointIdentification, TransactionStatus status) { //TransactionInfo就是保存了入参的三个对象,该对象用于恢复上一个事务状态 TransactionInfo txInfo = new TransactionInfo(tm, txAttr, joinpointIdentification); if (txAttr != null) { // We need a transaction for this method... if (logger.isTraceEnabled()) { logger.trace("Getting transaction for [" + txInfo.getJoinpointIdentification() + "]"); } // The transaction manager will flag an error if an incompatible tx already exists. txInfo.newTransactionStatus(status);//保存事务状态对象 } else { // The TransactionInfo.hasTransaction() method will return false. We created it only // to preserve the integrity of the ThreadLocal stack maintained in this class. if (logger.isTraceEnabled()) logger.trace("Don't need to create transaction for [" + joinpointIdentification + "]: This method isn't transactional."); } // We always bind the TransactionInfo to the thread, even if we didn't create // a new transaction here. This guarantees that the TransactionInfo stack // will be managed correctly even if no transaction was created by this aspect. txInfo.bindToThread(); // TransactionInfo中存储了事务相关的三个对象,分别是事务管理器,事务定义,事务状态 return txInfo; }
当存在多个@Transactional方法依次调用时,下面这个方法handleExistingTransaction会被调用:
/** * Create a TransactionStatus for an existing transaction. */ private TransactionStatus handleExistingTransaction( TransactionDefinition definition, Object transaction, boolean debugEnabled) throws TransactionException { //definition.getPropagationBehavior()用于获得事务的传播行为 if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NEVER) { throw new IllegalTransactionStateException( "Existing transaction found for transaction marked with propagation 'never'"); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NOT_SUPPORTED) { if (debugEnabled) { logger.debug("Suspending current transaction"); } Object suspendedResources = suspend(transaction); boolean newSynchronization = (getTransactionSynchronization() == SYNCHRONIZATION_ALWAYS); //suspendedResources表示被挂起的资源集合 return prepareTransactionStatus( definition, null, false, newSynchronization, debugEnabled, suspendedResources); } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_REQUIRES_NEW) { if (debugEnabled) { logger.debug("Suspending current transaction, creating new transaction with name [" + definition.getName() + "]"); } SuspendedResourcesHolder suspendedResources = suspend(transaction); try { boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, suspendedResources); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } catch (RuntimeException beginEx) { resumeAfterBeginException(transaction, suspendedResources, beginEx); throw beginEx; } catch (Error beginErr) { resumeAfterBeginException(transaction, suspendedResources, beginErr); throw beginErr; } } if (definition.getPropagationBehavior() == TransactionDefinition.PROPAGATION_NESTED) { if (!isNestedTransactionAllowed()) { throw new NestedTransactionNotSupportedException( "Transaction manager does not allow nested transactions by default - " + "specify 'nestedTransactionAllowed' property with value 'true'"); } if (debugEnabled) { logger.debug("Creating nested transaction with name [" + definition.getName() + "]"); } if (useSavepointForNestedTransaction()) { // Create savepoint within existing Spring-managed transaction, // through the SavepointManager API implemented by TransactionStatus. // Usually uses JDBC 3.0 savepoints. Never activates Spring synchronization. DefaultTransactionStatus status = prepareTransactionStatus(definition, transaction, false, false, debugEnabled, null); status.createAndHoldSavepoint(); return status; } else { // Nested transaction through nested begin and commit/rollback calls. // Usually only for JTA: Spring synchronization might get activated here // in case of a pre-existing JTA transaction. boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); DefaultTransactionStatus status = newTransactionStatus( definition, transaction, true, newSynchronization, debugEnabled, null); doBegin(transaction, definition); prepareSynchronization(status, definition); return status; } } // Assumably PROPAGATION_SUPPORTS or PROPAGATION_REQUIRED. if (debugEnabled) { logger.debug("Participating in existing transaction"); } if (isValidateExistingTransaction()) { if (definition.getIsolationLevel() != TransactionDefinition.ISOLATION_DEFAULT) { Integer currentIsolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); if (currentIsolationLevel == null || currentIsolationLevel != definition.getIsolationLevel()) { Constants isoConstants = DefaultTransactionDefinition.constants; throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] specifies isolation level which is incompatible with existing transaction: " + (currentIsolationLevel != null ? isoConstants.toCode(currentIsolationLevel, DefaultTransactionDefinition.PREFIX_ISOLATION) : "(unknown)")); } } if (!definition.isReadOnly()) { if (TransactionSynchronizationManager.isCurrentTransactionReadOnly()) { throw new IllegalTransactionStateException("Participating transaction with definition [" + definition + "] is not marked as read-only but existing transaction is"); } } } boolean newSynchronization = (getTransactionSynchronization() != SYNCHRONIZATION_NEVER); //下面传入的false属性表示当前创建的事务不是第一个被执行的事务,而是前面已经有事务了,目前的事务是在前面的事务里面的 //这个false在事务执行完毕进行清理工作时,会有用 return prepareTransactionStatus(definition, transaction, false, newSynchronization, debugEnabled, null);//创建一个新的事务,如果采用默认的事务传播行为那么本方法直接执行本行 }
当方法都调用完毕,执行后续清理时,首先调用方法commitTransactionAfterReturning:
protected void commitTransactionAfterReturning(TransactionInfo txInfo) { if (txInfo != null && txInfo.hasTransaction()) { if (logger.isTraceEnabled()) { logger.trace("Completing transaction for [" + txInfo.getJoinpointIdentification() + "]"); } txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());//用于执行最后的提交操作 } }
调用事务管理器的commit方法:
/** * This implementation of commit handles participating in existing * transactions and programmatic rollback requests. * Delegates to {@code isRollbackOnly}, {@code doCommit} * and {@code rollback}. * @see org.springframework.transaction.TransactionStatus#isRollbackOnly() * @see #doCommit * @see #rollback */ @Override public final void commit(TransactionStatus status) throws TransactionException { if (status.isCompleted()) { throw new IllegalTransactionStateException( "Transaction is already completed - do not call commit or rollback more than once per transaction"); } DefaultTransactionStatus defStatus = (DefaultTransactionStatus) status; if (defStatus.isLocalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Transactional code has requested rollback"); } processRollback(defStatus); return; } if (!shouldCommitOnGlobalRollbackOnly() && defStatus.isGlobalRollbackOnly()) { if (defStatus.isDebug()) { logger.debug("Global transaction is marked as rollback-only but transactional code requested commit"); } processRollback(defStatus); // Throw UnexpectedRollbackException only at outermost transaction boundary // or if explicitly asked to. if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { throw new UnexpectedRollbackException( "Transaction rolled back because it has been marked as rollback-only"); } return; } processCommit(defStatus);//执行提交操作,提交操作由最初被调用的@Transactional方法执行commit操作,其他的被调用@Transactional方法相当于执行的空方法 }
真正执行提交操作的方法:
@Override protected void doCommit(DefaultTransactionStatus status) { DataSourceTransactionObject txObject = (DataSourceTransactionObject) status.getTransaction(); Connection con = txObject.getConnectionHolder().getConnection(); if (status.isDebug()) { logger.debug("Committing JDBC transaction on Connection [" + con + "]"); } try { con.commit();// 直接调用连接提交 } catch (SQLException ex) { throw new TransactionSystemException("Could not commit JDBC transaction", ex); } }
事务管理器是唯一的,如果有多个@Transactional方法被调用,每一个被调用的方法都会有一个对应的事务状态对象和事务属性对象。
下面这个方法执行事务挂起的操作,当使用某些传播行为时,会将一些事务挂起,下面这个方法就会被调用:
/** * Suspend the given transaction. Suspends transaction synchronization first, then delegates to * the {@code doSuspend} template method. * @param transaction the current transaction object (or {@code null} to just suspend active * synchronizations, if any) * @return an object that holds suspended resources (or {@code null} if neither transaction nor * synchronization active) * @see #doSuspend * @see #resume */ protected final SuspendedResourcesHolder suspend(Object transaction) throws TransactionException { if (TransactionSynchronizationManager.isSynchronizationActive()) { List<TransactionSynchronization> suspendedSynchronizations = doSuspendSynchronization();//将当前线程中的其他事务全部挂起 try { Object suspendedResources = null; if (transaction != null) { suspendedResources = doSuspend(transaction);//挂起 } String name = TransactionSynchronizationManager.getCurrentTransactionName(); TransactionSynchronizationManager.setCurrentTransactionName(null); boolean readOnly = TransactionSynchronizationManager.isCurrentTransactionReadOnly(); TransactionSynchronizationManager.setCurrentTransactionReadOnly(false); Integer isolationLevel = TransactionSynchronizationManager.getCurrentTransactionIsolationLevel(); TransactionSynchronizationManager.setCurrentTransactionIsolationLevel(null); boolean wasActive = TransactionSynchronizationManager.isActualTransactionActive(); TransactionSynchronizationManager.setActualTransactionActive(false); return new SuspendedResourcesHolder(suspendedResources, suspendedSynchronizations, name, readOnly, isolationLevel, wasActive); } catch (RuntimeException ex) { // doSuspend failed - original transaction is still active... doResumeSynchronization(suspendedSynchronizations); throw ex; } catch (Error err) { // doSuspend failed - original transaction is still active... doResumeSynchronization(suspendedSynchronizations); throw err; } } else if (transaction != null) { // Transaction active but no synchronization active. Object suspendedResources = doSuspend(transaction); return new SuspendedResourcesHolder(suspendedResources); } else { // Neither transaction nor synchronization active. return null; } }
事务挂起的操作其实很简单,事务挂起就是将事务的各种信息从ThreadLocal对象中删除,将下一个事务的信息填入ThreadLocal对象中,运行完下一个事务后,再恢复上一个事务信息。
下面的方法是执行事务信息恢复的方法,在后一个事务挂起并执行完后,下面的方法会执行,以恢复当前事务状态:
/** * Reset the TransactionInfo ThreadLocal. * <p> * Call this in all cases: exception or normal return! * @param txInfo information about the current transaction (may be {@code null}) */ protected void cleanupTransactionInfo(TransactionInfo txInfo) { if (txInfo != null) { txInfo.restoreThreadLocalStatus(); } }
每一个被@Transactional注解的方法都会创建一个TransactionInfo对象,该对象中持有上一个事务的TransactionInfo对象,这样可以用于恢复上一个事务状态。
传播行为中的NOT_SUPPORTED也会创建TransactionInfo对象、事务状态对象、事务定义对象。spring会单独创建一个新的连接用于NOT_SUPPORTED的事务中。但是这个连接spring不执行关于连接的任何属性的设置操作,属性的设置在自己的代码中进行设置,包括提交也是一样,spring不进行事务的提交。
TransactionSynchronizationManager这个管理器里面的方法基本都是静态的,里面建有多个静态的ThreadLocal属性,这些属性是一个线程对应一个属性值。一个线程使用了哪一个数据源的哪一个连接会在resources中存储。
TransactionSynchronizationManager在这个对象中还会设置当前线程是否有事务,事务隔离级别,事务是否只读,事务名字。事务的名字就是被注解的类名加方法名。
在方法中使用连接时都要从TransactionSynchronizationManager对象中获取。
相关文章推荐
- Spring 事务管理高级应用难点剖析: 第 1 部分
- Spring 事务管理高级应用难点剖析
- Spring的事务管理难点剖析(5):联合军种作战的混乱
- Spring 事务管理高级应用难点剖析
- Spring 事务管理高级应用难点剖析: 第 2 部分
- Spring 事务管理高级应用难点剖析2
- Spring的事务管理难点剖析(4):多线程的困惑
- Spring 事务管理高级应用难点剖析: 第 3 部分
- Spring 事务管理高级应用难点剖析1
- Spring事务管理高级应用难点剖析
- Spring事务管理高级应用难点剖析
- Spring事务管理高级应用难点剖析
- Spring的事务管理难点剖析(3):事务方法嵌套调用的迷茫
- Spring 事务管理高级应用难点剖析: 第 1 部分---转自:陈 雄华
- Spring 事务管理高级应用难点剖析 IBM
- Spring 事务管理高级应用难点剖析: 第 2 部分
- Spring 事务管理高级应用难点剖析: 第 3 部分
- Spring 事务管理高级应用难点剖析: 第 3 部分
- Spring的事务管理难点剖析(1):DAO和事务管理的牵绊
- Spring的事务管理难点剖析(6):特殊方法成漏网之鱼