zoukankan      html  css  js  c++  java
  • 分布式事务九_基于可靠消息的最终一致性代码-copy

    一、消息服务子系统
    1、表结构
    -- Transaction-message table used by the message service.
    -- WARNING: the DROP below removes any existing table together with its rows.
    DROP TABLE IF EXISTS `rp_transaction_message`;
    CREATE TABLE `rp_transaction_message` (
        `id` varchar(50) NOT NULL DEFAULT '' COMMENT '主键ID',                          -- primary key
        `version` int(11) NOT NULL DEFAULT '0' COMMENT '版本号',                        -- version number
        `editor` varchar(100) DEFAULT NULL COMMENT '修改者',                            -- last editor
        `creater` varchar(100) DEFAULT NULL COMMENT '创建者',                           -- creator
        `edit_time` datetime DEFAULT NULL COMMENT '最后修改时间',                        -- last modification time
        -- The previous default '0000-00-00 00:00:00' is rejected when NO_ZERO_DATE /
        -- strict SQL mode is active, which makes this CREATE TABLE fail. Default to the
        -- insert time instead (DATETIME DEFAULT CURRENT_TIMESTAMP needs MySQL 5.6.5+).
        `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',    -- creation time
        `message_id` varchar(50) NOT NULL DEFAULT '' COMMENT '消息ID',                  -- business message ID (indexed below)
        `message_body` longtext NOT NULL COMMENT '消息内容',                            -- message payload
        `message_data_type` varchar(50) DEFAULT NULL COMMENT '消息数据类型',             -- payload data type
        `consumer_queue` varchar(100) NOT NULL DEFAULT '' COMMENT '消费队列',           -- destination (consumer) queue
        `message_send_times` smallint(6) NOT NULL DEFAULT '0' COMMENT '消息重发次数',    -- number of re-sends
        -- "areadly" is a misspelling of "already"; the column name is kept as-is because
        -- renaming it would break existing mappings.
        `areadly_dead` varchar(20) NOT NULL DEFAULT '' COMMENT '是否死亡',              -- dead-message flag
        `status` varchar(20) NOT NULL DEFAULT '' COMMENT '状态',                        -- message status
        `remark` varchar(200) DEFAULT NULL COMMENT '备注',                              -- free-form remark
        `field1` varchar(200) DEFAULT NULL COMMENT '扩展字段1',                         -- extension field 1
        `field2` varchar(200) DEFAULT NULL COMMENT '扩展字段2',                         -- extension field 2
        `field3` varchar(200) DEFAULT NULL COMMENT '扩展字段3',                         -- extension field 3
        PRIMARY KEY (`id`),
        -- Non-unique lookup index on message_id.
        KEY `AK_Key_2` (`message_id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8;
    2、RpTransactionMessageService.java
    /**
     * Service contract for storing, sending, re-sending and removing transaction messages.
     *
     * <p>Every operation declares {@link MessageBizException} for business-level failures.</p>
     */
    public interface RpTransactionMessageService {

        /**
         * Pre-stores a message that still has to be confirmed.
         *
         * @param rpTransactionMessage the message to store
         * @return the result reported by the underlying insert
         * @throws MessageBizException if the message is rejected
         */
        int saveMessageWaitingConfirm(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

        /**
         * Confirms a previously stored message and sends it.
         *
         * @param messageId ID of the message to confirm and send
         * @throws MessageBizException if the message cannot be processed
         */
        void confirmAndSendMessage(String messageId) throws MessageBizException;

        /**
         * Stores a message and sends it in one call.
         *
         * @param rpTransactionMessage the message to store and send
         * @return the result reported by the underlying insert
         * @throws MessageBizException if the message is rejected
         */
        int saveAndSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

        /**
         * Sends a message directly.
         *
         * @param rpTransactionMessage the message to send
         * @throws MessageBizException if the message is rejected
         */
        void directSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

        /**
         * Re-sends a message.
         *
         * @param rpTransactionMessage the message to re-send
         * @throws MessageBizException if the message is rejected
         */
        void reSendMessage(RpTransactionMessage rpTransactionMessage) throws MessageBizException;

        /**
         * Re-sends the message identified by {@code messageId}.
         *
         * @param messageId ID of the message to re-send
         * @throws MessageBizException if the message cannot be processed
         */
        void reSendMessageByMessageId(String messageId) throws MessageBizException;

        /**
         * Marks a message as dead.
         *
         * @param messageId ID of the message to mark
         * @throws MessageBizException if the message cannot be processed
         */
        void setMessageToAreadlyDead(String messageId) throws MessageBizException;

        /**
         * Looks up a message by its message ID.
         *
         * @param messageId ID of the message to fetch
         * @return the matching message
         * @throws MessageBizException if the lookup fails
         */
        RpTransactionMessage getMessageByMessageId(String messageId) throws MessageBizException;

        /**
         * Deletes a message by its message ID.
         *
         * @param messageId ID of the message to delete
         * @throws MessageBizException if the deletion fails
         */
        void deleteMessageByMessageId(String messageId) throws MessageBizException;

        /**
         * Re-sends every dead message of one message queue.
         *
         * @param queueName name of the queue whose dead messages are re-sent
         * @param batchSize requested batch size
         * @throws MessageBizException if the operation fails
         */
        void reSendAllDeadMessageByQueueName(String queueName, int batchSize) throws MessageBizException;

        /**
         * Fetches one page of messages.
         *
         * <p>The return type is parameterized (it used to be the raw {@code PageBean}) so
         * callers get typed records; assigning the result to a raw {@code PageBean} still
         * compiles.</p>
         *
         * @param pageParam paging parameters
         * @param paramMap  query parameters
         * @return the requested page
         * @throws MessageBizException if the query fails
         */
        PageBean<RpTransactionMessage> listPage(PageParam pageParam, Map<String, Object> paramMap) throws MessageBizException;

    }
    3、RpTransactionMessageServiceImpl.java
    @Service("rpTransactionMessageService")
    public class RpTransactionMessageServiceImpl implements RpTransactionMessageService {

    private static final Log log = LogFactory.getLog(RpTransactionMessageServiceImpl.class);

    @Autowired
    private RpTransactionMessageDao rpTransactionMessageDao;

    @Autowired
    private JmsTemplate notifyJmsTemplate;

    public int saveMessageWaitingConfirm(RpTransactionMessage message) {

    if (message == null) {
    throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
    }

    if (StringUtil.isEmpty(message.getConsumerQueue())) {
    throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
    }

    message.setEditTime(new Date());
    message.setStatus(MessageStatusEnum.WAITING_CONFIRM.name());
    message.setAreadlyDead(PublicEnum.NO.name());
    message.setMessageSendTimes(0);
    return rpTransactionMessageDao.insert(message);
    }


    public void confirmAndSendMessage(String messageId) {
    final RpTransactionMessage message = getMessageByMessageId(messageId);
    if (message == null) {
    throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
    }

    message.setStatus(MessageStatusEnum.SENDING.name());
    message.setEditTime(new Date());
    rpTransactionMessageDao.update(message);

    notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
    notifyJmsTemplate.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(message.getMessageBody());
    }
    });
    }

    public int saveAndSendMessage(final RpTransactionMessage message) {

    if (message == null) {
    throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
    }

    if (StringUtil.isEmpty(message.getConsumerQueue())) {
    throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
    }

    message.setStatus(MessageStatusEnum.SENDING.name());
    message.setAreadlyDead(PublicEnum.NO.name());
    message.setMessageSendTimes(0);
    message.setEditTime(new Date());
    int result = rpTransactionMessageDao.insert(message);

    notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
    notifyJmsTemplate.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(message.getMessageBody());
    }
    });

    return result;
    }


    public void directSendMessage(final RpTransactionMessage message) {

    if (message == null) {
    throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
    }

    if (StringUtil.isEmpty(message.getConsumerQueue())) {
    throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
    }

    notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
    notifyJmsTemplate.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(message.getMessageBody());
    }
    });
    }


    public void reSendMessage(final RpTransactionMessage message) {

    if (message == null) {
    throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "保存的消息为空");
    }

    if (StringUtil.isEmpty(message.getConsumerQueue())) {
    throw new MessageBizException(MessageBizException.MESSAGE_CONSUMER_QUEUE_IS_NULL, "消息的消费队列不能为空 ");
    }

    message.addSendTimes();
    message.setEditTime(new Date());
    rpTransactionMessageDao.update(message);

    notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
    notifyJmsTemplate.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(message.getMessageBody());
    }
    });
    }

    public void reSendMessageByMessageId(String messageId) {
    final RpTransactionMessage message = getMessageByMessageId(messageId);
    if (message == null) {
    throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
    }

    int maxTimes = Integer.valueOf(PublicConfigUtil.readConfig("message.max.send.times"));
    if (message.getMessageSendTimes() >= maxTimes) {
    message.setAreadlyDead(PublicEnum.YES.name());
    }

    message.setEditTime(new Date());
    message.setMessageSendTimes(message.getMessageSendTimes() + 1);
    rpTransactionMessageDao.update(message);

    notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
    notifyJmsTemplate.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(message.getMessageBody());
    }
    });
    }


    public void setMessageToAreadlyDead(String messageId) {
    RpTransactionMessage message = getMessageByMessageId(messageId);
    if (message == null) {
    throw new MessageBizException(MessageBizException.SAVA_MESSAGE_IS_NULL, "根据消息id查找的消息为空");
    }

    message.setAreadlyDead(PublicEnum.YES.name());
    message.setEditTime(new Date());
    rpTransactionMessageDao.update(message);
    }


    public RpTransactionMessage getMessageByMessageId(String messageId) {

    Map<String, Object> paramMap = new HashMap<String, Object>();
    paramMap.put("messageId", messageId);

    return rpTransactionMessageDao.getBy(paramMap);
    }


    public void deleteMessageByMessageId(String messageId) {
    Map<String, Object> paramMap = new HashMap<String, Object>();
    paramMap.put("messageId", messageId);
    rpTransactionMessageDao.delete(paramMap);
    }


    @SuppressWarnings("unchecked")
    public void reSendAllDeadMessageByQueueName(String queueName, int batchSize) {
    log.info("==>reSendAllDeadMessageByQueueName");

    int numPerPage = 1000;
    if (batchSize > 0 && batchSize < 100){
    numPerPage = 100;
    } else if (batchSize > 100 && batchSize < 5000){
    numPerPage = batchSize;
    } else if (batchSize > 5000){
    numPerPage = 5000;
    } else {
    numPerPage = 1000;
    }

    int pageNum = 1;
    Map<String, Object> paramMap = new HashMap<String, Object>();
    paramMap.put("consumerQueue", queueName);
    paramMap.put("areadlyDead", PublicEnum.YES.name());
    paramMap.put("listPageSortType", "ASC");


    Map<String, RpTransactionMessage> messageMap = new HashMap<String, RpTransactionMessage>();
    List<Object> recordList = new ArrayList<Object>();
    int pageCount = 1;

    PageBean pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap);
    recordList = pageBean.getRecordList();
    if (recordList == null || recordList.isEmpty()) {
    log.info("==>recordList is empty");
    return;
    }
    pageCount = pageBean.getTotalPage();
    for (final Object obj : recordList) {
    final RpTransactionMessage message = (RpTransactionMessage) obj;
    messageMap.put(message.getMessageId(), message);
    }

    for (pageNum = 2; pageNum <= pageCount; pageNum++) {
    pageBean = rpTransactionMessageDao.listPage(new PageParam(pageNum, numPerPage), paramMap);
    recordList = pageBean.getRecordList();

    if (recordList == null || recordList.isEmpty()) {
    break;
    }

    for (final Object obj : recordList) {
    final RpTransactionMessage message = (RpTransactionMessage) obj;
    messageMap.put(message.getMessageId(), message);
    }
    }

    recordList = null;
    pageBean = null;

    for (Map.Entry<String, RpTransactionMessage> entry : messageMap.entrySet()) {
    final RpTransactionMessage message = entry.getValue();

    message.setEditTime(new Date());
    message.setMessageSendTimes(message.getMessageSendTimes() + 1);
    rpTransactionMessageDao.update(message);

    notifyJmsTemplate.setDefaultDestinationName(message.getConsumerQueue());
    notifyJmsTemplate.send(new MessageCreator() {
    public Message createMessage(Session session) throws JMSException {
    return session.createTextMessage(message.getMessageBody());
    }
    });
    }

    }


    @SuppressWarnings("unchecked")
    public PageBean<RpTransactionMessage> listPage(PageParam pageParam, Map<String, Object> paramMap){
    return rpTransactionMessageDao.listPage(pageParam, paramMap);
    }


    }
    二、消息状态确认子系统 与 消息恢复子系统
    /**
     * Handles transaction messages that were handed over as timed out: messages still in
     * the waiting_confirm status are confirmed or deleted depending on the payment record,
     * messages in the SENDING status are re-sent or marked dead.
     */
    @Component("messageBiz")
    public class MessageBiz {

        private static final Log log = LogFactory.getLog(MessageBiz.class);

        /** Number of configured re-send intervals ({@code message.send.1.time} .. {@code message.send.5.time}). */
        private static final int SEND_INTERVAL_LEVELS = 5;

        private static final long MILLIS_PER_MINUTE = 60L * 1000L;

        @Autowired
        private RpTradePaymentQueryService rpTradePaymentQueryService;
        @Autowired
        private RpTransactionMessageService rpTransactionMessageService;

        /**
         * Processes messages in the [waiting_confirm] status.
         *
         * <p>For each message the payment record is looked up by the bank order number stored
         * in {@code field1}. A SUCCESS record confirms and sends the message, a
         * WAITING_PAYMENT record deletes it, any other status leaves it untouched. A failure
         * on one message is logged and does not stop the remaining messages.</p>
         *
         * @param messageMap messages to process; only the values are used
         */
        public void handleWaitingConfirmTimeOutMessages(Map<String, RpTransactionMessage> messageMap) {
            if (messageMap == null) {
                return;
            }
            log.debug("开始处理[waiting_confirm]状态的消息,总条数[" + messageMap.size() + "]");
            // Messages are handled one by one. (Per the original author: every message in
            // this status currently targets the accounting queue; if more business cases
            // are added, dispatch on the queue here.)
            for (RpTransactionMessage message : messageMap.values()) {
                try {
                    log.debug("开始处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息");
                    String bankOrderNo = message.getField1();
                    RpTradePaymentRecord record = rpTradePaymentQueryService.getRecordByBankOrderNo(bankOrderNo);
                    if (record == null) {
                        // Report the real cause instead of letting it surface as a
                        // NullPointerException from the status checks below.
                        log.error("处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息异常: 未找到银行订单号["
                                + bankOrderNo + "]对应的支付记录");
                        continue;
                    }

                    if (TradeStatusEnum.SUCCESS.name().equals(record.getStatus())) {
                        // Order succeeded: confirm the message and send it.
                        rpTransactionMessageService.confirmAndSendMessage(message.getMessageId());
                    } else if (TradeStatusEnum.WAITING_PAYMENT.name().equals(record.getStatus())) {
                        // Order is still waiting for payment: the message can be deleted.
                        log.debug("订单没有支付成功,删除[waiting_confirm]消息id[" + message.getMessageId() + "]的消息");
                        rpTransactionMessageService.deleteMessageByMessageId(message.getMessageId());
                    }

                    log.debug("结束处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息");
                } catch (Exception e) {
                    log.error("处理[waiting_confirm]消息ID为[" + message.getMessageId() + "]的消息异常:", e);
                }
            }
        }

        /**
         * Processes messages in the [SENDING] status.
         *
         * <p>A message whose send count is greater than {@code message.max.send.times} is
         * marked dead. Otherwise it is re-sent once the configured interval (in minutes) for
         * its current send count has elapsed since its last edit time. A failure on one
         * message is logged and does not stop the remaining messages.</p>
         *
         * @param messageMap messages to process; only the values are used
         */
        public void handleSendingTimeOutMessage(Map<String, RpTransactionMessage> messageMap) {
            if (messageMap == null) {
                return;
            }
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            log.debug("开始处理[SENDING]状态的消息,总条数[" + messageMap.size() + "]");

            // Configuration is identical for every message, so it is read once up front.
            Map<Integer, Integer> notifyParam = getSendTime();
            int maxTimes = Integer.parseInt(PublicConfigUtil.readConfig("message.max.send.times"));

            for (RpTransactionMessage message : messageMap.values()) {
                try {
                    log.debug("开始处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息");
                    log.debug("[SENDING]消息ID为[" + message.getMessageId() + "]的消息,已经重新发送的次数[" + message.getMessageSendTimes() + "]");

                    // Over the maximum number of sends: mark dead and move on.
                    if (maxTimes < message.getMessageSendTimes()) {
                        rpTransactionMessageService.setMessageToAreadlyDead(message.getMessageId());
                        continue;
                    }

                    // Pick the interval for the current send count. The count is clamped to
                    // the configured levels so that a count of 0, or a count above the last
                    // level (possible when message.max.send.times is larger than 5), cannot
                    // produce a missing map entry and a NullPointerException on unboxing.
                    int reSendTimes = message.getMessageSendTimes();
                    int level = Math.min(Math.max(reSendTimes, 1), SEND_INTERVAL_LEVELS);
                    int times = notifyParam.get(level);

                    // long arithmetic: minutes * 60 * 1000 overflows int for large intervals.
                    long needTime = System.currentTimeMillis() - times * MILLIS_PER_MINUTE;
                    long hasTime = message.getEditTime().getTime();
                    if (hasTime > needTime) {
                        // Interval has not elapsed yet.
                        log.debug("currentTime[" + sdf.format(new Date()) + "],[SENDING]消息上次发送时间[" + sdf.format(message.getEditTime()) + "],必须过了[" + times + "]分钟才可以再发送。");
                        continue;
                    }

                    rpTransactionMessageService.reSendMessage(message);

                    log.debug("结束处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息");
                } catch (Exception e) {
                    log.error("处理[SENDING]消息ID为[" + message.getMessageId() + "]的消息异常:", e);
                }
            }
        }

        /**
         * Reads the re-send intervals from configuration.
         *
         * @return map from level (1..5) to the value of {@code message.send.<level>.time}
         */
        private Map<Integer, Integer> getSendTime() {
            Map<Integer, Integer> notifyParam = new HashMap<Integer, Integer>();
            for (int level = 1; level <= SEND_INTERVAL_LEVELS; level++) {
                notifyParam.put(level, Integer.valueOf(PublicConfigUtil.readConfig("message.send." + level + ".time")));
            }
            return notifyParam;
        }
    }
    三、实时消息子系统
    public class AccountingMessageListener implements SessionAwareMessageListener<Message> {

    private static final Log LOG = LogFactory.getLog(AccountingMessageListener.class);

    /**
    * 会计队列模板(由Spring创建并注入进来)
    */
    @Autowired
    private JmsTemplate notifyJmsTemplate;
    @Autowired
    private RpAccountingVoucherService rpAccountingVoucherService;
    @Autowired
    private RpTransactionMessageService rpTransactionMessageService;


    public synchronized void onMessage(Message message, Session session) {

    RpAccountingVoucher param = null;
    String strMessage = null;

    try {
    ActiveMQTextMessage objectMessage = (ActiveMQTextMessage) message;
    strMessage = objectMessage.getText();
    LOG.info("strMessage1 accounting:" + strMessage);
    param = JSONObject.parseObject(strMessage, RpAccountingVoucher.class); // 这里转换成相应的对象还有问题
    if (param == null) {
    LOG.info("param参数为空");
    return;
    }

    int entryType = param.getEntryType();
    double payerChangeAmount = param.getPayerChangeAmount();
    String voucherNo = param.getVoucherNo();
    String payerAccountNo = param.getPayerAccountNo();
    int fromSystem = param.getFromSystem();
    int payerAccountType = 0;
    if (param.getPayerAccountType() != null && !param.getPayerAccountType().equals("")) {
    payerAccountType = param.getPayerAccountType();
    }
    double payerFee = param.getPayerFee();
    String requestNo = param.getRequestNo();

    double bankChangeAmount = param.getBankChangeAmount();
    double receiverChangeAmount = param.getReceiverChangeAmount();
    String receiverAccountNo = param.getReceiverAccountNo();
    String bankAccount = param.getBankAccount();
    String bankChannelCode = param.getBankChannelCode();
    double profit = param.getProfit();
    double income = param.getIncome();
    double cost = param.getCost();

    String bankOrderNo = param.getBankOrderNo();
    int receiverAccountType = 0;
    double payAmount = param.getPayAmount();
    if (param.getReceiverAccountType() != null && !param.getReceiverAccountType().equals("")) {
    receiverAccountType = param.getReceiverAccountType();
    }

    double receiverFee = param.getReceiverFee();
    String remark = param.getRemark();

    rpAccountingVoucherService.createAccountingVoucher(entryType, voucherNo, payerAccountNo, receiverAccountNo, payerChangeAmount,
    receiverChangeAmount, income, cost, profit, bankChangeAmount, requestNo, bankChannelCode, bankAccount, fromSystem, remark, bankOrderNo,
    payerAccountType, payAmount, receiverAccountType, payerFee, receiverFee);

    //删除消息
    rpTransactionMessageService.deleteMessageByMessageId(param.getMessageId());

    } catch (BizException e) {
    // 业务异常,不再写会队列
    LOG.error("==>BizException", e);
    } catch (Exception e) {
    // 不明异常不再写会队列
    LOG.error("==>Exception", e);
    }
    }


    public JmsTemplate getNotifyJmsTemplate() {
    return notifyJmsTemplate;
    }


    public void setNotifyJmsTemplate(JmsTemplate notifyJmsTemplate) {
    this.notifyJmsTemplate = notifyJmsTemplate;
    }


    public RpAccountingVoucherService getRpAccountingVoucherService() {
    return rpAccountingVoucherService;
    }


    public void setRpAccountingVoucherService(RpAccountingVoucherService rpAccountingVoucherService) {
    this.rpAccountingVoucherService = rpAccountingVoucherService;
    }
    }
    ————————————————
    版权声明:本文为CSDN博主「chenshiying007」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_27384769/article/details/79313659

  • 相关阅读:
    python操作excel文件一(xlrd读取文件)
    pytest 1.简单介绍一,安装和如何运行
    request鉴权的处理和判断
    Struts2 easy UI插件
    Struts2 JQuery UI常用插件
    Struts2 JSON
    Struts2 使用jQuery实现Ajax
    Struts2 Ajax校验
    oracle连接方式、创建数据库用户、忘记数据库密码、用户锁定
    第二次考试:错题总结
  • 原文地址:https://www.cnblogs.com/hanease/p/14471432.html
Copyright © 2011-2022 走看看