zoukankan      html  css  js  c++  java
  • rocketmq消息重复推送的问题

    最近,在公司的测试环境遇到个问题:每次重启应用后,原来消费过的消息又被重复推送了一遍。消费者和生产者代码如下:

    package com.tf56.queue.client;
    
    import java.util.concurrent.TimeUnit;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    /**
     * RocketMQ 生产者工具类
     * @author zjhua
     *
     */
    public class RocketMQProducer {
        
        private DefaultMQProducer producer;
        private String namesrvAddr;
        private String groupName;
        private String instanceName;
    
        public String getNamesrvAddr() {
            return namesrvAddr;
        }
    
        public void setNamesrvAddr(String namesrvAddr) {
            this.namesrvAddr = namesrvAddr;
        }
    
        public String getGroupName() {
            return groupName;
        }
    
        public void setGroupName(String groupName) {
            this.groupName = groupName;
        }
    
        public String getInstanceName() {
            return instanceName;
        }
    
        public void setInstanceName(String instanceName) {
            this.instanceName = instanceName;
        }
    
        public RocketMQProducer(String namesrvAddr, String groupName,
                String instanceName) {
            super();
            this.namesrvAddr = namesrvAddr;
            this.groupName = groupName;
            this.instanceName = instanceName;
            this.producer = new DefaultMQProducer(groupName);
    
            producer.setNamesrvAddr(namesrvAddr);
            producer.setInstanceName(instanceName);
            producer.setVipChannelEnabled(false);
    
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 发送消息工具类
         * @param topic
         * @param tags
         * @param keys
         * @param body
         * @return
         * @throws MQClientException
         * @throws RemotingException
         * @throws MQBrokerException
         * @throws InterruptedException
         */
        public SendResult send(String topic, String tags, String keys, byte[] body)
                throws MQClientException, RemotingException, MQBrokerException,
                InterruptedException {
            Message msg = new Message(topic, tags, keys, body);
            try {
                SendResult sendResult = this.producer.send(msg);
                return sendResult;
            } catch (MQClientException | RemotingException | MQBrokerException
                    | InterruptedException e) {
                e.printStackTrace();
                throw e;
            }
        }
        
        /**
         * 发送工具类
         * @param topic
         * @param tags
         * @param keys
         * @param body
         * @param retryTimes
         * @param elapseMS
         * @return
         */
        public SendResult send(String topic, String tags, String keys, byte[] body,int retryTimes,int elapseMS) {
            Message msg = new Message(topic, tags, keys, body);
            boolean success = false;
            int i = 0;
            SendResult sendResult = null;
            while (!success && i++ < retryTimes) {
                try {
                    sendResult = this.producer.send(msg);
                    return sendResult;
                } catch (MQClientException | RemotingException | MQBrokerException
                        | InterruptedException e) {
                    e.printStackTrace();
                }
                try {
                    Thread.sleep(elapseMS);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            return sendResult;
        }
    
        public static void main(String[] args) throws MQClientException,
                InterruptedException {
            RocketMQProducer producer = new RocketMQProducer("10.7.29.121:9876",
                    "ProducerGroupName", "Producer");
    
            for (int i = 0; i < 10; i++) {
                try {
                    {
                        SendResult sendResult = producer.send("TopicTestZJH",// topic
                                "TagA",// tag
                                "OrderID001",// key
                                ("Hello MetaQ" + i).getBytes());
                        System.out.println(sendResult);
                    }
    
                    {
                        SendResult sendResult = producer.send("TopicTestYIDU",// topic
                                "TagB",// tag
                                "OrderID0034",// key
                                ("Hello MetaQ" + i).getBytes());
                        System.out.println(sendResult);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
                TimeUnit.MILLISECONDS.sleep(1000);
            }
            /**
             * spring bean配置
             */
    //        <bean id="rocketMQProducer" class="com.tf56.queue.client.RocketMQProducer">  
    //            <constructor-arg name="namesvrAddr" value="10.7.29.121:9876"/>  
    //            <constructor-arg name="groupName" value="ProducerGroupName"/>
    //            <constructor-arg name="instanceName" value="Producer"/>
    //        </bean>
        }
    }

    消费端代码:

    package tf56.sofa.util;
    
    import java.net.InetAddress;
    import java.net.UnknownHostException;
    import java.util.List;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    import com.alibaba.rocketmq.common.protocol.heartbeat.MessageModel;
    
    /**
     * Convenience wrapper around a RocketMQ {@link DefaultMQPushConsumer} that
     * subscribes to a semicolon-separated list of topics.
     *
     * <p>The constructor only configures the consumer; {@link #init()} must be
     * called once to start it, and {@link #shutdown()} releases it.
     *
     * <p>Original author's note: although this is the "push" consumer API, the
     * client is said to long-poll the server internally and then invoke the
     * registered listener.
     *
     * <p>Example Spring bean definition:
     * <pre>{@code
     * <bean id="rocketMQPushConsumer" class="tf56.sofa.util.RocketMQPushConsumer"
     *       init-method="init" destroy-method="shutdown">
     *     <constructor-arg name="namesrvAddr" value="10.7.29.121:9876"/>
     *     <constructor-arg name="groupName" value="ConsumerGroupName"/>
     *     <constructor-arg name="instanceName" value="Consumer"/>
     *     <constructor-arg name="topics" value="TopicTestZJH;TopicTestYIDU"/>
     *     <constructor-arg name="messageListener" ref="messageListener"/>
     * </bean>
     * }</pre>
     */
    public class RocketMQPushConsumer {

        /** Underlying client; configured in the constructor, started by {@link #init()}. */
        private final DefaultMQPushConsumer consumer;
        private final String namesrvAddr;
        private final String groupName;
        private final String instanceName;
        private final String topics;

        /**
         * Configures the underlying consumer and subscribes it to every topic in
         * {@code topics} with the {@code "*"} sub-expression.
         *
         * <p>Any failure while subscribing or registering the listener is printed
         * to standard error and not propagated.
         *
         * @param namesrvAddr     name server address
         * @param groupName       consumer group name; the original author notes the
         *                        application must keep it unique
         * @param instanceName    recorded on this wrapper only; see the review note
         *                        in the constructor body
         * @param topics          topic names separated by {@code ';'}; surrounding
         *                        whitespace is trimmed and blank entries are skipped
         * @param messageListener listener invoked for consumed messages
         */
        public RocketMQPushConsumer(String namesrvAddr, String groupName, String instanceName,String topics,MessageListenerConcurrently messageListener) {
            super();
            this.namesrvAddr = namesrvAddr;
            this.groupName = groupName;
            this.instanceName = instanceName;
            this.topics = topics;

            // Original author's guidance: create one consumer per application and
            // keep it as a global/singleton object.
            consumer = new DefaultMQPushConsumer(groupName);
            consumer.setNamesrvAddr(namesrvAddr);
            // NOTE(review): presumably this only applies when the group has no
            // stored offset yet - confirm against the RocketMQ documentation.
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // NOTE(review): the instanceName argument is not applied here; the name
            // is derived from the local host address and the name server address.
            // Confirm this is intentional.
            consumer.setInstanceName(RocketMQPushConsumer.getInstanceName(namesrvAddr));
            consumer.setMessageModel(MessageModel.CLUSTERING);
            consumer.setVipChannelEnabled(false);
            try {
                // One consumer may subscribe to several topics.
                for (String rawTopic : topics.split(";")) {
                    String topic = rawTopic.trim();
                    if (topic.isEmpty()) {
                        // Tolerate inputs such as "A;;B" or "A; ".
                        continue;
                    }
                    consumer.subscribe(topic, "*");
                }

                consumer.registerMessageListener(messageListener);
            } catch (Exception e) {
                // Best effort: construction never propagates configuration failures.
                e.printStackTrace();
            }
        }

        /**
         * Starts the underlying consumer; must be called once before any message
         * is delivered. Prints {@code "Consumer Started."} on success; a start
         * failure is printed to standard error and not propagated.
         */
        public void init() {
            try {
                consumer.start();
                // Announce the start only after it has actually succeeded.
                System.out.println("Consumer Started.");
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }

        /**
         * Shuts down the underlying consumer.
         */
        public void shutdown() {
            consumer.shutdown();
        }

        /**
         * Builds the client instance name by concatenating the local host address
         * and the name server address.
         */
        private static String getInstanceName(String namesrvAddr) {
            return getHostAddress() + namesrvAddr;
        }

        /**
         * Returns the local host's IP address, or an empty string when it cannot
         * be resolved (the failure is printed to standard error).
         */
        private static String getHostAddress(){
            try {
                return InetAddress.getLocalHost().getHostAddress();
            } catch (UnknownHostException e) {
                e.printStackTrace();
            }
            return "";
        }

        /**
         * Demo entry point: consumes the two test topics and prints every message
         * body prefixed with its topic.
         */
        public static void main(String[] args) throws InterruptedException,
                MQClientException {
            MessageListenerConcurrently messageListener = new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    // Handle the whole batch: acknowledging success after looking
                    // at only the first element would silently drop the rest.
                    for (MessageExt msg : msgs) {
                        String topic = msg.getTopic();
                        if ("TopicTestZJH".equals(topic) || "TopicTestYIDU".equals(topic)) {
                            System.out.println(topic + "->"
                                    + new String(msg.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            };
            RocketMQPushConsumer consumer = new RocketMQPushConsumer("10.7.29.121:9876",
                    "ConsumerGroupName", "Consumer","TopicTestZJH;TopicTestYIDU",messageListener);
            consumer.init();
        }
    }

    补充:问题已经找到了,应该是rocketmq客户端和服务器版本不一致所致。我们公司的环境(由运维统管)使用的是3.2.6,而上述出问题的客户端使用的是3.6.2.Final;客户端切换成3.2.6版本之后,就没有这个问题了。

    参考:http://blog.csdn.net/xyzjl/article/details/54970927

  • 相关阅读:
    VC实现开机自启动
    用Shell扩展实现源代码统计程序
    在(CListView)列表视图中添加右键菜单的方法
    关于打开外部程序并且发送一个按键消息 (转
    vc中运行外部程序的方法
    如何在 XCode 4.2 設定部分程式碼不使用 ARC 方式分享(转)
    Xcode调试断点不停止解决方案!(转)
    NSString+NSMutableString+NSValue+NSAraay用法汇总 (转)
    对于Retain和Assign属性的理解(转)
    基于Xcode4开发第一个iPhone程序:“Hello World”(转)
  • 原文地址:https://www.cnblogs.com/zhjh256/p/6986031.html
Copyright © 2011-2022 走看看