今天使用aop时,发现一件很值得研究的事情。
我使用aop对一个service类的一个方法进行@After加强,该方法是开启了事务的,姑且叫它func()。
我也是看过aop源码的,我本以为func()的执行如果func()方法本身不报错,那么对它进行事务增强的Interceptor就会执行完毕,事务就应该提交才对。
但是我错了,我一共写了两个@After,第一个没有异常,但是第二个@After有异常。奇怪的事情发生了,事务并没有完成。
为什么呢?该问题姑且记下,看来源码看的还是不够啊。
由于项目中用的是CGLIB方式增强,这部分代码确实没看,只看过JDK方式的事务处理。问题也可能出在CGLIB上。
即使不是CGLIB使用JDK自带的动态代理也是不行的,也是会导致回滚的
================================================2020-11-12 16:00更新===========================
对于一个事务方法进行aop增强还是得慎重啊,因为不走完增强的逻辑,事务也不会提交的
而且如果增强部分抛了异常,还会影响事务的提交
@Override // @Transactional(propagation = Propagation.REQUIRED) public void batchAddIncreSubAndZkPersist(SubscribeObjSdk topData, List<DataMedia> originalInput, List<SubscribeObjSdk> retureSubList, KafkaGroup kafkaGroup) { processBatchAddIncreSub(topData, originalInput, retureSubList, kafkaGroup); logger.info("batchAddIncreSubAndZkPersist subscription no {} write db over", topData.getSubScriptionNo()); Executor myExecutor = KafkaTopicAsyncThPoolSupplier.obtainThreadPool(); CompletableFuture.supplyAsync(() -> { SubInfoService subInfoService = SpringContextUtils.getBean("subInfoService", SubInfoService.class);
源代码是我有对这个方法 batchAddIncreSubAndZkPersist的增强,比如
@After(value="execution(* com.suning.rdrs.admin.service.impl.TaskServiceImpl.batchAddIncreSubAndZkPersist(..)) && args(a,b,c,d)") public void processCopyTaskTopicRelation(SubscribeObjSdk a, List<DataMedia> b, List<SubscribeObjSdk> c, KafkaGroup d) throws InterruptedException, ExecutionException { if (Objects.equals(a.getSubscribeType(), StringConstant.DI_WHOLESCHEMA_TYPE)) { SubscribeObjSdk subscribeSdk = c.get(0); DataMedia dataMedia = b.get(0); String dataCenter = dataMedia.getDataCenter(); String ldc = propGetter.getRegularLdc(dataCenter);
现在发现的现象是 由于 写数据库的部分在 processBatchAddIncreSub 中,由于增强方法 processCopyTaskTopicRelation 会执行很长时间 ,这样
CompletableFuture.supplyAsync(() -> { SubInfoService subInfoService = SpringContextUtils.getBean("subInfoService", SubInfoService.class);
如果上面的代码里有对数据的读取操作是读不到的,因为增强部分没有执行完,事务还没有提交。这就会导致逻辑出现问题
开始源码分析
spring的事务也是通过aop做的,事务增强的advisor是TransactionInterceptor,它也没什么特别的,跟用户自己的写的advisor都是一回事,如果我们对一个有事务的方法进行aop增强,请看下面的代码段
JdkDynamicAopProxy.invoke
List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass);//所有advisor都会在这里被拿到 // Check whether we have any advice. If we don't, we can fallback on direct // reflective invocation of the target, and avoid creating a MethodInvocation. if (chain.isEmpty()) { // We can skip creating a MethodInvocation: just invoke the target directly // Note that the final invoker must be an InvokerInterceptor so we know it does // nothing but a reflective operation on the target, and no hot swapping or fancy proxying. Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse); } else { // We need to create a method invocation... invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain);//把所有的advisor构造成一个ReflectiveMethodInvocation // Proceed to the joinpoint through the interceptor chain. retVal = invocation.proceed(); }
ReflectiveMethodInvocation
@Override public Object proceed() throws Throwable { // We start with an index of -1 and increment early. if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) { return invokeJoinpoint(); } Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex); if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) { // Evaluate dynamic method matcher here: static part will already have // been evaluated and found to match. InterceptorAndDynamicMethodMatcher dm = (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice; if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) { return dm.interceptor.invoke(this); } else { // Dynamic matching failed. // Skip this interceptor and invoke the next in the chain. return proceed(); } } else { // It's an interceptor, so we just invoke it: The pointcut will have // been evaluated statically before this object was constructed. return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);//事务TransactionInterceptor就在这里执行 } }
TransactionInterceptor.invoke,上面的this就是入参 MethodInvocation invocation
@Override public Object invoke(final MethodInvocation invocation) throws Throwable { // Work out the target class: may be {@code null}. // The TransactionAttributeSource should be passed the target class // as well as the method, which may be from an interface. Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); // Adapt to TransactionAspectSupport's invokeWithinTransaction... return invokeWithinTransaction(invocation.getMethod(), targetClass, new InvocationCallback() { @Override public Object proceedWithInvocation() throws Throwable { return invocation.proceed(); } }); }
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { // If the transaction attribute is null, the method is non-transactional. final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr); if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { // Standard transaction demarcation with getTransaction and commit/rollback calls. TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; try { // This is an around advice: Invoke the next interceptor in the chain. // This will normally result in a target object being invoked. retVal = invocation.proceedWithInvocation();//其实这里会把所有的advisor都执行一遍,所以嘞,如果我们写的aop有异常,当然会让事务回滚了 } catch (Throwable ex) {