zoukankan      html  css  js  c++  java
  • RocketMQ(1)--helloworld

     

    双Master方式:

    服务器环境

    序号 IP 角色 模式
    1 192.168.32.135 nameServer1,brokerServer1  Master1
    2 192.168.32.136 nameServer2,brokerServer2  Master2

    Helloworld代码示例:

    /**
     * Producer,发送消息
     * 
     */
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
            producer.setNamesrvAddr("192.168.32.135:9876;92.168.32.136:9876");
            producer.start();
    
            for (int i = 0; i < 100; i++) {
                try {
                    Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes()// body
                            );
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
                catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
    
            producer.shutdown();
        }
    }
    
    /**
     * Consumer,订阅消息
     */
    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            consumer.subscribe("TopicTest", "*");
            
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
    //                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                    try {
                        for(MessageExt msg : msgs){
                            String topic = msg.getTopic();
                            String msgBody = new String(msg.getBody(),"utf-8");
                            String tags = msg.getTags();
                            System.out.println("收到消息:"+" topic:"+topic+" msgBody:"+msgBody+" tags:"+tags);
                        }
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
    
            System.out.println("Consumer Started.");
        }
    }
    View Code

    启动消费者,生产者,结果如下:

    问题:consumeMessage方法的参数是List<MessageExt> msgs,说明消费端是可以一次消费多条消息的,那么其中的一条消息处理失败后,怎么处理呢?

    回答:1. 默认情况下消费端一次只消费一条消息,可以通过consumer.setConsumeMessageBatchMaxSize(10),来修改。

       2. 如果是先启动消费端,再启动生产端,即使通过上面设置了,消费端也是一次消费一条消息。

       3. 如果显示启动生产端,再启动消费端,消费端会一次消费多条消息,例如一次拉取了5条消息,如果在第四条失败了,则这五条消息会全部重试,所以这种情况需要在业务逻辑中去去除重复消息。

    问题:消息是怎么重试的?

    回答:1. 生成端重试:producer.setRetryTimesWhenSendFailed(3);producer.send(msg,1000); 通过前面两行代码能实现生产端重试,意思是如果消息发送超过了超时时间(1000),重发3次

         2.消费端重试:分两种,一是timeout,而是exception

          timeout是指MQ没有收到消费端的返回效应(RECONSUME_LATER/CONSUME_SUCCESS),这是MQ会一直给消费端发送消息重试。

          exception是指消费端在消费的过程中发生异常,给MQ返回了RECONSUME_LATER,这是MQ会按照一定的规则给消费端发送消息重试。

              默认重试规则:messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

    测试代码示例:

    /**
     * Producer,发送消息
     * 
     */
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
            DefaultMQProducer producer = new DefaultMQProducer("quickstart_producer");
            producer.setNamesrvAddr("192.168.32.135:9876;92.168.32.136:9876");
            producer.setRetryTimesWhenSendFailed(3);
            producer.start();
    
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg = new Message("TopicTest",// topic
                        "TagA",// tag
                        ("Hello RocketMQ " + i).getBytes()// body
                            );
                    SendResult sendResult = producer.send(msg,1000);
                    System.out.println(sendResult);
                }
                catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
    
            producer.shutdown();
        }
    }
    
    /**
     * Consumer,订阅消息
     */
    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("quickstart_consumer");
            consumer.setNamesrvAddr("192.168.32.135:9876;192.168.32.136:9876");
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            consumer.subscribe("TopicTest", "*");
            //默认为1
            consumer.setConsumeMessageBatchMaxSize(10);
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                        ConsumeConcurrentlyContext context) {
    //                System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
                    try {
                        System.out.println("消息条数:" + msgs.size());
                        for(MessageExt msg : msgs){
                            String topic = msg.getTopic();
                            String msgBody = new String(msg.getBody(),"utf-8");
                            String tags = msg.getTags();
                            System.out.println("收到消息:"+" topic:"+topic+" msgBody:"+msgBody+" tags:"+tags);
                        
                            if("Hello RocketMQ 4".equals(msgBody)){
                                System.out.println("=====失败消息开始=====");
                                System.out.println(msg);
                                System.out.println(msgBody);
                                System.out.println("=====失败消息结束=====");
                                int i = 6/0;
                            }
                        }
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
    
            System.out.println("Consumer Started.");
        }
    }
    View Code
  • 相关阅读:
    [leetcode] Maximum Depth of Binary Tree
    [leetcode] Binary Tree Zigzag Level Order Traversal
    [leetcode] Binary Tree Level Order Traversal
    软工第二次极限测试
    动手动脑的问题以及课后实验性的问题3
    计算几何--半平面交与平面区域
    拉格朗日插值法
    计算几何--最小圆覆盖与最小球覆盖
    计算几何--圆与球
    程序员修炼之道读后感1
  • 原文地址:https://www.cnblogs.com/lostyears/p/8575455.html
Copyright © 2011-2022 走看看