zoukankan      html  css  js  c++  java
  • 动态模型之增压暂停【FunTester测试框架】

    距离上次对FunTester测试框架功能规划之后,已经很久没有更新过功能规划了,主要因素是FunTester测试框架目前支持的功能已经完全满足工作需求。无论是分布式性能测试框架,还是全链路性能测试支持,以及量化模拟线上流量,基本技术验证都完成了,余下的都是在技术方案的上进行调整以更适应现在工作需求,不存在技术障碍。

    虽然FunTester测试框架还在不断更新,但很久没进行过功能更新了。最近在设想为了可能用到的测试场景中,动态压力是目前最有可能在工作中应用的。

    终极目标

    最终想要实现的就是在不断压测过程中,灵活增加、减少或者保持测试压力,这样既可以一次执行用例的过程中,实现多个测试场景的覆盖,也可以避免从0开始加压导致测试时间的增加。

    以往的性能测试用例,点击开始之后,除了主动结束,中间很少进行压力调节,动态压力就是为了在压测过程中干预压力,实现测试过程中对发压端进行调节。这个需求可以通过容器化和分布式等技术加持下实现。我的想法就是直接对单个JVM线程池运行的任务进行调节,适用于非集群压测和对压力需求比较灵活的场景,当然如果应用到日常性能巡检,也是可以提升效能的。

    主要实现功能:

    • 性能测试执行中,动态增减性能测试压力值。
    • 性能测试执行中,动态注入新的流量模型任务。

    这样的好处:

    • 动态增减执行任务,达到一次试执行测试多种压力目的。
    • 动态增减不同模型的任务,达到动态修改压测流量的目的。

    万里长征第一步:增加暂停功能实现。

    需求来源

    在工作中,经常会遇到一类场景:预计系统提供的QPS在1万,线程数100,我们设计了0线程为起始,间隔30s增加10的压力,最大值120。(当然也可以使用QPS递增模式,下面的演示Demo会使用线程递增)。

    可实际情况是80线程已经到达性能瓶颈,然后需求是在80线程的压力下保持一阵子,方便收集数据和获取现场。

    思路

    基本实现

    因为使用场景是在压力递增的过程中,终止递增并保持压力,所以使用了性能测试软启动初探中的软启动方案。使用for循环来递增压力。然后在某个时刻触发终止递增。

    触发条件

    预期是在性能测试服务化中实践和本地脚本测试中使用,目前只实现了本地脚本测试中功能,思路是通过一个线程安全变量来标记执行状态,是否是持续递增压力还是需要终止压力,这里选择了另起一个线程来完成这件任务,通过从控制台输入内容来控制这个开关。我目前只是做的Demo是一个没有回头路的方案,只有终止压力没有暂停后继续增加或者结束。

    多线程类任务如下:

        private static class FunTester implements Runnable {
    
            @Override
            public void run() {
                waitForKey(INTPUT_KEY);
                HOLD.set(1);
                output("压力暂停");
            }
    
        }
    
    

    这里从控制台等待输入,如果等于com.funtester.config.Constant#INTPUT_KEY那么就设置com.funtester.frame.execute.HoldConcurrent#HOLD设置为1,从而标记测试任务已经进入了终止增压阶段。

    多线程同步

    这里参考Java线程同步三剑客内容,我使用了java.util.concurrent.Phaser,原因是java.util.concurrent.CountDownLatch不灵活,无法满足需求,而java.util.concurrent.CyclicBarrier虽然灵活也能满足多线程同步需求,但无法灵活增减需要同步的线程数,故而也放弃了。

    导致问题

    这种动态压力模型是在加压阶段进行的,也就是线程并不是固定,进而带来了一些问题,目前尚未解决。

    • 本地数据统计失真。
    • 暂不支持多任务,暂无法服务化。

    实现Demo

    多线程任务类

    首先是com.funtester.base.constaint.HoldThread类,实现了com.funtester.base.constaint.ThreadBase的部分方法,具体与其他实现区别主要是在com.funtester.base.constaint.HoldThread#beforecom.funtester.base.constaint.HoldThread#after方法中对于标记多线程同步对象com.funtester.frame.execute.HoldConcurrent#phaser的操作。

    代码如下:

    package com.funtester.base.constaint;
    
    import com.funtester.frame.execute.HoldConcurrent;
    import com.funtester.httpclient.GCThread;
    import com.funtester.utils.Time;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    
    /**
     * 动态压力模型,增压暂停
     */
    public abstract class HoldThread<F> extends ThreadBase<F> {
    
        private static final long serialVersionUID = -4617192188292407063L;
    
        private static final Logger logger = LogManager.getLogger(HoldThread.class);
    
        public HoldThread(F f, int limit, boolean isTimesMode) {
            this.isTimesMode = isTimesMode;
            this.limit = limit;
            this.f = f;
        }
    
        protected HoldThread() {
            super();
        }
    
        /**
         *
         */
        @Override
        public void run() {
            try {
                before();
                long ss = Time.getTimeStamp();
                while (true) {
                    try {
                        executeNum++;
                        long s = Time.getTimeStamp();
                        doing();
                        long et = Time.getTimeStamp();
                        short diff = (short) (et - s);
                        costs.add(diff);
                    } catch (Exception e) {
                        logger.warn("执行任务失败!", e);
                        errorNum++;
                    } finally {
                        if ((isTimesMode ? executeNum >= limit : (Time.getTimeStamp() - ss) >= limit) || ThreadBase.needAbort() || status())
                            break;
                    }
                }
                long ee = Time.getTimeStamp();
                if ((ee - ss) / 1000 > RUNUP_TIME + 3)
                    logger.info("线程:{},执行次数:{},错误次数: {},总耗时:{} s", threadName, executeNum, errorNum, (ee - ss) / 1000.0);
                HoldConcurrent.allTimes.addAll(costs);
                HoldConcurrent.requestMark.addAll(marks);
            } catch (Exception e) {
                logger.warn("执行任务失败!", e);
            } finally {
                after();
            }
        }
    
        @Override
        public void before() {
            super.before();
            HoldConcurrent.phaser.register();
        }
    
        @Override
        protected void after() {
            super.after();
            GCThread.stop();
            HoldConcurrent.phaser.arriveAndDeregister();
    
        }
    
    
    }
    
    

    执行类

    执行类com.funtester.frame.execute.HoldConcurrent与其他模型区别在于:1.没有固定稳定压力,增压结束即稳定压力;2.多线程任务接受控制台输出内容控制任务阶段;3.当用户一直不输入com.funtester.config.Constant#INTPUT_KEY,存在无法自然停止隐患。

    代码如下:

    package com.funtester.frame.execute;
    
    import com.funtester.base.bean.PerformanceResultBean;
    import com.funtester.base.constaint.ThreadBase;
    import com.funtester.config.Constant;
    import com.funtester.frame.Save;
    import com.funtester.frame.SourceCode;
    import com.funtester.utils.RWUtil;
    import com.funtester.utils.Time;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Vector;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Phaser;
    import java.util.concurrent.atomic.AtomicInteger;
    
    import static java.util.stream.Collectors.toList;
    
    /**
     * 并发类,用于启动压力脚本
     */
    public class HoldConcurrent extends SourceCode {
    
        private static Logger logger = LogManager.getLogger(HoldConcurrent.class);
    
        /**
         * 用来标记状态
         */
        public static AtomicInteger HOLD = new AtomicInteger(0);
    
        /**
         * 开始时间
         */
        private long startTime;
    
        /**
         * 结束时间
         */
        private long endTime;
    
        /**
         * 任务描述
         */
        public String desc;
    
        /**
         * 任务集
         */
        public List<ThreadBase> threads = new ArrayList<>();
    
        /**
         * 线程数
         */
        public int threadNum;
    
        /**
         * 执行失败总数
         */
        private int errorTotal;
    
        /**
         * 任务执行失败总数
         */
        private int failTotal;
    
        /**
         * 执行总数
         */
        private int executeTotal;
    
        /**
         * 用于记录所有请求时间
         */
        public static Vector<Short> allTimes = new Vector<>();
    
        /**
         * 记录所有markrequest的信息
         */
        public static Vector<String> requestMark = new Vector<>();
    
        /**
         * 线程池
         */
        ExecutorService executorService;
    
    
        /**
         * 多线程多阶段同步类,用于多线程任务阶段管理
         */
        public static Phaser phaser;
    
        /**
         * @param thread    线程任务
         * @param threadNum 线程数
         * @param desc      任务描述
         */
        public HoldConcurrent(ThreadBase thread, int threadNum, String desc) {
            this(threadNum, desc);
            range(threadNum).forEach(x -> threads.add(thread.clone()));
        }
    
        /**
         * @param threads 线程组
         * @param desc    任务描述
         */
        public HoldConcurrent(List<ThreadBase> threads, String desc) {
            this(threads.size(), desc);
            this.threads = threads;
        }
    
        private HoldConcurrent(int threadNum, String desc) {
            this.threadNum = threadNum;
            this.desc = StatisticsUtil.getFileName(desc);
            phaser = new Phaser(1);
            executorService = ThreadPoolUtil.createFixedPool(threadNum);
        }
    
        private HoldConcurrent() {
    
        }
    
        /**
         * 执行多线程任务
         * 默认取list中thread对象,丢入线程池,完成多线程执行,如果没有threadname,name默认采用desc+线程数作为threadname,去除末尾的日期
         */
        public PerformanceResultBean start() {
            Thread funtester = new Thread(new FunTester());
            funtester.start();
            ThreadBase.progress = new Progress(threads, StatisticsUtil.getTrueName(desc));
            ThreadBase.progress.threadNum = 0;
            new Thread(ThreadBase.progress).start();
            startTime = Time.getTimeStamp();
            for (int i = 0; i < threadNum; i++) {
                if (HOLD.get() == 1) {
                    threadNum = i;
                    break;
                }
                ThreadBase thread = threads.get(i);
                if (StringUtils.isBlank(thread.threadName)) thread.threadName = StatisticsUtil.getTrueName(desc) + i;
                sleep(RUNUP_TIME / threadNum);
                executorService.execute(thread);
                ThreadBase.progress.threadNum = i + 1;
                logger.info("已经启动了 {} 个线程!", i + 1);
            }
            phaser.arriveAndAwaitAdvance();
            executorService.shutdown();
            ThreadBase.progress.stop();
            threads.forEach(x -> {
                if (x.status()) failTotal++;
                errorTotal += x.errorNum;
                executeTotal += x.executeNum;
            });
            endTime = Time.getTimeStamp();
            HOLD.set(0);
            logger.info("总计{}个线程,共用时:{} s,执行总数:{},错误数:{},失败数:{}", threadNum, Time.getTimeDiffer(startTime, endTime), formatLong(executeTotal), errorTotal, failTotal);
            return over();
        }
    
        private static class FunTester implements Runnable {
    
            @Override
            public void run() {
                waitForKey(INTPUT_KEY);
                HOLD.set(1);
                output("压力暂停");
            }
    
        }
    
        private PerformanceResultBean over() {
            Save.saveIntegerList(allTimes, DATA_Path.replace(LONG_Path, EMPTY) + StatisticsUtil.getFileName(threadNum, desc));
            Save.saveStringListSync(HoldConcurrent.requestMark, MARK_Path.replace(LONG_Path, EMPTY) + desc);
            allTimes = new Vector<>();
            requestMark = new Vector<>();
            return countQPS(threadNum, desc, Time.getTimeByTimestamp(startTime), Time.getTimeByTimestamp(endTime));
        }
    
    
        /**
         * 计算结果
         * <p>此结果仅供参考</p>
         * 此处因为start和end的不准确问题,所以采用改计算方法,与fixQPS有区别
         *
         * @param name 线程数
         */
        public PerformanceResultBean countQPS(int name, String desc, String start, String end) {
            List<String> strings = RWUtil.readTxtFileByLine(Constant.DATA_Path + StatisticsUtil.getFileName(name, desc));
            int size = strings.size() == 0 ? 1 : strings.size();
            List<Integer> data = strings.stream().map(x -> changeStringToInt(x)).collect(toList());
            int sum = data.stream().mapToInt(x -> x).sum();
            String statistics = StatisticsUtil.statistics(data, desc, threadNum);
            int rt = sum / size;
            double qps = 1000.0 * name / (rt == 0 ? 1 : rt);
            double qps2 = (executeTotal + errorTotal) * 1000.0 / (endTime - startTime);
            return new PerformanceResultBean(desc, start, end, name, size, rt, qps, qps2, getPercent(executeTotal, errorTotal), getPercent(threadNum, failTotal), executeTotal, statistics);
        }
    
    
        /**
         * 用于做后期的计算
         *
         * @param name
         * @param desc
         * @return
         */
        public PerformanceResultBean countQPS(int name, String desc) {
            return countQPS(name, desc, Time.getDate(), Time.getDate());
        }
    
    
    }
    

    测试

    测试脚本

    这里用了简单的多线程sleep进行测试,不再使用本地HTTP接口了。脚本内容如下:

    package com.funtest.groovytest
    
    import com.funtester.base.constaint.HoldThread
    import com.funtester.base.constaint.ThreadBase
    import com.funtester.frame.SourceCode
    import com.funtester.frame.execute.HoldConcurrent
    /**
     * 加压暂停
     */
    class HoldTest extends SourceCode {
        public static void main(String[] args) {
            def tester = new FunTester()
    
            new HoldConcurrent(tester,10,"终止压力递增保持压力测试").start()
    
        }
    
        private static class FunTester extends HoldThread {
    
            FunTester() {
                super(null, 100, true)
            }
    
            @Override
            protected void doing() throws Exception {
                sleep(0.1)
            }
    
            @Override
            ThreadBase clone() {
                return new FunTester()
            }
    
        }
    
    }
    
    

    控制台输出

    下面是完整的控制台输出:

    INFO-> 当前用户:oker,工作目录:/Users/oker/IdeaProjects/funtester/src/test/groovy/com/funtest/groovytest/,系统编码格式:UTF-8,系统Mac OS X版本:10.16
    WARN-> 请输入“FunTester”继续运行!
    INFO-> 已经启动了 1 个线程!
    INFO-> 终止压力递增保持压力测试进度:▍  2% ,当前QPS: 0
    INFO-> 已经启动了 2 个线程!
    INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍  31% ,当前QPS: 20
    INFO-> 已经启动了 3 个线程!
    INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍  60% ,当前QPS: 30
    FunTester
    INFO-> 本次共等待:9.535秒!
    INFO-> 压力暂停
    INFO-> 已经启动了 4 个线程!
    INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍  89% ,当前QPS: 39
    INFO-> 终止压力递增保持压力测试进度:▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍▍  100%
    INFO-> 总计4个线程,共用时:13.439 s,执行总数:227,错误数:0,失败数:0
    INFO-> 数据保存成功!文件名:/Users/oker/IdeaProjects/funtester/src/test/groovy/com/funtest/groovytest/long/data/终止压力递增保持压力测试231809_4
    INFO-> 
    ~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~ JSON ~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~
    >  {
    >  ① . "rt":103,
    >  ① . "failRate":0.0,
    >  ① . "threads":4,
    >  ① . "deviation":"56.51%",
    >  ① . "errorRate":0.0,
    >  ① . "executeTotal":227,
    >  ① . "qps2":16.891137733462312,
    >  ① . "total":227,
    >  ① . "qps":38.83495145631068,
    >  ① . "startTime":"2021-09-23 18:09:02",
    >  ① . "endTime":"2021-09-23 18:09:16",
    >  ① . "mark":"终止压力递增保持压力测试231809",
    >  ① . "table":"eJwBLwDQ/+aVsOaNrumHj+WkquWwkSzml6Dms5Xnu5jlm74hIOW6lOW9k+Wkp+S6jiAxMDI0/eodgA=="
    >  }
    ~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~ JSON ~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~~☢~
    INFO-> 数据量太少,无法绘图! 应当大于 1024
    
    Process finished with exit code 0
    
    

    可以看出QPSQPS2相差一倍以上,这个比较容易理解,等差数列既视感。下一步会努力实现本地灵活增减压力值和全局任务管理功能,敬请期待!!!

    Have Fun ~ Tester !

  • 相关阅读:
    (转)Lucene.net搜索结果排序(单条件和多条件)
    .Net去html标签
    (转)Lucene.Net多字段查询,多索引查询
    HttpUtility
    内存卡问题汇总
    我的NHibernate
    (转)lucene.net和(pangu)盘古分词 搜索引擎的简单实现
    Power Designer使用技巧
    Oracle添加修改删除表字段
    在数据库开发过程中,数据库、表、字段、视图、存储过程等的命名规则
  • 原文地址:https://www.cnblogs.com/FunTester/p/15403222.html
Copyright © 2011-2022 走看看