zoukankan      html  css  js  c++  java
  • RocketMQ集群消息测试(二)

    最近两天发生了很多事情,李咏(勇)去世,金庸讣告,重庆公交车坠江,印尼狮航JT610航班坠毁。让我感觉生命实在是脆弱,有人寿终正寝,有人患病离世,这些都是可预见的。但是像公交坠江,飞机坠海这类的就是不可预见的。他们也许正在去上班的路上,也许正在去参加朋友聚会的路上,也许家中有正等待他们的亲人,可是突然一次事故。将他们与亲人,朋友永远的分开了。所以活着的人除了为他们惋惜,还应该珍惜当下,珍惜每一天,每一刻。我们有理由,有时间去学习。

    废话不多说了,上篇文章搭建了一个两主两从异步的RocketMQ集群环境,这篇文章主要对这个集群环境进行可用性,可靠性的测试。我们仍然暂时不去讨论源码实现。

    场景一:验证Producer发送消息是负载均衡的。

    测试代码:我使用send(msg)方法发送消息,没有自定义MessageQueueSelector,一共发送了12条消息。

     1 public class Producer {
     2     public static void main(String[] args) throws UnsupportedEncodingException {
     3         try {
     4             MQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
     5             ((DefaultMQProducer) producer).setNamesrvAddr("192.168.1.85:9876;192.168.99:9876");
     6             producer.start();
     7 
     8             for (int i = 0; i < 12; i++) {
     9                 Message msg = new Message("TopicTest", "TagA", "KEY" + i,
    10                         ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    11                 SendResult sendResult = producer.send(msg);
    12 
    13                 System.out.println(sendResult);
    14             }
    15 
    16             producer.shutdown();
    17         } catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {
    18             e.printStackTrace();
    19         }
    20     }
    21 }

    我们看一下Broker情况,此时broker-a和broker-b各有6条消息,并且成功把消息同步到slave。

    场景二:消费测试,代码如下

     1 public class Consumer {
     2 
     3     public static void main(String[] args) throws MQClientException {
     4         DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");
     5         consumer.setNamesrvAddr("192.168.1.85:9876;192.168.99:9876");
     6         consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
     7         consumer.subscribe("TopicTest", "TagA");
     8 
     9         consumer.registerMessageListener(new MessageListenerOrderly() {
    10             @Override
    11             public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
    12                 context.setAutoCommit(false);
    13                 System.out.println(Thread.currentThread().getName()+":"+msgs);
    14                 return ConsumeOrderlyStatus.SUCCESS;
    15             }
    16         });
    17         consumer.start();
    18         System.out.printf("Consumer Started.%n");
    19     }
    20 }
    Consumer采用Push方式,其实RocketMQ的所谓推模式本质上也是对Pull模式的封装,这里先不去讨论。

    Broker初始状态:

    如下图:

    1、启动Consumer

    2、启动Producer,发送12条消息,此时看到broker中Today Produce Count加了6,Today Consume Count也加了6。说明消息都被消费了。

     场景三:如果两个broker master都挂掉,consumer还会拉取消息吗?

    从上图可以看到,即使两个broker master都宕机了,并不影响consumer继续从slave拉取消息。只是此时无法写写入消息了。

    接下来我们将两台master服务启动,看看会发生什么

    和预想的效果一样,消费端继续从master拉取消息,这里有一个关键的问题,就是明显消息被重复消费了,RocketMQ之所以允许重复消费是因为master挂掉的概率很小,如果在broker里做消息去重操作,将会影响整个Broker的吞吐量,所以重复消费问题肯定需要业务方自己解决了。

    场景四:如果两个namesrv都挂了,对Producer和Consumer有什么影响呢?

    这个就不贴图,图片也反映不出来真实情况,其实真实情况是这样的。我将两个namesrv服务停止之后,原来已经建立连接Producer和Consumer依然可以正常工作,只是新加入的Producer和Consumer无法工作了。

    场景五:两个slave都挂掉了,会有什么现象?

    如上图:首先将Producer停止,不再发送新的消息,然后启动broker-a slave服务。刚启动时produce count数量是0。

    如下图:继续观察,发现produce count数量开始上升,说明slave启动之后,master开始将消息向slave同步。

    重点:通过上面测试,大家可能注意到开源版本的RocketMQ,Broker的master和slave是不能自动切换的。一旦master挂了,这条线路也就不能写消息了。根据Producer轮询规则,所有的消息都会发送到另一条Broker线路上。但是其实阿里自己内部使用的RocketMQ版本是支持master和slave自动切换的。如果你们用阿里云服务,也能享受到这个待遇。

  • 相关阅读:
    pandas学习
    Scala类详述
    1G-4G的介绍及eclipse中Android工程目录介绍
    Day01
    Linux系统BackSpace 、方向键的问题
    SEL数据类型,@selector的用法,以及调用SEL
    git本地仓库与github远程仓库链接协议问题
    Linux下包含头文件的路径问题与动态库链接路径问题
    c++中basic_istream::getline()的返回值何时为真
    c++标准库函数equal_range()
  • 原文地址:https://www.cnblogs.com/shileibrave/p/9882511.html
Copyright © 2011-2022 走看看