zoukankan      html  css  js  c++  java
  • 固定QPS压测模式探索

    在早前跟测试同行在QQ群聊天的时候,聊过一个固定QPS压测的问题,最近突然有需求,想实现一下,丰富一下自己的性能测试框架,最新的代码请移步我的GitHub,地址:https://github.com/JunManYuanLong/FunTester,gitee地址:https://gitee.com/fanapi/tester

    思路

    • 有一个多线程的基类,其他压测任务类继承于基类。
    • 并发执行类由线程池任务发生器补偿器组成。
    • 单线程执行任务发生器将生成的任务对象丢到线程池里面执行。
    • 另起补偿器线程完成缺失的补偿。(由于多种原因,真实发生量小于设定值)

    总体的思路与如何mock固定QPS的接口moco固定QPS接口升级补偿机制这两票文章一致,但是没有采取Semaphore的模式,原因是moco是多线程对单线程,压测是单线程对多线程。

    • 语言继续采用Java语言。

    基类

    写得有点仓促,还未进行大量实践,所以注释少一些。这里依然设计两种子模式:定量压测定时压测,这里由于两种压测模式,通过一个属性isTimesMode记录,在执行类FixedQpsConcurrent中用到,单次压测任务对象统一isTimesModelimit两个属性。

    package com.fun.base.constaint;
    
    import com.fun.base.interfaces.MarkThread;
    import com.fun.config.HttpClientConstant;
    import com.fun.frame.execute.FixedQpsConcurrent;
    import com.fun.frame.httpclient.GCThread;
    import com.fun.utils.Time;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public abstract class FixedQpsThread<T> extends ThreadBase {
    
        private static Logger logger = LoggerFactory.getLogger(FixedQpsThread.class);
    
        public int qps;
    
        public int limit;
    
        public boolean isTimesMode;
    
        public FixedQpsThread(T t, int limit, int qps, MarkThread markThread) {
            this.limit = limit;
            this.qps = qps;
            this.mark = markThread;
            this.t = t;
            isTimesMode = limit > 1000 ? true : false;
        }
    
    
        protected FixedQpsThread() {
            super();
        }
    
        @Override
        public void run() {
            try {
                before();
                threadmark = mark == null ? EMPTY : this.mark.mark(this);
                long s = Time.getTimeStamp();
                doing();
                long e = Time.getTimeStamp();
                long diff = e - s;
                FixedQpsConcurrent.allTimes.add(diff);
                FixedQpsConcurrent.executeTimes.getAndIncrement();
                if (diff > HttpClientConstant.MAX_ACCEPT_TIME)
                    FixedQpsConcurrent.marks.add(diff + CONNECTOR + threadmark);
            } catch (Exception e) {
                logger.warn("执行任务失败!", e);
                logger.warn("执行失败对象的标记:{}", threadmark);
                FixedQpsConcurrent.errorTimes.getAndIncrement();
            } finally {
                after();
            }
        }
    
        @Override
        public void before() {
            GCThread.starts();
        }
    
        /**
         * 子类必需实现改方法,不然调用deepclone方法会报错
         *
         * @return
         */
        public abstract FixedQpsThread clone();
    
    
    }
    
    

    执行类

    此处补偿线程设计还待优化,中间有两处休眠:一处是循环检测是否需要补偿,一处是单词补偿间隔。尚未提取配置变量,有待后面实践之后进行优化调整。测试结果对象依然采用了原来的,数值和计算方式保持一致,后期也会根据实践结果进行调整,可以关注我的GitHub及时获取更新。

    package com.fun.frame.execute;
    
    import com.fun.base.bean.PerformanceResultBean;
    import com.fun.base.constaint.FixedQpsThread;
    import com.fun.config.Constant;
    import com.fun.frame.Save;
    import com.fun.frame.SourceCode;
    import com.fun.frame.httpclient.GCThread;
    import com.fun.utils.Time;
    import com.fun.utils.WriteRead;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Vector;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import static java.util.stream.Collectors.toList;
    
    /**
     * 并发类,用于启动压力脚本
     */
    public class FixedQpsConcurrent extends SourceCode {
    
        private static Logger logger = LoggerFactory.getLogger(FixedQpsConcurrent.class);
    
        public static boolean key = false;
    
        public static AtomicInteger executeTimes = new AtomicInteger();
    
        public static AtomicInteger errorTimes = new AtomicInteger();
    
        public static Vector<String> marks = new Vector<>();
    
        /**
         * 用于记录所有请求时间
         */
        public static Vector<Long> allTimes = new Vector<>();
    
        /**
         * 开始时间
         */
        public long startTime;
    
        /**
         * 结束时间
         */
        public long endTime;
    
        public int queueLength;
    
        /**
         * 任务描述
         */
        public String desc = DEFAULT_STRING;
    
        /**
         * 任务集
         */
        public List<FixedQpsThread> threads = new ArrayList<>();
    
        /**
         * 线程池
         */
        ExecutorService executorService;
    
        /**
         * @param thread 线程任务
         */
        public FixedQpsConcurrent(FixedQpsThread thread) {
            this(thread, DEFAULT_STRING);
        }
    
        /**
         * @param threads 线程组
         */
        public FixedQpsConcurrent(List<FixedQpsThread> threads) {
            this(threads, DEFAULT_STRING);
        }
    
        /**
         * @param thread 线程任务
         * @param desc   任务描述
         */
        public FixedQpsConcurrent(FixedQpsThread thread, String desc) {
            this();
            this.queueLength = 1;
            threads.add(thread);
            this.desc = desc + Time.getNow();
        }
    
        /**
         * @param threads 线程组
         * @param desc    任务描述
         */
        public FixedQpsConcurrent(List<FixedQpsThread> threads, String desc) {
            this();
            this.threads = threads;
            this.queueLength = threads.size();
            this.desc = desc + Time.getNow();
        }
    
        private FixedQpsConcurrent() {
            executorService = ThreadPoolUtil.createPool(20, 200, 3);
        }
    
        /**
         * 执行多线程任务
         * 默认取list中thread对象,丢入线程池,完成多线程执行,如果没有threadname,name默认采用desc+线程数作为threadname,去除末尾的日期
         */
        public PerformanceResultBean start() {
            key = false;
            FixedQpsThread fixedQpsThread = threads.get(0);
            boolean isTimesMode = fixedQpsThread.isTimesMode;
            int limit = fixedQpsThread.limit;
            int qps = fixedQpsThread.qps;
            long interval = 1_000_000_000 / qps;
            AidThread aidThread = new AidThread();
            new Thread(aidThread).start();
            startTime = Time.getTimeStamp();
            while (true) {
                executorService.execute(threads.get(limit-- % queueLength).clone());
                if (key ? true : isTimesMode ? limit < 1 : Time.getTimeStamp() - startTime > fixedQpsThread.limit) break;
                sleep(interval);
            }
            endTime = Time.getTimeStamp();
            aidThread.stop();
            GCThread.stop();
            try {
                executorService.shutdown();
                executorService.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                logger.error("线程池等待任务结束失败!", e);
            }
            logger.info("总计执行 {} ,共用时:{} s,执行总数:{},错误数:{}!", fixedQpsThread.isTimesMode ? fixedQpsThread.limit + "次任务" : "秒", Time.getTimeDiffer(startTime, endTime), executeTimes, errorTimes);
            return over();
        }
    
        private PerformanceResultBean over() {
            key = true;
            Save.saveLongList(allTimes, "data/" + queueLength + desc);
            Save.saveStringListSync(marks, MARK_Path.replace(LONG_Path, EMPTY) + desc);
            allTimes = new Vector<>();
            marks = new Vector<>();
            executeTimes.set(0);
            errorTimes.set(0);
            return countQPS(queueLength, desc, Time.getTimeByTimestamp(startTime), Time.getTimeByTimestamp(endTime));
        }
    
        /**
         * 计算结果
         * <p>此结果仅供参考</p>
         *
         * @param name 线程数
         */
        public PerformanceResultBean countQPS(int name, String desc, String start, String end) {
            List<String> strings = WriteRead.readTxtFileByLine(Constant.DATA_Path + name + desc);
            int size = strings.size();
            List<Integer> data = strings.stream().map(x -> changeStringToInt(x)).collect(toList());
            int sum = data.stream().mapToInt(x -> x).sum();
            Collections.sort(data);
            String statistics = StatisticsUtil.statistics(data, desc, this.queueLength);
            double qps = 1000.0 * size * name / sum;
            return new PerformanceResultBean(desc, start, end, name, size, sum / size, qps, getPercent(executeTimes.get(), errorTimes.get()), 0, executeTimes.get(), statistics);
        }
    
    
        /**
         * 用于做后期的计算
         *
         * @param name
         * @param desc
         * @return
         */
        public PerformanceResultBean countQPS(int name, String desc) {
            return countQPS(name, desc, Time.getDate(), Time.getDate());
        }
    
        /**
         * 后期计算用
         *
         * @param name
         * @return
         */
        public PerformanceResultBean countQPS(int name) {
            return countQPS(name, EMPTY, Time.getDate(), Time.getDate());
        }
    
    
        /**
         * 补偿线程
         */
        class AidThread implements Runnable {
    
            private boolean key = true;
    
            int i;
    
            public AidThread() {
    
            }
    
            @Override
            public void run() {
                logger.info("补偿线程开始!");
                while (key) {
                    long expect = (Time.getTimeStamp() - startTime) / 1000 * threads.get(0).qps;
                    if (expect > executeTimes.get() + 10) {
                        range((int) expect - executeTimes.get()).forEach(x -> {
                            sleep(100);
                            executorService.execute(threads.get(i++ % queueLength).clone());
                        });
                    }
                    sleep(3);
                }
                logger.info("补偿线程结束!");
            }
    
            public void stop() {
                key = false;
            }
    
    
        }
    
    
    }
    

    其他配套的标记类统计类还等待修改,比较简单,这里不放代码了。


    公众号FunTester首发,原创分享爱好者,腾讯云、开源中国和掘金社区首页推荐,知乎准八级原创作者,欢迎关注、交流,禁止第三方擅自转载。

    FunTester热文精选

  • 相关阅读:
    apache安全—用户访问控制
    hdu 3232 Crossing Rivers 过河(数学期望)
    HDU 5418 Victor and World (可重复走的TSP问题,状压dp)
    UVA 11020 Efficient Solutions (BST,Splay树)
    UVA 11922 Permutation Transformer (Splay树)
    HYSBZ 1208 宠物收养所 (Splay树)
    HYSBZ 1503 郁闷的出纳员 (Splay树)
    HDU 5416 CRB and Tree (技巧)
    HDU 5414 CRB and String (字符串,模拟)
    HDU 5410 CRB and His Birthday (01背包,完全背包,混合)
  • 原文地址:https://www.cnblogs.com/FunTester/p/13813309.html
Copyright © 2011-2022 走看看