zoukankan      html  css  js  c++  java
  • 解决事物提交与消息发送顺序问题

    最近在线上发现了一个问题,mq的监听时常会报消息不存在的异常,关键代码如下:

    public void sendMessage(MessageData message) throws Exception {
            if (message == null) return;
    
            // 持久化消息 ①
            String messageId = rpTransactionControlService.createTransactionControl(message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime, message.getField1(), message.getField2(), message.getField3());
            log.info("create transaction control messageid = " + messageId);
            message.setMessageId(messageId);
    
            // 发送消息 ②
            ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000);
    
    }

    导致的原因就是 ②已经消息发送了,但是①还没有事物提交,就导致了问题。

    解决办法 1、 增加延迟发送 。

                    2、 增加事物监听。

    针对1方法,如果是activemq,有一个需要注意的地方, 需要修改activemq.xml  

    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">

     即,增加 schedulerSupport="true" 参数

    针对2方法, 需要先创建一个 TransactionalMessageListener 类

    @Component
    @Slf4j
    public class TransactionalMessageListener {
    
        @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)
        public void afterCommit(EventMessage eventMessage) throws Exception {
    
            if (eventMessage == null) return;
    
            if (eventMessage.getEventType() == null) throw new NullPointerException("eventType");
    
            if (eventMessage.getEventType() == EventType.ACTIVE_MQ) { // 防止事物已经还没有提交,mq监听器就已经收到了消息
    
                if (eventMessage.getData() instanceof MessageData) {
                    MessageData message = (MessageData) eventMessage.getData();
                    String sendMessage = JSONObject.toJSONString(message);
                    log.info("push msg to activeMq, context :{ " + sendMessage + "}");
                    try {
                        ActiveMQClientPool.getInstance().sendMsg(sendMessage, message.getQueue(), message.getDelayTime() * 1000);
                    } catch (Exception e) {
                        log.error("push msg error to activeMq:{" + sendMessage + "}", e);
                    }
                }
    
            }
        }
    
    }
    public class EventMessage<T> {
    
        public EventMessage(EventType eventType, T data) {
            this.eventType = eventType;
            this.data = data;
        }
    
        private EventType eventType;
    
        private T data;
    
        public EventType getEventType() {
            return eventType;
        }
    
        public void setEventType(EventType eventType) {
            this.eventType = eventType;
        }
    
        public T getData() {
            return data;
        }
    
        public void setData(T data) {
            this.data = data;
        }
    }

    然后修改原来逻辑如下

    
    
    @Autowired
    ApplicationEventPublisher publisher;
    public void sendMessage(MessageData message) throws Exception {
            if (message == null) return;
    
            String messageId = rpTransactionControlService.createTransactionControl(message.getMessageBody(), message.getMessageType(), message.getQueue(), delayTime, message.getField1(), message.getField2(), message.getField3());
            log.info("create transaction control messageid = " + messageId);
            message.setMessageId(messageId);
    
    publisher.publishEvent(new EventMessage<MessageData>(EventType.MQ, message));
     }

    这里我需要对这个进行解释一下:   @TransactionalEventListener(fallbackExecution = true, phase = TransactionPhase.AFTER_COMMIT)     

    fallbackExecution = true 是为了保证没有事物的时候也能正常收到消息

     phase = TransactionPhase.AFTER_COMMIT  代表提交后监听

  • 相关阅读:
    Java 标识符
    Java 关键字详解
    Java 语言的主要特性
    redis学习
    垃圾回收
    JVM内存结构
    sql总结(DML)
    sql总结(DDL)
    加密算法
    《数据结构》 定长顺序串常用操作代码集合
  • 原文地址:https://www.cnblogs.com/huxipeng/p/10857665.html
Copyright © 2011-2022 走看看