zoukankan      html  css  js  c++  java
  • RocketMQ-broker状态管理及数据统计

    RocketMQ-broker状态管理及数据统计

    在RocketMQ中,状态管理有BrokerStatsManager,ConsumerStatsManager,FilterServerStatsManager,其实现的方式都是一样的。

    这边就拿BrokerStatsManager做介绍

    一个StatsItemSet,代表着一项数据统计指标,这个指标定义了各个key的统计单项StatsItem,具体的值和次数使用原子类表示

    BrokerStatsManager支持这些统一指标,同时每一个指标对应一个StateItemSet,里面包含了各个topic和group的信息

        public static final String TOPIC_PUT_NUMS = "TOPIC_PUT_NUMS";
        public static final String TOPIC_PUT_SIZE = "TOPIC_PUT_SIZE";
        public static final String GROUP_GET_NUMS = "GROUP_GET_NUMS";
        public static final String GROUP_GET_SIZE = "GROUP_GET_SIZE";
        public static final String SNDBCK_PUT_NUMS = "SNDBCK_PUT_NUMS";
        public static final String BROKER_PUT_NUMS = "BROKER_PUT_NUMS";
        public static final String BROKER_GET_NUMS = "BROKER_GET_NUMS";
        public static final String GROUP_GET_FROM_DISK_NUMS = "GROUP_GET_FROM_DISK_NUMS";
        public static final String GROUP_GET_FROM_DISK_SIZE = "GROUP_GET_FROM_DISK_SIZE";
        public static final String BROKER_GET_FROM_DISK_NUMS = "BROKER_GET_FROM_DISK_NUMS";
        public static final String BROKER_GET_FROM_DISK_SIZE = "BROKER_GET_FROM_DISK_SIZE";
        // For commercial
        public static final String COMMERCIAL_SEND_TIMES = "COMMERCIAL_SEND_TIMES";
        public static final String COMMERCIAL_SNDBCK_TIMES = "COMMERCIAL_SNDBCK_TIMES";
        public static final String COMMERCIAL_RCV_TIMES = "COMMERCIAL_RCV_TIMES";
        public static final String COMMERCIAL_RCV_EPOLLS = "COMMERCIAL_RCV_EPOLLS";
        public static final String COMMERCIAL_SEND_SIZE = "COMMERCIAL_SEND_SIZE";
        public static final String COMMERCIAL_RCV_SIZE = "COMMERCIAL_RCV_SIZE";
        public static final String COMMERCIAL_PERM_FAILURES = "COMMERCIAL_PERM_FAILURES";
    
    
    
    this.statsTable.put(TOPIC_PUT_NUMS, new StatsItemSet(TOPIC_PUT_NUMS, this.scheduledExecutorService, log));
    this.statsTable.put(TOPIC_PUT_SIZE, new StatsItemSet(TOPIC_PUT_SIZE, this.scheduledExecutorService, log));
    this.statsTable.put(GROUP_GET_NUMS, new StatsItemSet(GROUP_GET_NUMS, this.scheduledExecutorService, log));
    this.statsTable.put(GROUP_GET_SIZE, new StatsItemSet(GROUP_GET_SIZE, this.scheduledExecutorService, log));
    this.statsTable.put(GROUP_GET_LATENCY, new StatsItemSet(GROUP_GET_LATENCY, this.scheduledExecutorService, log));
    this.statsTable.put(SNDBCK_PUT_NUMS, new StatsItemSet(SNDBCK_PUT_NUMS, this.scheduledExecutorService, log));
    this.statsTable.put(BROKER_PUT_NUMS, new StatsItemSet(BROKER_PUT_NUMS, this.scheduledExecutorService, log));
    this.statsTable.put(BROKER_GET_NUMS, new StatsItemSet(BROKER_GET_NUMS, this.scheduledExecutorService, log));
    this.statsTable.put(GROUP_GET_FROM_DISK_NUMS, new StatsItemSet(GROUP_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log));
    this.statsTable.put(GROUP_GET_FROM_DISK_SIZE, new StatsItemSet(GROUP_GET_FROM_DISK_SIZE, this.scheduledExecutorService, log));
    this.statsTable.put(BROKER_GET_FROM_DISK_NUMS, new StatsItemSet(BROKER_GET_FROM_DISK_NUMS, this.scheduledExecutorService, log));
    this.statsTable.put(BROKER_GET_FROM_DISK_SIZE, new StatsItemSet(BROKER_GET_FROM_DISK_SIZE, this.scheduledExecutorService, log));
    
    this.statsTable.put(COMMERCIAL_SEND_TIMES, new StatsItemSet(COMMERCIAL_SEND_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
    this.statsTable.put(COMMERCIAL_RCV_TIMES, new StatsItemSet(COMMERCIAL_RCV_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
    this.statsTable.put(COMMERCIAL_SEND_SIZE, new StatsItemSet(COMMERCIAL_SEND_SIZE, this.commercialExecutor, COMMERCIAL_LOG));
    this.statsTable.put(COMMERCIAL_RCV_SIZE, new StatsItemSet(COMMERCIAL_RCV_SIZE, this.commercialExecutor, COMMERCIAL_LOG));
    this.statsTable.put(COMMERCIAL_RCV_EPOLLS, new StatsItemSet(COMMERCIAL_RCV_EPOLLS, this.commercialExecutor, COMMERCIAL_LOG));
    this.statsTable.put(COMMERCIAL_SNDBCK_TIMES, new StatsItemSet(COMMERCIAL_SNDBCK_TIMES, this.commercialExecutor, COMMERCIAL_LOG));
    this.statsTable.put(COMMERCIAL_PERM_FAILURES, new StatsItemSet(COMMERCIAL_PERM_FAILURES, this.commercialExecutor, COMMERCIAL_LOG));

    最后这些指标都会保存在    private final HashMap<String, StatsItemSet> statsTable = new HashMap<String, StatsItemSet>();

    一个指标StatsItemSet里面有如下属性

    private final ConcurrentMap<String/* key */, StatsItem> statsItemTable =
            new ConcurrentHashMap<String, StatsItem>(128);
    
        private final String statsName;
        private final ScheduledExecutorService scheduledExecutorService;
        private final Logger log;

    比如一个TOPIC_PUT_NUMS的指标,在其statsItemTable 包含了各个topic的message数量和存放次数。其中key就是topic。

    然后会初始化一些定时任务,比如会在每隔10s把当前的状态封装之后放在csListMinute,最多放6个统计记录,然后在一分钟内打印结果,格式如下

    "TOPIC_PUT_NUMS [topicname] Stats In One Minute, SUM: %d TPS: %.2f AVGPT: %.2

    再次之前会进行简单计算代码如下

      private static StatsSnapshot computeStatsData(final LinkedList<CallSnapshot> csList) {
            StatsSnapshot statsSnapshot = new StatsSnapshot();
            synchronized (csList) {
                double tps = 0;
                double avgpt = 0;
                long sum = 0;
                if (!csList.isEmpty()) {
                    CallSnapshot first = csList.getFirst();
                    CallSnapshot last = csList.getLast();
                    // 比如统计前后的值差距
                    sum = last.getValue() - first.getValue();
                    // last.getTimestamp() - first.getTimestamp()代表的是统计的时候时间差
                    tps = (sum * 1000.0d) / (last.getTimestamp() - first.getTimestamp());
    
                    // 次数差距
                    long timesDiff = last.getTimes() - first.getTimes();
                    if (timesDiff > 0) {
                        // 平均每次的达到的数量级   ,在TOPIC_PUT_NUMS中表示 每次在在该topic中平均写入avgpt个message
                        avgpt = (sum * 1.0d) / timesDiff;
                    }
                }
    
                statsSnapshot.setSum(sum);
                statsSnapshot.setTps(tps);
                statsSnapshot.setAvgpt(avgpt);
            }
    
            return statsSnapshot;
        }

    每隔一小时,一天的统计结果处理逻辑雷同。

  • 相关阅读:
    JSP 072: 处理注册结果样式的显示
    JSP 07: 开发注册页面
    JSP 06: 两个内置对象request和response
    Java Web 01: 什么是http协议
    JSP 05: JSP定义表达式和内容输出表达式
    JSP 04: 如何在JSP页面中书写Java代码
    JSP 03: 创建一个JSP页面并启动运行项目
    Fail 02: Port 8080 required by Tomcat Server at localhost is already in use.
    Fail 03: netstat 不是内部或外部命令
    Build 01: 安装新的JDK
  • 原文地址:https://www.cnblogs.com/gaojy/p/15096814.html
Copyright © 2011-2022 走看看