前言碎语
楼主之前推荐过2pc的分布式事务框架LCN。今天来详细聊聊TCC事务协议。
2pc实现:https://github.com/codingapi/tx-lcn
tcc实现:https://github.com/yu199195/hmily
首先我们了解下什么是tcc,如下图
tcc分布式事务协议控制整体业务事务分为三个阶段。
try:执行业务逻辑
confirm:确定业务逻辑执行无误后,确定业务逻辑执行完成
cancel:假如try阶段有问题,执行cancel阶段逻辑,取消try阶段的数据
这就需要我们在设计业务时,在try阶段多想想业务处理的折中状态,比如,处理中,支付中,进行中等,在confirm阶段变更为处理完成,或者在cancel阶段变更为处理失败。
下面以电商下单为例子:.
假设我们有一个电商下单的业务,有三个服务组成,订单服务处理下单逻辑,库存服务处理减库存逻辑,支付服务处理减账户余额逻辑。在下单服务里先后调用减库存和减余额的方法。如果使用tcc分布式事务来协调事务,我们服务就要做如下设计:
订单服务:
- try:支付状态设置为支付中
- confirm:设置为支付完成
- cancel:设置为支付失败
库存服务:
多加一个锁定库存的字段记录,用于记录业务处理中状态
- try:总库存-1,锁定库存+1
- confirm:锁定库存-1
- cancel:总库存+1,锁定库存-1
支付服务:
多加一个冻结金额的字段记录,用于记录业务处理中状态
- try:余额-1,冻结金额+1
- confirm:冻结金额-1
- cancel:余额+1,冻结金额-1
tcc分布式事务在这里起到了一个事务协调者的角色。真实业务只需要调用try阶段的方法。confirm和cancel阶段的额方法由tcc框架来帮我们调用完成最终业务逻辑。下面我们假设如下三个场景的业务情况,看tcc如何协调业务最终一致的。
- 服务一切正常:所有服务的try方法执行后都没有问题,库存足够,余额足够。tcc事务协调器会触发订单服务的confirm方法,将订单更新为支付完成,触发库存服务的confirm方法锁定库存-1,触发支付服务的confirm方法冻结金额-1
- 库存服务故障,无法调通:这个时候订单已经生成,状态为待支付。当调用库存超时抛异常后,tcc事务协调器会触发订单服务的cancel方法将订单状态更新为支付失败。
- 支付服务故障,无法调通:这个时候订单已经生成,状态为待支付,总库存-1,锁定库存+1了。当调用支付服务超时抛异常时,tcc事务协调器会触发订单服务的cancel方法将订单状态更新为支付失败,触发库存服务的cancel方法将库存+1,锁定库存-1。
hmily事务框架怎么做的?
通过上面对tcc事务协议说明大家应该都了解了tcc的处理协调机制,下面我们来看看hmily是怎么做到的,我们以接入支持dubbo服务为例。
概要:首先最基础两个应用点是aop和dubbo的filter机制,其次针对一组事务,定义了启动事务处理器,参与事务处理器去协调处理不同的事务单元。外加一个disruptor+ScheduledService处理事务日志,补偿处理失败的事务。
hmily框架以@Hmily注解为切入点,定义了一个环绕织入的切面,注解必填两个参数confirmMethod和cancelMethod,也就是tcc协调的两个阶段方法。在需要tcc事务的方法上面加上这个注解,也就托管了tcc三个阶段的处理流程。下面是aspect切面的抽象类,不同的RPC框架支持会有不同的实现 。其中真正处理业务逻辑需要实现HmilyTransactionInterceptor接口
@Aspect
public abstract class AbstractHmilyTransactionAspect {
private HmilyTransactionInterceptor hmilyTransactionInterceptor;
protected void setHmilyTransactionInterceptor(final HmilyTransactionInterceptor hmilyTransactionInterceptor) {
this.hmilyTransactionInterceptor = hmilyTransactionInterceptor;
}
/**
* this is point cut with {@linkplain Hmily }.
*/
@Pointcut("@annotation(org.dromara.hmily.annotation.Hmily)")
public void hmilyInterceptor() {
}
/**
* this is around in {@linkplain Hmily }.
* @param proceedingJoinPoint proceedingJoinPoint
* @return Object
* @throws Throwable Throwable
*/
@Around("hmilyInterceptor()")
public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable {
return hmilyTransactionInterceptor.interceptor(proceedingJoinPoint);
}
/**
* spring Order.
*
* @return int
*/
public abstract int getOrder();
}
dubbo的aspect抽象实现
@Aspect
@Component
public class DubboHmilyTransactionAspect extends AbstractHmilyTransactionAspect implements Ordered {
@Autowired
public DubboHmilyTransactionAspect(final DubboHmilyTransactionInterceptor dubboHmilyTransactionInterceptor) {
super.setHmilyTransactionInterceptor(dubboHmilyTransactionInterceptor);
}
@Override
public int getOrder() {
return Ordered.HIGHEST_PRECEDENCE;
}
}
dubbo的HmilyTransactionInterceptor实现
@Component
public class DubboHmilyTransactionInterceptor implements HmilyTransactionInterceptor {
private final HmilyTransactionAspectService hmilyTransactionAspectService;
@Autowired
public DubboHmilyTransactionInterceptor(final HmilyTransactionAspectService hmilyTransactionAspectService) {
this.hmilyTransactionAspectService = hmilyTransactionAspectService;
}
@Override
public Object interceptor(final ProceedingJoinPoint pjp) throws Throwable {
final String context = RpcContext.getContext().getAttachment(CommonConstant.HMILY_TRANSACTION_CONTEXT);
HmilyTransactionContext hmilyTransactionContext;
//判断dubbo上下文中是否携带了tcc事务,如果有就取出反序列化为事务上下文对象
if (StringUtils.isNoneBlank(context)) {
hmilyTransactionContext = GsonUtils.getInstance().fromJson(context, HmilyTransactionContext.class);
RpcContext.getContext().getAttachments().remove(CommonConstant.HMILY_TRANSACTION_CONTEXT);
} else {
//如果dubbo上下文中没有,就从当前上下文中获取。如果是事务发起者,这里其实也获取不到事务
hmilyTransactionContext = HmilyTransactionContextLocal.getInstance().get();
}
return hmilyTransactionAspectService.invoke(hmilyTransactionContext, pjp);
}
}
这里主要判断了dubbo上下文中是否携带了tcc事务。如果没有就从当前线程上下文中获取,如果是事务的发起者,这里其实获取不到事务上下文对象的。在invoke里有个获取事务处理器的逻辑,如果事务上下文入参 为null,那么获取到的就是启动事务处理器。启动事务处理器处理逻辑如下
public Object handler(final ProceedingJoinPoint point, final HmilyTransactionContext context)
throws Throwable {
System.err.println("StarterHmilyTransactionHandler");
Object returnValue;
try {
HmilyTransaction hmilyTransaction = hmilyTransactionExecutor.begin(point);
try {
//execute try
returnValue = point.proceed();
hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode());
hmilyTransactionExecutor.updateStatus(hmilyTransaction);
} catch (Throwable throwable) {
//if exception ,execute cancel
final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
executor.execute(() -> hmilyTransactionExecutor
.cancel(currentTransaction));
throw throwable;
}
//execute confirm
final HmilyTransaction currentTransaction = hmilyTransactionExecutor.getCurrentTransaction();
executor.execute(() -> hmilyTransactionExecutor.confirm(currentTransaction));
} finally {
HmilyTransactionContextLocal.getInstance().remove();
hmilyTransactionExecutor.remove();
}
return returnValue;
}
真正业务处理方法,point.proceed();被try,catch包起来了,如果try里面的方法出现异常,就会走hmilyTransactionExecutor.cancel(currentTransaction)的逻辑,如果成功,就走hmilyTransactionExecutor.confirm(currentTransaction)逻辑。其中cancel和confirm里都有协调参与者事务的处理逻辑,以confirm逻辑为例。
public void confirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException {
LogUtil.debug(LOGGER, () -> "tcc confirm .......!start");
if (Objects.isNull(currentTransaction) || CollectionUtils.isEmpty(currentTransaction.getHmilyParticipants())) {
return;
}
currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode());
updateStatus(currentTransaction);
final ListhmilyParticipants = currentTransaction.getHmilyParticipants();
ListfailList = Lists.newArrayListWithCapacity(hmilyParticipants.size());
boolean success = true;
if (CollectionUtils.isNotEmpty(hmilyParticipants)) {
for (HmilyParticipant hmilyParticipant : hmilyParticipants) {
try {
HmilyTransactionContext context = new HmilyTransactionContext();
context.setAction(HmilyActionEnum.CONFIRMING.getCode());
context.setRole(HmilyRoleEnum.START.getCode());
context.setTransId(hmilyParticipant.getTransId());
HmilyTransactionContextLocal.getInstance().set(context);
executeParticipantMethod(hmilyParticipant.getConfirmHmilyInvocation());
} catch (Exception e) {
LogUtil.error(LOGGER, "execute confirm :{}", () -> e);
success = false;
failList.add(hmilyParticipant);
} finally {
HmilyTransactionContextLocal.getInstance().remove();
}
}
executeHandler(success, currentTransaction, failList);