最大努力通知也是一种解决分布式事务的方案,下边是一个是充值的例子:
交互流程:
1、账户系统调用充值系统接口
2、充值系统完成支付处理向账户系统发起充值结果通知
若通知失败,则充值系统按策略进行重复通知
3、账户系统接收到充值结果通知修改充值状态。
4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果。
通过上边的例子我们总结最大努力通知方案的目标:
目标:发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。
具体包括:
1、有一定的消息重复通知机制。
因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。
2、消息校对机制。
如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息
信息来满足需求。
最大努力通知与可靠消息一致性有什么不同?
1、解决方案思想不同
可靠消息一致性,发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知
方来保证。
最大努力通知,发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接
收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。
2、两者的业务应用场景不同
可靠消息一致性关注的是交易过程的事务一致,以异步的方式完成交易。
最大努力通知关注的是交易后的通知事务,即将交易结果可靠的通知出去。
3、技术解决方向不同
可靠消息一致性要解决消息从发出到接收的一致性,即消息发出并且被接收到。
最大努力通知无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消
息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)。
解决方案
通过对最大努力通知的理解,采用MQ的ack机制就可以实现最大努力通知。
方案1:
本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下:
1、发起通知方将通知发给MQ。
使用普通消息机制将通知发给MQ。
注意:如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果。(后边会讲)
2、接收通知方监听 MQ。
3、接收通知方接收消息,业务处理完成回应ack。
4、接收通知方若没有回应ack则MQ会重复通知。
MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔 (如果MQ采用
rocketMq,在broker中可进行配置),直到达到通知要求的时间窗口上限。
5、接收通知方可通过消息校对接口来校对消息的一致性。
方案2:
本方案也是利用MQ的ack机制,与方案1不同的是应用程序向接收通知方发送通知,如下图:
交互流程如下:
1、发起通知方将通知发给MQ。
使用可靠消息一致方案中的事务消息保证本地事务与消息的原子性,最终将通知先发给MQ。
2、通知程序监听 MQ,接收MQ的消息。
方案1中接收通知方直接监听MQ,方案2中由通知程序监听MQ。
通知程序若没有回应ack则MQ会重复通知。
3、通知程序通过互联网接口协议(如http、webservice)调用接收通知方案接口,完成通知。
通知程序调用接收通知方案接口成功就表示通知成功,即消费MQ消息成功,MQ将不再向通知程序投递通知消
息。
4、接收通知方可通过消息校对接口来校对消息的一致性。
方案1和方案2的不同点:
1、方案1中接收通知方与MQ接口,即接收通知方案监听 MQ,此方案主要应用与内部应用之间的通知。
2、方案2中由通知程序与MQ接口,通知程序监听MQ,收到MQ的消息后由通知程序通过互联网接口协议调用接收
通知方。此方案主要应用于外部应用之间的通知,例如支付宝、微信的支付结果通知。
RocketMQ实现最大努力通知型事务
业务说明
本实例通过RocketMq中间件实现最大努力通知型分布式事务,模拟充值过程。
本案例有账户系统和充值系统两个微服务,其中账户系统的数据库是bank1数据库,其中有张三账户。充值系统的
数据库使用bank1_pay数据库,记录了账户的充值记录。
业务流程如下图:
交互流程如下:
1、用户请求充值系统进行充值。
2、充值系统完成充值将充值结果发给MQ。
3、账户系统监听MQ,接收充值结果通知,如果接收不到消息,MQ会重复发送通知。接收到充值结果通知账户系
统增加充值金额。
4、账户系统也可以主动查询充值系统的充值结果查询接口,增加金额。
pay实现如下功能:
1、充值接口
2、充值完成要通知
3、充值结果查询接口
2)Dao
@Mapper @Component public interface AccountPayDao { @Insert("insert into account_pay(id,account_no,pay_amount,result) values(#{id},#{accountNo},#{payAmount},#{result})") int insertAccountPay(@Param("id") String id, @Param("accountNo") String accountNo, @Param("payAmount") Double pay_amount, @Param("result") String result); @Select("select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo}") AccountPay findByIdTxNo(@Param("txNo") String txNo); }
3)Service
@Service @Slf4j public class AccountPayServiceImpl implements AccountPayService { @Autowired AccountPayDao accountPayDao; @Autowired RocketMQTemplate rocketMQTemplate; //插入充值记录 @Override public AccountPay insertAccountPay(AccountPay accountPay) { int success = accountPayDao.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success"); if(success>0){ //发送通知,使用普通消息发送通知 accountPay.setResult("success"); rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay); return accountPay; } return null; } //查询充值记录,接收通知方调用此方法来查询充值结果 @Override public AccountPay getAccountPay(String txNo) { AccountPay accountPay = accountPayDao.findByIdTxNo(txNo); return accountPay; } }
4)Controller
@RestController public class AccountPayController { @Autowired AccountPayService accountPayService; //充值 @GetMapping(value = "/paydo") public AccountPay pay(AccountPay accountPay){ //生成事务编号 String txNo = UUID.randomUUID().toString(); accountPay.setId(txNo); return accountPayService.insertAccountPay(accountPay); } //查询充值结果 @GetMapping(value = "/payresult/{txNo}") public AccountPay payresult(@PathVariable("txNo") String txNo){ return accountPayService.getAccountPay(txNo); } }
bank1实现如下功能:
1、监听MQ,接收充值结果,根据充值结果完成账户金额修改。
2、主动查询充值系统,根据充值结果完成账户金额修改。
1)Dao
@Mapper @Component public interface AccountInfoDao { //修改账户金额 @Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}") int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount); //查询幂等记录,用于幂等控制 @Select("select count(1) from de_duplication where tx_no = #{txNo}") int isExistTx(String txNo); //添加事务记录,用于幂等控制 @Insert("insert into de_duplication values(#{txNo},now());") int addTx(String txNo); }
2)AccountInfoService
@Service @Slf4j public class AccountInfoServiceImpl implements AccountInfoService { @Autowired AccountInfoDao accountInfoDao; @Autowired PayClient payClient; //更新账户金额 @Override @Transactional public void updateAccountBalance(AccountChangeEvent accountChange) { //幂等校验 if(accountInfoDao.isExistTx(accountChange.getTxNo())>0){ return ; } int i = accountInfoDao.updateAccountBalance(accountChange.getAccountNo(), accountChange.getAmount()); //插入事务记录,用于幂等控制 accountInfoDao.addTx(accountChange.getTxNo()); } //远程调用查询充值结果 @Override public AccountPay queryPayResult(String tx_no) { //远程调用 AccountPay payresult = payClient.payresult(tx_no); if("success".equals(payresult.getResult())){ //更新账户金额 AccountChangeEvent accountChangeEvent = new AccountChangeEvent(); accountChangeEvent.setAccountNo(payresult.getAccountNo());//账号 accountChangeEvent.setAmount(payresult.getPayAmount());//金额 accountChangeEvent.setTxNo(payresult.getId());//充值事务号 updateAccountBalance(accountChangeEvent); } return payresult; } }
@FeignClient(value = "dtx-notifymsg-demo-pay",fallback = PayFallback.class) public interface PayClient { //远程调用充值系统的接口查询充值结果 @GetMapping(value = "/pay/payresult/{txNo}") public AccountPay payresult(@PathVariable("txNo") String txNo); }
3)监听MQ
@Component @Slf4j @RocketMQMessageListener(topic = "topic_notifymsg",consumerGroup = "consumer_group_notifymsg_bank1") public class NotifyMsgListener implements RocketMQListener<AccountPay> { @Autowired AccountInfoService accountInfoService; //接收消息 @Override public void onMessage(AccountPay accountPay) { log.info("接收到消息:{}", JSON.toJSONString(accountPay)); if("success".equals(accountPay.getResult())){ //更新账户金额 AccountChangeEvent accountChangeEvent = new AccountChangeEvent(); accountChangeEvent.setAccountNo(accountPay.getAccountNo()); accountChangeEvent.setAmount(accountPay.getPayAmount()); accountChangeEvent.setTxNo(accountPay.getId()); accountInfoService.updateAccountBalance(accountChangeEvent); } log.info("处理消息完成:{}", JSON.toJSONString(accountPay)); } }
4)Controller
@RestController @Slf4j public class AccountInfoController { @Autowired private AccountInfoService accountInfoService; //主动查询充值结果 @GetMapping(value = "/payresult/{txNo}") public AccountPay result(@PathVariable("txNo") String txNo){ AccountPay accountPay = accountInfoService.queryPayResult(txNo); return accountPay; } }
测试场景
2020-03-12 18:54:21.442 DEBUG 8108 --- [io-9901-exec-10] c.t.m.d.AccountPayDao.insertAccountPay : ==> Preparing: insert into account_pay(id,account_no,pay_amount,result) values(?,?,?,?) 2020-03-12 18:54:21.442 DEBUG 8108 --- [io-9901-exec-10] c.t.m.d.AccountPayDao.insertAccountPay : ==> Parameters: 4db8776f-fc55-4190-b014-7caa21b77ec0(String), 1(String), 2000.0(Double), success(String) 2020-03-12 18:54:21.455 DEBUG 8108 --- [io-9901-exec-10] c.t.m.d.AccountPayDao.insertAccountPay : <== Updates: 1
2020-03-12 18:54:21.464 INFO 20964 --- [MessageThread_3] c.topcheer.eq.message.NotifyMsgListener : 接收到消息:{"accountNo":"1","id":"4db8776f-fc55-4190-b014-7caa21b77ec0","payAmount":2000.0,"result":"success"} 2020-03-12 18:54:21.466 DEBUG 20964 --- [MessageThread_3] c.t.eq.dao.AccountInfoDao.isExistTx : ==> Preparing: select count(1) from de_duplication where tx_no = ? 2020-03-12 18:54:21.466 DEBUG 20964 --- [MessageThread_3] c.t.eq.dao.AccountInfoDao.isExistTx : ==> Parameters: 4db8776f-fc55-4190-b014-7caa21b77ec0(String) 2020-03-12 18:54:21.467 DEBUG 20964 --- [MessageThread_3] c.t.eq.dao.AccountInfoDao.isExistTx : <== Total: 1 2020-03-12 18:54:21.468 DEBUG 20964 --- [MessageThread_3] c.t.e.d.A.updateAccountBalance : ==> Preparing: update account_info set account_balance=account_balance+? where account_no=? 2020-03-12 18:54:21.468 DEBUG 20964 --- [MessageThread_3] c.t.e.d.A.updateAccountBalance : ==> Parameters: 2000.0(Double), 1(String) 2020-03-12 18:54:21.470 DEBUG 20964 --- [MessageThread_3] c.t.e.d.A.updateAccountBalance : <== Updates: 1 2020-03-12 18:54:21.470 DEBUG 20964 --- [MessageThread_3] c.topcheer.eq.dao.AccountInfoDao.addTx : ==> Preparing: insert into de_duplication values(?,now()); 2020-03-12 18:54:21.470 DEBUG 20964 --- [MessageThread_3] c.topcheer.eq.dao.AccountInfoDao.addTx : ==> Parameters: 4db8776f-fc55-4190-b014-7caa21b77ec0(String) 2020-03-12 18:54:21.473 DEBUG 20964 --- [MessageThread_3] c.topcheer.eq.dao.AccountInfoDao.addTx : <== Updates: 1 2020-03-12 18:54:21.486 INFO 20964 --- [MessageThread_3] c.topcheer.eq.message.NotifyMsgListener : 处理消息完成:{"accountNo":"1","id":"4db8776f-fc55-4190-b014-7caa21b77ec0","payAmount":2000.0,"result":"success"}