zoukankan      html  css  js  c++  java
  • RocketMQ系列:rocketmq的benchmark工具

    1.环境参考

    benchmark环境搭建:参考单机快速搭建单broker环境

    被压测环境:rocketmq的dledger集群

    2.源码位置

    https://github.com/apache/rocketmq/tree/master/example/src/main/java/org/apache/rocketmq/example/benchmark

    3.工具清单

    consumer.sh:消息消费的benchmark工具
    producer.sh: 消息生产benchmark工具(同步非批处理模式)

    3.1 producer.sh

    3.1.1 帮助

    sh producer.sh -h

    usage: benchmarkProducer [-h] [-k <arg>] [-n <arg>] [-s <arg>] [-t <arg>] [-w <arg>]
    -h,--help Print help
    -k,--keyEnable <arg> Message Key Enable, Default: false   //是否使用message key,true则使用timestamp作为message的key
    -n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876  //指定nameserver地址
    -s,--messageSize <arg> Message Size, Default: 128        //指定消息大小,默认128字节
    -t,--topic <arg> Topic name, Default: BenchmarkTest      //指定topic,默认使用BenchmarkTest,如果指定其他记得先创建对应的topic
    -w,--threadCount <arg> Thread count, Default: 64         //开启的发送生产消息的线程数

    3.1.2 源码重要片段

    //默认生产组为:benchmark_producer
    
    final DefaultMQProducer producer = new DefaultMQProducer("benchmark_producer");
    //如果keyEnable为True,则会以时间戳作为message的key
    if (keyEnable) {
        msg.setKeys(String.valueOf(beginTimestamp / 1000));
    }
    //设置producer用于发送消息的线程池大小,-w的值
    final ExecutorService sendThreadPool = Executors.newFixedThreadPool(threadCount);

    3.1.3 例子

    指定nameserver进行生产消息压测

    sh producer.sh -n xxx.xxx.xxx.xxx:9876

    SendTPS:生产消息的TPS

    Max RT:最大响应时间(毫秒)

    Average RT:平均响应时间(毫秒)

    Send Failed:发送失败的总请求数

    Response Failed:返回失败的总响应数

    这里刚开始有发生失败的原因是由于producer刚启动,短期内对broker造成了压力。在实际使用producer的时候,应该对发送失败的情况进行重新消息重发。

     可以看到控制台里Produce Message TPS为3000多,其中slave是从master同步消息的TPS(备份master的消息数据),master才是实际接收的生产消息TPS。

    3.2 consumer.sh

    3.2.1 帮助

    sh consumer.sh -h

    usage: benchmarkConsumer [-e <arg>] [-f <arg>] [-g <arg>] [-h] [-n <arg>] [-p <arg>] [-r <arg>] [-t <arg>]
    -e,--expression <arg> filter expression content file path.ie: ./test/expr               //配合filter参数使用,过滤的条件表达式
    -f,--filterType <arg> TAG, SQL92                                                        //过滤方式
    -g,--group <arg> Consumer group name, Default: benchmark_consumer                       //指定消费组,默认为benchmark_consumer
    -h,--help Print help
    -n,--namesrvAddr <arg> Name server address list, eg: 192.168.0.1:9876;192.168.0.2:9876  //指定nameserver地址
    -p,--group prefix enable <arg> Consumer group name, Default: false                      //是否给消费组添加后缀,默认会给指定的消费组后添加后缀,默认应该是true(提示有问题)
    -r,--fail rate <arg> consumer fail rate, default 0                                      //指定消费失败率,只要没有超过消费失败率,消费失败都会重试
    -t,--topic <arg> Topic name, Default: BenchmarkTest                                     //指定topic,默认使用BenchmarkTest,如果指定其他记得先创建对应的topic 

    3.2.2 源码重要片段

    #根据指定的消费group生成消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(group); 
    #指定nameserver地址
    if (commandLine.hasOption('n')) {
                String ns = commandLine.getOptionValue('n');
                consumer.setNamesrvAddr(ns);
    }
    #如果没有指定isSuffixEnable,即-p指定的数值,则会给消费组加上后缀
    if (Boolean.parseBoolean(isSuffixEnable)) {
                group = groupPrefix + "_" + (System.currentTimeMillis() % 100);
    }
    #指定topic的消息过滤器,只消费符合条件的消息
    #SQL92语法
    #TAG语言
    if
    (filterType == null || expression == null) { consumer.subscribe(topic, "*"); } else { if (ExpressionType.TAG.equals(filterType)) { String expr = MixAll.file2String(expression); System.out.printf("Expression: %s%n", expr); consumer.subscribe(topic, MessageSelector.byTag(expr)); } else if (ExpressionType.SQL92.equals(filterType)) { String expr = MixAll.file2String(expression); System.out.printf("Expression: %s%n", expr); consumer.subscribe(topic, MessageSelector.bySql(expr)); } else { throw new IllegalArgumentException("Not support filter type! " + filterType); }
    }
    #如果当前消费比例小于failRate,会稍后进行重试消费,否则直接跳过
    if (ThreadLocalRandom.current().nextDouble() < failRate) { statsBenchmarkConsumer.getFailCount().incrementAndGet(); return ConsumeConcurrentlyStatus.RECONSUME_LATER;
    }
    else { return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    3.2.3 例子

    #从BenchmarkTest消费消息,这里会自动给消费组:test2加上一个后缀

    sh consumer.sh -t BenchmarkTest -n nameserver:9876 -g test2

     TPS: 消费的TPS

    FAIL:消费失败的总数

    AVG(B2C):broker到Consumer的平均响应时间(毫秒)

    AVG(S2C):nameserver到Consumer的平均响应时间(毫秒)

    MAX(B2C): broker到Consumer的最大响应时间(毫秒)

    MAX(S2C): nameserver到Consumer的最大响应时间(毫秒)

     可以看到控制台里Consumer Message TPS为6w多,远大于producer的tps,且消费只从Master请求消息。

    博主:测试生财(一个不为996而996的测开码农)

    座右铭:专注测试开发与自动化运维,努力读书思考写作,为内卷的人生奠定财务自由。

    内容范畴:技术提升,职场杂谈,事业发展,阅读写作,投资理财,健康人生。

    csdn:https://blog.csdn.net/ccgshigao

    博客园:https://www.cnblogs.com/qa-freeroad/

    51cto:https://blog.51cto.com/14900374

    微信公众号:测试生财(定期分享独家内容和资源)

  • 相关阅读:
    Windows Server 2008 R2 实现多用户连接远程桌面
    增加远程登录用户登陆个数
    Win2008R2PHP5.4环境加载Zend模块
    Windows 和  Linux 下 禁止ping的方法
    Windows 2003 FastCgi安装环境
    Windows2008下搭建NFS实现windows空间提供linux使用
    Spring + JdbcTemplate + JdbcDaoSupport examples
    Spring Object/XML mapping example
    Spring AOP + AspectJ in XML configuration example
    Spring AOP + AspectJ annotation example
  • 原文地址:https://www.cnblogs.com/qa-freeroad/p/13695384.html
Copyright © 2011-2022 走看看