1 背景
业务处理过程,发现了以下问题,代码一是原代码能正常执行,代码二是经过迭代一次非正常执行代码。
代码一:以下代码开启线程后,代码正常执行:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
@Transactional public Long test() { Long studentId = studentService.insert(student); writeStatisticsData(studentId); return studentId; }
private void writeStatisticsData(Long studentId) { executor.execute(() -> { Student student = studentService.findById(studentId); }); }
|
代码二:以下代码开启线程后,代码不正常执行(获取不到student对象了):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Transactional public Long test() { Long studentId = studentService.insert(student); writeStatisticsData(studentId); Long addressId = addressService.insert(address); return studentId; }
private void writeStatisticsData(Long studentId) { executor.execute(() -> { Student student = studentService.findById(studentId); }); }
|
2 问题分析
这里使用了spring事务,显然需要考虑事务的隔离级别。
查看mysql隔离级别:
1 2
| SELECT @@tx_isolation; READ-COMMITTED
|
读提交,即在事务A插入数据过程中,事务B在A提交之前读取不到A插入的数据。
问题原因分析,代码一正常运行的原因:
由于mysql事务的隔离级别是 读提交,test方法在开启异步线程后,异步线程也开启了事务,同时以读者身份去读 test 方法中插入的 student 记录,但此时 test 方法已经提交了事务,所以可以读取到 student 记录(即在异步方法中可以读取到 student 记录),但此代码有风险,若事务提交的时间晚一点,异步线程也有可能读取不到 student 记录。
代码二不能正常运行的原因:
经过上面分析,很明显异步方法中不能读取到 student 记录,由于代码二在异步线程下面又执行了其他操作,延时了test方法中事务的提交,所以代码二不能正常运行。
3 解决问题方案
解决思路是在事务提交后再做其他的处理(如异步发消息处理等),这里还是从Spring执行事务的过程中入手,Spring事务的处理过程不再分析,这里直接看Spring事务增强器TransactionInterceptor的核心处理流程,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable { final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass); final PlatformTransactionManager tm = determineTransactionManager(txAttr); final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) { TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification); Object retVal = null; retVal = invocation.proceedWithInvocation(); commitTransactionAfterReturning(txInfo); return retVal; } else { } }
|
这里主要看声明式事务的处理,因为编程式事务的处理及提交都是用户在编码中进行控制。在声明式事务处理中,当方法执行完后,会执行 commitTransactionAfterReturning 方法来进行提交事务,该方法在 TransactionAspectSupport 类中,源码如下:
1 2 3 4 5
| protected void commitTransactionAfterReturning(TransactionInfo txInfo) { if (txInfo != null && txInfo.hasTransaction()) { txInfo.getTransactionManager().commit(txInfo.getTransactionStatus()); } }
|
再看 commit 方法,该方法在 AbstractPlatformTransactionManager 类中,源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
| public final void commit(TransactionStatus status) throws TransactionException { processCommit(defStatus); }
private void processCommit(DefaultTransactionStatus status) throws TransactionException { try { boolean beforeCompletionInvoked = false; try { prepareForCommit(status); triggerBeforeCommit(status); triggerBeforeCompletion(status); beforeCompletionInvoked = true; boolean globalRollbackOnly = false; if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) { globalRollbackOnly = status.isGlobalRollbackOnly(); } if (status.hasSavepoint()) { status.releaseHeldSavepoint(); } else if (status.isNewTransaction()) { doCommit(status); } } catch (......) { }
try { triggerAfterCommit(status); } finally { triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED); } } finally { cleanupAfterCompletion(status); } }
private void triggerAfterCommit(DefaultTransactionStatus status) { if (status.isNewSynchronization()) { TransactionSynchronizationUtils.triggerAfterCommit(); } }
|
最终会走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中,
1 2 3 4 5 6 7 8 9 10 11
| public static void triggerAfterCommit() { invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations()); }
public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) { if (synchronizations != null) { for (TransactionSynchronization synchronization : synchronizations) { synchronization.afterCommit(); } } }
|
上面会把缓存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按顺序来执行 afterCommit 方法,其中 TransactionSynchronization 以集合形式缓存在 TransactionSynchronizationManager 的 ThreadLocal 中。
方式一
经过上面分析,只需要代码中重新生成个 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解决方案,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| private void writeStatisticsData(Long studentId) { if(TransactionSynchronizationManager.isActualTransactionActive()) { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() { @Override public void afterCommit() { executor.execute(() -> { Student student = studentService.findById(studentId); }); }}); } else { executor.execute(() -> { Student student = studentService.findById(studentId); }); } }
|
方式二
使用 @TransactionalEventListener 结合 Spring事件监听机制,该注解自从Spring4.2版本开始有的,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class StudentEvent extends ApplicationEvent { public StudentEvent(Long studentId) { super(studentId); } }
public class StudentEventListener{ @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT) public void writeStatisticsData(StudentEvent studentEvent) { executor.execute(() -> { Student student = studentService.findById(studentEvent.getSource()); }); } }
@Service public class StudentService { @Autowired private ApplicationEventPublisher applicationEventPublisher;
@Transactional public Long test() { Long studentId = studentService.insert(student); applicationEventPublisher.publishEvent(new StudentEvent(studentId)); Long addressId = addressService.insert(address); return studentId; } }
|
原理分析:
Spring Bean在加载配置文件时,会使用 AnnotationDrivenBeanDefinitionParser注册TransactionalEventListenerFactory :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {
@Override public BeanDefinition parse(Element element, ParserContext parserContext) { registerTransactionalEventListenerFactory(parserContext); String mode = element.getAttribute("mode"); if ("aspectj".equals(mode)) { registerTransactionAspect(element, parserContext); } else { AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext); } return null; }
private void registerTransactionalEventListenerFactory(ParserContext parserContext) { RootBeanDefinition def = new RootBeanDefinition(); def.setBeanClass(TransactionalEventListenerFactory.class); parserContext.registerBeanComponent(new BeanComponentDefinition(def, TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME)); } }
|
1 2 3 4 5 6 7 8 9
| public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {
@Override public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) { return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {
@Override public void onApplicationEvent(ApplicationEvent event) { if (TransactionSynchronizationManager.isSynchronizationActive()) { TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event); TransactionSynchronizationManager.registerSynchronization(transactionSynchronization); } else if (this.annotation.fallbackExecution()) { } processEvent(event); } else { } }
|
上述 @TransactionalEventListener 本质上是一个 @EventListener,TransactionalEventListenerFactory类会将每一个扫描到的方法有TransactionalEventListener注解包装成ApplicationListenerMethodTransactionalAdapter对象,通过ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若当前存在事务,就会生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存ThreadLocal集合中,剩余流程同上述分析。
转载自:https://blog.csdn.net/zhuqiuhui/article/details/89299360