zoukankan      html  css  js  c++  java
  • 解决事物提交与消息发送顺序问题

    最近在线上发现了一个问题,mq的监听时常会报消息不存在的异常,关键代码如下:

    public void sendMessage(MessageData message) throws Exception {
            if (message == null) return;
    
            // 持久化消息 ①
            String messageId = rpTransactionControlService.createTransactionControl(message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime, message.getField1(), message.getField2(), message.getField3());
            log.info("create transaction control messageid = " + messageId);
            message.setMessageId(messageId);
    
            // 发送消息 ②
            ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000);
    
    }

    导致的原因就是 ②已经消息发送了,但是①还没有事物提交,就导致了问题。

    解决办法 1、 增加延迟发送 。

                    2、 增加事物监听。

    针对1方法,如果是activemq,有一个需要注意的地方, 需要修改activemq.xml  

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

     即,增加 schedulerSupport="true" 参数

    针对2方法, 需要先创建一个 TransactionalMessageListener 类

    @Component
    @Slf4j
    public class TransactionalMessageListener {
    
        @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)
        public void afterCommit(EventMessage eventMessage) throws Exception {
    
            if (eventMessage == null) return;
    
            if (eventMessage.getEventType() == null) throw new NullPointerException("eventType");
    
            if (eventMessage.getEventType() == EventType.ACTIVE_MQ) { // 防止事物已经还没有提交,mq监听器就已经收到了消息
    
                if (eventMessage.getData() instanceof MessageData) {
                    MessageData message = (MessageData) eventMessage.getData();
                    String sendMessage = JSONObject.toJSONString(message);
                    log.info("push msg to activeMq, context :{ " + sendMessage + "}");
                    try {
                        ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000);
                    } catch (Exception e) {
                        log.error("push msg error to activeMq:{" + sendMessage + "}", e);
                    }
                }
    
            }
        }
    
    }
    public class EventMessage<T> {
    
        public EventMessage(EventType eventType, T data) {
            this.eventType = eventType;
            this.data = data;
        }
    
        private EventType eventType;
    
        private T data;
    
        public EventType getEventType() {
            return eventType;
        }
    
        public void setEventType(EventType eventType) {
            this.eventType = eventType;
        }
    
        public T getData() {
            return data;
        }
    
        public void setData(T data) {
            this.data = data;
        }
    }

    然后修改原来逻辑如下

    
    
    @Autowired
    ApplicationEventPublisher publisher;
    public void sendMessage(MessageData message) throws Exception {
            if (message == null) return;
    
            String messageId = rpTransactionControlService.createTransactionControl(message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime, message.getField1(), message.getField2(), message.getField3());
            log.info("create transaction control messageid = " + messageId);
            message.setMessageId(messageId);
    
    publisher.publishEvent(new EventMessage<MessageData>(EventType.MQ, message));
     }

    这里我需要对这个进行解释一下:   @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)     

    fallbackExecution = true 是为了保证没有事物的时候也能正常收到消息

     phase = TransactionPhase.AFTER_COMMIT  代表提交后监听

  • 相关阅读:
    Mac php使用gd库出错 Call to undefined function imagettftext()
    centos 使用 locate
    Mac HomeBrew 安装 mysql
    zsh 命令提示符 PROMPT
    新的开始
    Java 面试题分析
    Java NIO Show All Files
    正确使用 Volatile 变量
    面试题整理 2017
    有10阶梯, 每次走1,2 or 3 阶,有多少种方式???
  • 原文地址:https://www.cnblogs.com/huxipeng/p/10857665.html
Copyright © 2011-2022 走看看