Spring使用TransactionalEventListener解决事务未提交读取不到数据问题

1 背景

业务处理过程,发现了以下问题,代码一是原代码能正常执行,代码二是经过迭代一次非正常执行代码。

代码一:以下代码开启线程后,代码正常执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<Runnable>(5));

@Transactional
public Long test() {
// ......
// 插入记录
Long studentId = studentService.insert(student);
// 异步线程
writeStatisticsData(studentId);
return studentId;
}

private void writeStatisticsData(Long studentId) {
executor.execute(() -> {
Student student = studentService.findById(studentId);
//........
});
}

代码二:以下代码开启线程后,代码不正常执行(获取不到student对象了):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Transactional
public Long test() {
// ......
// 插入记录
Long studentId = studentService.insert(student);
// 异步线程
writeStatisticsData(studentId);
// 插入学生地址记录
Long addressId = addressService.insert(address);
return studentId;
}

private void writeStatisticsData(Long studentId) {
executor.execute(() -> {
Student student = studentService.findById(studentId);
//........
});
}

2 问题分析

这里使用了spring事务,显然需要考虑事务的隔离级别。
查看mysql隔离级别:

1
2
SELECT @@tx_isolation;
READ-COMMITTED

读提交,即在事务A插入数据过程中,事务B在A提交之前读取不到A插入的数据。

问题原因分析,代码一正常运行的原因:

由于mysql事务的隔离级别是 读提交,test方法在开启异步线程后,异步线程也开启了事务,同时以读者身份去读 test 方法中插入的 student 记录,但此时 test 方法已经提交了事务,所以可以读取到 student 记录(即在异步方法中可以读取到 student 记录),但此代码有风险,若事务提交的时间晚一点,异步线程也有可能读取不到 student 记录。

代码二不能正常运行的原因:

经过上面分析,很明显异步方法中不能读取到 student 记录,由于代码二在异步线程下面又执行了其他操作,延时了test方法中事务的提交,所以代码二不能正常运行。

3 解决问题方案

解决思路是在事务提交后再做其他的处理(如异步发消息处理等),这里还是从Spring执行事务的过程中入手,Spring事务的处理过程不再分析,这里直接看Spring事务增强器TransactionInterceptor的核心处理流程,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
protected Object invokeWithinTransaction(Method method, Class<?> targetClass, final InvocationCallback invocation) throws Throwable {
// 获取事务属性
final TransactionAttribute txAttr = getTransactionAttributeSource().getTransactionAttribute(method, targetClass);
//加载配置中配置的TransactionManager
final PlatformTransactionManager tm = determineTransactionManager(txAttr);
final String joinpointIdentification = methodIdentification(method, targetClass, txAttr);

// 声明式事务的处理
if (txAttr == null || !(tm instanceof CallbackPreferringPlatformTransactionManager)) {
TransactionInfo txInfo = createTransactionIfNecessary(tm, txAttr, joinpointIdentification);
Object retVal = null;
//......
retVal = invocation.proceedWithInvocation();
//......
commitTransactionAfterReturning(txInfo);
return retVal;
} else {
// 编程式事务的处理......
}
//......
}

这里主要看声明式事务的处理,因为编程式事务的处理及提交都是用户在编码中进行控制。在声明式事务处理中,当方法执行完后,会执行 commitTransactionAfterReturning 方法来进行提交事务,该方法在 TransactionAspectSupport 类中,源码如下:

1
2
3
4
5
protected void commitTransactionAfterReturning(TransactionInfo txInfo) {
if (txInfo != null && txInfo.hasTransaction()) {
txInfo.getTransactionManager().commit(txInfo.getTransactionStatus());
}
}

再看 commit 方法,该方法在 AbstractPlatformTransactionManager 类中,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public final void commit(TransactionStatus status) throws TransactionException {
// 这里省略很多代码,如事务回滚......
processCommit(defStatus);
}

private void processCommit(DefaultTransactionStatus status) throws TransactionException {
try {
boolean beforeCompletionInvoked = false;
try {
prepareForCommit(status);
triggerBeforeCommit(status);
triggerBeforeCompletion(status);
beforeCompletionInvoked = true;
boolean globalRollbackOnly = false;
if (status.isNewTransaction() || isFailEarlyOnGlobalRollbackOnly()) {
globalRollbackOnly = status.isGlobalRollbackOnly();
}
if (status.hasSavepoint()) {
status.releaseHeldSavepoint();
} else if (status.isNewTransaction()) {
// 提交事务
doCommit(status);
}
//......
} catch (......) {
// 事务异常处理......
}

try {
// 事务提交成功后的处理-----这里是重点
triggerAfterCommit(status);
} finally {
triggerAfterCompletion(status, TransactionSynchronization.STATUS_COMMITTED);
}
}
finally {
cleanupAfterCompletion(status);
}
}

private void triggerAfterCommit(DefaultTransactionStatus status) {
if (status.isNewSynchronization()) {
TransactionSynchronizationUtils.triggerAfterCommit();
}
}

最终会走到 TransactionSynchronizationUtils.triggerAfterCommit() 方法中,

1
2
3
4
5
6
7
8
9
10
11
public static void triggerAfterCommit() {
invokeAfterCommit(TransactionSynchronizationManager.getSynchronizations());
}

public static void invokeAfterCommit(List<TransactionSynchronization> synchronizations) {
if (synchronizations != null) {
for (TransactionSynchronization synchronization : synchronizations) {
synchronization.afterCommit();
}
}
}

上面会把缓存在 TransactionSynchronizationManager 中的 TransactionSynchronization 按顺序来执行 afterCommit 方法,其中 TransactionSynchronization 以集合形式缓存在 TransactionSynchronizationManager 的 ThreadLocal 中。

  • 方式一
    经过上面分析,只需要代码中重新生成个 TransactionSynchronization 并加入到 TransactionSynchronizationManager 的 TransactionSynchronization 集合中即可,所以有了解决方案,如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    private void writeStatisticsData(Long studentId) {
    if(TransactionSynchronizationManager.isActualTransactionActive()) {
    // 当前存在事务
    TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronizationAdapter() {
    @Override
    public void afterCommit() {
    executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
    });
    }});
    } else {
    // 当前不存在事务
    executor.execute(() -> {
    Student student = studentService.findById(studentId);
    //........
    });
    }
    }

  • 方式二
    使用 @TransactionalEventListener 结合 Spring事件监听机制,该注解自从Spring4.2版本开始有的,如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    // 事件
    public class StudentEvent extends ApplicationEvent {
    public StudentEvent(Long studentId) {
    super(studentId);
    }
    }

    // 监听器
    public class StudentEventListener{
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    public void writeStatisticsData(StudentEvent studentEvent) {
    executor.execute(() -> {
    Student student = studentService.findById(studentEvent.getSource());
    //........
    });
    }
    }

    @Service
    public class StudentService {
    // Spring4.2之后,ApplicationEventPublisher自动被注入到容器中,采用Autowired即可获取
    @Autowired
    private ApplicationEventPublisher applicationEventPublisher;

    @Transactional
    public Long test() {
    // ......
    // 插入记录
    Long studentId = studentService.insert(student);
    // 发布事件
    applicationEventPublisher.publishEvent(new StudentEvent(studentId));
    // 插入学生地址记录
    Long addressId = addressService.insert(address);
    return studentId;
    }
    }

原理分析:
Spring Bean在加载配置文件时,会使用 AnnotationDrivenBeanDefinitionParser注册TransactionalEventListenerFactory :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
class AnnotationDrivenBeanDefinitionParser implements BeanDefinitionParser {

@Override
public BeanDefinition parse(Element element, ParserContext parserContext) {
// 重点——将TransactionalEventListenerFactory加入到容器中
registerTransactionalEventListenerFactory(parserContext);
String mode = element.getAttribute("mode");
if ("aspectj".equals(mode)) {
// mode="aspectj"
registerTransactionAspect(element, parserContext);
}
else {
// mode="proxy"
AopAutoProxyConfigurer.configureAutoProxyCreator(element, parserContext);
}
return null;
}

private void registerTransactionalEventListenerFactory(ParserContext parserContext) {
RootBeanDefinition def = new RootBeanDefinition();
def.setBeanClass(TransactionalEventListenerFactory.class);
parserContext.registerBeanComponent(new BeanComponentDefinition(def,
TransactionManagementConfigUtils.TRANSACTIONAL_EVENT_LISTENER_FACTORY_BEAN_NAME));
}
}
1
2
3
4
5
6
7
8
9
public class TransactionalEventListenerFactory implements EventListenerFactory, Ordered {

//省略部分代码......

@Override
public ApplicationListener<?> createApplicationListener(String beanName, Class<?> type, Method method) {
return new ApplicationListenerMethodTransactionalAdapter(beanName, type, method);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class ApplicationListenerMethodTransactionalAdapter extends ApplicationListenerMethodAdapter {

//省略部分代码......

@Override
public void onApplicationEvent(ApplicationEvent event) {
if (TransactionSynchronizationManager.isSynchronizationActive()) {
// 事务存在时,生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存集合中
TransactionSynchronization transactionSynchronization = createTransactionSynchronization(event);
TransactionSynchronizationManager.registerSynchronization(transactionSynchronization);
} else if (this.annotation.fallbackExecution()) {
//.......
}
processEvent(event);
} else {
// 当前不存在事务什么也不做
}
}

上述 @TransactionalEventListener 本质上是一个 @EventListener,TransactionalEventListenerFactory类会将每一个扫描到的方法有TransactionalEventListener注解包装成ApplicationListenerMethodTransactionalAdapter对象,通过ApplicationListenerMethodTransactionalAdapter的onApplicationEvent方法可以看到若当前存在事务,就会生成TransactionSynchronization并加入到 TransactionSynchronizationManager的缓存ThreadLocal集合中,剩余流程同上述分析。

转载自:https://blog.csdn.net/zhuqiuhui/article/details/89299360