zoukankan      html  css  js  c++  java
  • RocketMQ-quickstart(批量消费)

    一、专业术语

    • Producer

        消费生产者,负责产生消息,一般由业务系统负责产生消息

    • Consumer

        消息消费者,负责消费消息,一般是后台系统负责异步消费

    • Push Consumer

        Consumer的一种,应用通常向Consumer对象注册一个Listener接口,一旦收到消息,Consumer对象立刻回调Listener接口方法

    • Pull Consumer

        Consumer的一种,应用通常主动调用Consumer的拉消息方法,从Broker拉消息,主动权由应用控制

    • Producer Group

        一类Producer的集合名称,这类Consumer通常发送一类消息,且发送逻辑一致。

    • Consumer Group

        一类Consumer的集合名称,这类Consumer通常消费一类消息,且消费逻辑一致。

    • Broker

        消息中转角色,负责存储消息,转发消息,一般也称为Server。在JMS规范中称为Provider。

    • 广播消费

        一个消息被多个Consumer消费,即使这些Consumer属于同一个Consumer Group,消息也会被Consumer Group中的每个Consumer都消费一 次,广播消费中的Consumer Group概念可以认为在消息划分方面无意义

        在CORBA Notification 规范中,消费方式都属于广播消费。

        在JMS规范中,相当于JMS publish/subscribe model

    • 集群消费

        一个Consumer Group 中的Consumer实例平均分摊消费消息。例如某个Topic有9条消息,其中一个Consumer Group有3个实例(可能是3个进程,或者3台机器),那么每个实例只消费其中的3条消息。

    • 顺序消息

        消费消息的顺序要同发送消息的顺序一致,在RocketMq中,主要指的是局部顺序,即一类消息为满足顺序性,必须Producer单线程顺序发送,且发送到同一队列,这样Consumer就可以按照Producer发送的顺序去消费消息。

    • 普通顺序消息

        顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker重启,由于队列总数发生变化,哈希取模后定位的队列会变化,产生短暂的消息顺序不一致。

        如果业务能容忍在集群一次情况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。

    • 严格顺序消息

        顺序消费的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。

        如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用(依赖同步双写,主备自动切换,自动切换功能尚未实现)

    • Message Queue

        在RocketMq中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,所以任务是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。

    二、代码示例

    生产者:

    package com.alibaba.rocketmq.example.quickstart;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    /**
     * @author : Jixiaohu
     * @Date : 2018-04-19.
     * @Time : 9:20.
     * @Description :
     */
    public class produce {
        public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("jxh_quickstart_produce");
            producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
            producer.start();
    
            try {
                for (int i = 0; i < 100; i++) {
                    Message msg = new Message("TopicQuickStart", "TagA",
                            ("Hello RoctetMq : " +i ).getBytes());
                    SendResult sendResult = producer.send(msg);
                    System.out.println(sendResult);
                }
            } catch (RemotingException e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
            producer.shutdown();
        }
    }

    生产者生产100条消息:

    消费者:

    package com.alibaba.rocketmq.example.quickstart;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * @author : Jixiaohu
     * @Date : 2018-04-19.
     * @Time : 9:20.
     * @Description :
     */
    public class Consumer {
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jxh_quickstart_produce");
            consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
    
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            consumer.subscribe("TopicQuickStart", "*");
    
            //不配置默认一条
            consumer.setConsumeMessageBatchMaxSize(10);
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    System.out.println("拉取消息条数 " + msgs.size());
                    try {
                        for (MessageExt msg : msgs) {
                            String topic = msg.getTopic();
                            String msgBody = new String(msg.getBody(), "utf-8");
                            String tags = msg.getTags();
                            System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags);
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
            System.out.println("Consumer Started");
    
        }
    }

    这边设置了每次消费条数,我这边先启动Consumer订阅ropic,然后启动produce,看一下打印结果:

    produce发送了100条数据,下面看一下,Consumer消费数据的情况

     

    你会发现,为什么每次获取的消息都是1条,那上面设置的每次获取最大的消息数目“10”,是不是不起作用了?

     其实是这样的,

    我们正常的流程一般都是先启动Consumer端,然后再启动Producer端。Consumer端都是一条一条的消费的。但是有时候会出现先启动Producer端,然后再启动Consumer端这种情况,那这个时候就是会批量消费了,这个参数就会有作用了。

     三、消息的重试,

      消息的重试大致分为三种情况,

    ①:Produce发送消息到MQ上,发送失败。

    看一下produce的代码是怎么实现的,这边看一个大概的情况

    public class produce {
        public static void main(String[] args) throws MQClientException, InterruptedException, MQBrokerException {
            DefaultMQProducer producer = new DefaultMQProducer("jxh_quickstart_produce");
            //消息发送至mq失败后的重试次数
            producer.setRetryTimesWhenSendFailed(10);
            producer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
            producer.start();
    
            try {
                for (int i = 0; i < 100; i++) {
                    Message msg = new Message("TopicQuickStart", "TagA",
                            ("Hello RoctetMq : " + i).getBytes());
    //                SendResult sendResult = producer.send(msg);
                    //增加一个超时参数,单位为毫秒
                    SendResult sendResult = producer.send(msg, 1000);
                    System.out.println(sendResult);
                }
            } catch (RemotingException e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
            producer.shutdown();
        }
    }

    ②MQ推送消息至Consumer超时失败(网络波动)timeout。

     这种情况,timeout,MQ会无限循环,直到把消息推送至Consumer,MQ没有接收到RECONSUME_LATER或CONSUME_SUCCESS

    ③Consumer处理消息后,返回RECONSUME_LATER,MQ也会按照策略发送消息 exception。

    消息重试的测试是  1s,5s,10s,30s,1m,2m,3m,4m,5m,6m,7m,8m,9m,10m,20m,30m,1h,2h

    RocketMQ为我们提供了这么多次数的失败重试,但是在实际中也许我们并不需要这么多重试,比如重试3次,还没有成功,我们希望把这条消息存储起来并采用另一种方式处理,而且希望RocketMQ不要再重试呢,因为重试解决不了问题了!这该如何做呢?

    public class Consumer {
    
        public static void main(String[] args) throws MQClientException {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("jxh_quickstart_produce");
            consumer.setNamesrvAddr("192.168.1.114:9876;192.168.2.2:9876");
    
            /**
             * 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费<br>
             * 如果非第一次启动,那么按照上次消费的位置继续消费
             */
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            consumer.subscribe("TopicQuickStart", "*");
    
            //不配置默认一条
            consumer.setConsumeMessageBatchMaxSize(10);
    
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    System.out.println("拉取消息条数 " + msgs.size());
                    try {
    //                    for (MessageExt msg : msgs) {
                        MessageExt msg = msgs.get(0);
                        String topic = msg.getTopic();
                        String msgBody = new String(msg.getBody(), "utf-8");
                        String tags = msg.getTags();
                        System.out.println("收到信息:" + " topic:" + topic + " msgBody:" + msgBody + " tags:" + tags + "msgs:" + msgs);
                        //注意,要先启动Consumer,在进行发送消息(也就是先订阅服务,再发送)
                        if ("Hello RoctetMq : 4".equals(msgBody)) {
                            System.out.println("===========失败消息开始========");
                            System.out.println(msgBody);
                            System.out.println("===========失败消息结束========");
                            //异常
                            int a = 1 / 0;
                        }
    //                    }
                    } catch (Exception e) {
                        e.printStackTrace();
                        if (msgs.get(0).getReconsumeTimes() == 3) {
                            //   该条消息可以存储到DB或者LOG日志中,或其他处理方式
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                        }
    
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            consumer.start();
            System.out.println("Consumer Started");
    
        }
    }

    查看下打印结果:

    这边能看到重试次数。

  • 相关阅读:
    利用dockerfile定制镜像
    发布Docker 镜像到dockerhub
    Docker 停止容器
    133. Clone Graph
    132. Palindrome Partitioning II
    131. Palindrome Partitioning
    130. Surrounded Regions
    129. Sum Root to Leaf Numbers
    128. Longest Consecutive Sequence
    127. Word Ladder
  • 原文地址:https://www.cnblogs.com/shmilyToHu/p/8885345.html
Copyright © 2011-2022 走看看