zoukankan      html  css  js  c++  java
  • Java&Go高性能队列之LinkedBlockingQueue性能测试

    在写完高性能队列Disruptor在测试中应用千万级日志回放引擎设计稿之后,我就一直在准备Java & Go 语言几种高性能消息队列的性能测试,其中选取了几种基准测试场景以及在性能测试中的应用场景。

    测试场景设计的思路参考的两个方面:

    • 消息体大小,我用的不同大小GET请求区分
    • 生产者和消费者线程数,Go语言中称协程goroutine

    PS:后续的文章中,Go语言文章中如果出现线程,均指goroutine。

    结论

    总体来说,java.util.concurrent.LinkedBlockingQueue性能还是在50万QPS级别上,满足现在压测需求,唯一需要避免的就是队列较长时性能不稳定。总结起来三点比较通用的参考:

    • 消息体尽可能小
    • 线程数增益有限
    • 尽量避免消息积压

    简介

    首先介绍一下第一个被测试的对象java.util.concurrent.LinkedBlockingQueue,分解名字可以得到这是个由链表实现的阻塞单向的对象。官方给的定义是:

    基于链接节点的可选有界阻塞队列。此队列对元素进行 FIFO(先进先出)排序。队列的头部是在队列中时间最长的元素。队列的尾部是在队列中时间最短的元素。新元素被插入到队列的尾部,队列检索操作获取队列头部的元素。链接队列通常比基于数组的队列具有更高的吞吐量,但在大多数并发应用程序中性能更不可预测。

    在我查到的几种JDK自带的队列实现类中,java.util.concurrent.LinkedBlockingQueue性能是最高的,还有一个候选的类java.util.concurrent.ArrayBlockingQueue,资料说java.util.concurrent.LinkedBlockingQueue性能大概是java.util.concurrent.ArrayBlockingQueue性能的2 ~ 3倍,差距过于明显,这个有机会再来测试。

    测试结果

    这里性能只记录每毫秒处理消息(对象)个数作为评价性能的唯一标准。

    数据说明

    这里我用了三种org.apache.http.client.methods.HttpGet,创建方法均使用原生API,为了区分大小的区别,我会响应增加一些header和URL长度。

    小对象:

    def get = new HttpGet()
    

    中对象:

    def get = new HttpGet(url)
    get.addHeader("token", token)
    get.addHeader(HttpClientConstant.USER_AGENT)
    get.addHeader(HttpClientConstant.CONNECTION)
    

    大对象:

    def get = new HttpGet(url + token)
    get.addHeader("token", token)
    get.addHeader("token1", token)
    get.addHeader("token5", token)
    get.addHeader("token4", token)
    get.addHeader("token3", token)
    get.addHeader("token2", token)
    get.addHeader(HttpClientConstant.USER_AGENT)
    get.addHeader(HttpClientConstant.CONNECTION)
    

    生产者

    对象大小 队列长度 (百万) 线程数 速率(/ms)
    1 1 838
    1 5 837
    1 10 823
    5 1 483
    10 1 450
    1 1 301
    1 5 322
    1 10 320
    1 20 271
    5 1 失败
    10 1 失败
    0.5 1 351
    0.5 5 375
    1 1 214
    1 5 240
    1 10 241
    0.5 1 209
    0.5 5 250
    0.5 10 246
    0.2 1 217
    0.2 5 309
    0.2 10 321
    0.2 20 243

    中间两次测试失败,是因为等待时间太长了,进行到300万左右开始停滞,所以放弃了。

    针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:

    1. 长度保持在十万量级
    2. 生产者线程数5-10线程
    3. 消息体尽可能小

    消费者

    对象大小 队列长度 (百万) 线程数 速率(/ms)
    1 1 1893
    1 5 1706
    1 10 1594
    1 20 1672
    2 1 2544
    2 5 2024
    5 1 3419
    1 1 1897
    1 5 1485
    1 10 1345
    1 20 1430
    2 1 2971
    2 5 1576
    1 1 1980
    1 5 1623
    1 10 1689
    0.5 1 1136
    0.5 5 1096
    0.5 10 1072

    针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:

    1. 数据上看长度越长越好
    2. 消费者线程越少越好
    3. 消息体尽可能小

    这里跟生产者标准有点不一样,基本上就是锁的竞争越少越好,测试消息数越多越好(这个工作中暂时用不到)。

    生产者 & 消费者

    这里的线程数指的是生产者或者消费者的数量,总体线程数是此数值的2倍。

    对象大小 次数 (百万) 线程数 队列长度 (百万) 速率(/ms)
    1 1 0.1 1326
    1 1 0.2 1050
    1 1 0.5 1054
    1 5 0.1 1091
    1 10 0.1 1128
    2 1 0.1 1798
    2 1 0.2 1122
    2 5 0.2 946
    5 5 0.1 1079
    5 10 0.1 1179
    1 1 0.1 632
    1 1 0.2 664
    1 5 0.2 718
    1 10 0.2 683
    2 1 0.2 675
    2 5 0.2 735
    2 10 0.2 788
    2 15 0.2 828
    1 1 0.1 505
    1 1 0.2 558
    1 5 0.2 609
    1 10 0.2 496
    2 1 0.2 523
    2 5 0.2 759
    2 10 0.2 668

    针对org.apache.http.client.methods.HttpRequestBase消息体结论如下:

    1. 消息队列积累消息越少,速率越快
    2. 消费速率随时间推移越来越快,不明显
    3. 消息体尽可能小

    测试用例

    测试用例使用Groovy语言编写,自从我自定义了异步关键字fun和复习了闭包的语法之后,感觉就像开了光一样,有点迷上了各类多线程的语法实现。所以这个用例对于Java同学来讲可能有点看着熟悉,仔细阅读起来有点费劲,我会尽量写一些注释。大家可以把终点放在测试结果上,这可以对以后大家使用java.util.concurrent.LinkedBlockingQueue类有个基本的参考。

    测试用例会根据上述的测试场景进行微调,例如线程数、消息体对象的大小等等,这个我会着重进行三种用例场景的测试。当然在工作中使用场景肯定比我提到的三种复杂多,各位有兴趣可以自己亲自上手测试,这里我就不班门弄斧了。

    生产者场景

    package com.funtest.groovytest
    
    import com.funtester.config.HttpClientConstant
    import com.funtester.frame.SourceCode
    import com.funtester.utils.CountUtil
    import com.funtester.utils.Time
    import org.apache.http.client.methods.HttpGet
    import org.apache.http.client.methods.HttpRequestBase
    
    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.LinkedBlockingQueue
    import java.util.concurrent.atomic.AtomicInteger
    
    class QueueT extends SourceCode {
    
        static AtomicInteger index = new AtomicInteger(0)
    
        static int total = 100_0000
    
        static int size = 10
    
        static int threadNum = 1
    
        static int piece = total / size
    
        static def url = "http://localhost:12345/funtester"
    
        static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"
    
        public static void main(String[] args) {
    
            LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()
    
            def start = Time.getTimeStamp()
            def latch = new CountDownLatch(threadNum)
            def ts = []
            def barrier = new CyclicBarrier(threadNum + 1)
            def funtester = {//创建异步闭包的方法
                fun {
                    barrier.await()
                    while (true) {
                        if (index.getAndIncrement() % piece == 0) {
                            def l = Time.getTimeStamp() - start
                            ts << l
                            output("${formatLong(index.get())}添加总消耗${formatLong(l)}")
                            start = Time.getTimeStamp()
                        }
                        if (index.get() > total) break
    
                        def get = new HttpGet(url)
                        get.addHeader("token",token)
                        get.addHeader(HttpClientConstant.USER_AGENT)
                        get.addHeader(HttpClientConstant.CONNECTION)
                        linkedQ.put(get)
                    }
                    latch.countDown()
                }
            }
            threadNum.times {funtester()}
            def st = Time.getTimeStamp()
            barrier.await()
            latch.await()
            def et = Time.getTimeStamp()
            outRGB("每毫秒速率${total / (et - st)}")
            outRGB(CountUtil.index(ts).toString())
        }
    
    
    }
    
    

    消费者场景

    package com.funtest.groovytest
    
    import com.funtester.config.HttpClientConstant
    import com.funtester.frame.SourceCode
    import com.funtester.utils.CountUtil
    import com.funtester.utils.Time
    import org.apache.http.client.methods.HttpGet
    import org.apache.http.client.methods.HttpRequestBase
    
    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.CyclicBarrier
    import java.util.concurrent.LinkedBlockingQueue
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.atomic.AtomicInteger
    
    class QueueTconsume extends SourceCode {
    
        static AtomicInteger index = new AtomicInteger(1)
    
        static int total = 100_0000
    
        static int size = 10
    
        static int threadNum = 5
    
        static int piece = total / size
    
        static def url = "http://localhost:12345/funtester"
    
        static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"
    
        public static void main(String[] args) {
    
            LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()
            def pwait = new CountDownLatch(10)
            def produces = {
                fun {
                    while (true) {
                        if (linkedQ.size() > total) break
                        def get = new HttpGet(url)
                        get.addHeader("token", token)
                        get.addHeader(HttpClientConstant.USER_AGENT)
                        get.addHeader(HttpClientConstant.CONNECTION)
                        linkedQ.add(get)
                    }
                    pwait.countDown()
                }
            }
            10.times {produces()}
            pwait.await()
            outRGB("数据构造完成!${linkedQ.size()}")
    
    
            def start = Time.getTimeStamp()
            def barrier = new CyclicBarrier(threadNum + 1 )
            def latch = new CountDownLatch(threadNum)
            def ts = []
            def funtester = {
                fun {
                    barrier.await()
                    while (true) {
                        if (index.getAndIncrement() % piece == 0) {
                            def l = Time.getTimeStamp() - start
                            ts << l
                            output("${formatLong(index.get())}消费总消耗${formatLong(l)}")
                            start = Time.getTimeStamp()
                        }
                        def poll = linkedQ.poll(100, TimeUnit.MILLISECONDS)
                        if (poll == null) break
                    }
                    latch.countDown()
                }
            }
            threadNum.times {funtester()}
            def st = Time.getTimeStamp()
            barrier.await()
            latch.await()
            def et = Time.getTimeStamp()
            outRGB("每毫秒速率${total / (et - st)}")
            outRGB(CountUtil.index(ts).toString())
        }
    
    
    }
    
    
    

    生产者 & 消费者 场景

    这里我引入了另外一个变量:初始队列长度length,用例运行之前将队列按照这个长度进行单线程填充。

    package com.funtest.groovytest
    
    import com.funtester.frame.SourceCode
    import com.funtester.utils.Time
    import org.apache.http.client.methods.HttpGet
    import org.apache.http.client.methods.HttpRequestBase
    
    import java.util.concurrent.CountDownLatch
    import java.util.concurrent.CyclicBarrier
    import java.util.concurrent.LinkedBlockingQueue
    import java.util.concurrent.TimeUnit
    import java.util.concurrent.atomic.AtomicInteger
    
    class QueueBoth extends SourceCode {
    
        static AtomicInteger index = new AtomicInteger(1)
    
        static int total = 500_0000
    
        static int length = 50_0000
    
        static int threadNum = 5
    
        static def url = "http://localhost:12345/funtester"
    
        static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"
    
        public static void main(String[] args) {
            LinkedBlockingQueue<HttpRequestBase> linkedQ = new LinkedBlockingQueue<>()
    
            def latch = new CountDownLatch(threadNum * 2)
            def barrier = new CyclicBarrier(threadNum * 2 + 1)
            def ts = []
            def funtester = {f ->
                {
                    fun {
                        barrier.await()
                        while (true) {
                            if (index.getAndIncrement() > total) break
                            f()
                        }
                        latch.countDown()
                    }
                }
            }
            def produces =  {
                def get = new HttpGet(url)
                get.addHeader("token", token)
                get.addHeader(HttpClientConstant.USER_AGENT)
                get.addHeader(HttpClientConstant.CONNECTION)
                linkedQ.put(get)
            }
            length.times {produces()}
    
            threadNum.times {
                funtester produces
                funtester {linkedQ.poll(100, TimeUnit.MILLISECONDS)}
            }
            def st = Time.getTimeStamp()
            barrier.await()
            latch.await()
            def et = Time.getTimeStamp()
            outRGB("每毫秒速率${total / (et - st) / 2}")
        }
    
    
    }
    
    

    补充

    性能非常不稳定

    其中有两个问题需要补充说明,java.util.concurrent.LinkedBlockingQueue性能在测试过程中非常不稳定,我每次打印日志以1/10为节点打印时间戳,下面分享一些在队列长度100万时,生产者模式中的日志:

    INFO-> 23.731 F-2  107,942添加总消耗523
    INFO-> 23.897 F-10 200,061添加总消耗165
    INFO-> 24.137 F-9  300,024添加总消耗239
    INFO-> 24.320 F-2  400,037添加总消耗182
    INFO-> 25.200 F-5  500,065添加总消耗879
    INFO-> 25.411 F-2  600,094添加总消耗211
    INFO-> 25.604 F-8  700,090添加总消耗193
    INFO-> 26.868 F-1  800,047添加总消耗1,264
    INFO-> 26.927 F-4  900,053添加总消耗57
    INFO-> 28.454 F-3  1,000,009添加总消耗1,527
    INFO-> 28.457 main 每毫秒速率190.0779319521
    INFO-> 28.476 main 平均值:524.0 ,最大值1527.0 ,最小值:57.0 ,中位数:239.0 p99:1527.0 p95:1527.0
    
    
    INFO-> 43.930 F-10 112,384添加总消耗385
    INFO-> 44.072 F-9  200,159添加总消耗140
    INFO-> 44.296 F-1  300,058添加总消耗223
    INFO-> 44.445 F-7  400,075添加总消耗149
    INFO-> 45.311 F-10 500,086添加总消耗866
    INFO-> 45.498 F-8  600,080添加总消耗187
    INFO-> 45.700 F-1  700,088添加总消耗202
    INFO-> 45.760 F-9  800,057添加总消耗59
    INFO-> 47.245 F-6  900,095添加总消耗1,485
    INFO-> 47.303 F-6  1,000,009添加总消耗58
    INFO-> 47.305 main 每毫秒速率262.7430373095
    INFO-> 47.320 main 平均值:375.4 ,最大值1485.0 ,最小值:58.0 ,中位数:202.0 p99:1485.0 p95:1485.0
    
    
    INFO-> 00.916 F-1  100,000添加总消耗568
    INFO-> 01.269 F-1  200,000添加总消耗353
    INFO-> 01.461 F-1  300,000添加总消耗192
    INFO-> 01.635 F-1  400,000添加总消耗174
    INFO-> 02.536 F-1  500,000添加总消耗899
    INFO-> 02.777 F-1  600,000添加总消耗240
    INFO-> 03.015 F-1  700,000添加总消耗237
    INFO-> 03.107 F-1  800,000添加总消耗91
    INFO-> 04.519 F-1  900,000添加总消耗1,412
    INFO-> 05.940 F-1  1,000,000添加总消耗96
    INFO-> 05.943 main 每毫秒速率184.5358922310
    INFO-> 05.959 main 平均值:426.2 ,最大值1412.0 ,最小值:91.0 ,中位数:240.0 p99:1412.0 p95:1412.0
    

    可以看出最大值最小值能相差十几倍,甚至二十几倍,这种情况随着消息队列总长度增长而增长,大多数发生在80万 ~ 100万阶段,如果将长度降低到50万,这种情况就会得到明显改善。所以还有一个附加观点:消息队列长度应当尽可能少一些。

    基准测试

    下面是我使用FunTester性能测试框架对三种消息对象的生产代码进行的测试结果。

    测试对象 线程数 个数(百万) 速率(/ms)
    1 1 5681
    5 1 8010
    5 5 15105
    1 1 1287
    5 1 2329
    5 5 4176
    1 1 807
    5 1 2084
    5 5 3185

    测试用例如下:

    package com.funtest.groovytest
    
    import com.funtester.base.constaint.FixedThread
    import com.funtester.config.HttpClientConstant
    import com.funtester.frame.execute.Concurrent
    import com.funtester.httpclient.FunLibrary
    import org.apache.http.client.methods.HttpGet
    
    class TTT extends FunLibrary {
    
        static int total = 100_0000
    
        static int thread = 1
    
        static int times = total / thread
    
        static def url = "http://localhost:12345/funtester"
    
        static def token = "FunTesterFunTesterFunTesterFunTesterFunTesterFunTesterFunTester"
    
        public static void main(String[] args) {
            RUNUP_TIME = 0
            def tasks = []
            thread.times {tasks << new FunTester(times)}
            new Concurrent(tasks,"测试生产者代码性能").start()
    
        }
    
        private static class FunTester extends FixedThread {
    
            FunTester(int limit) {
                super(null, limit, true)
            }
    
            @Override
            protected void doing() throws Exception {
    //            def get = new HttpGet()
    
    //            def get = new HttpGet(url)
    //            get.addHeader("token", token)
    //            get.addHeader(HttpClientConstant.USER_AGENT)
    //            get.addHeader(HttpClientConstant.CONNECTION)
    
                def get = new HttpGet(url + token)
                get.addHeader("token", token)
                get.addHeader("token1", token)
                get.addHeader("token5", token)
                get.addHeader("token4", token)
                get.addHeader("token3", token)
                get.addHeader("token2", token)
                get.addHeader(HttpClientConstant.USER_AGENT)
                get.addHeader(HttpClientConstant.CONNECTION)
    
            }
    
            @Override
            FixedThread clone() {
                return new FunTester(limit)
            }
        }
    
    }
    
    

    Have Fun ~ Tester !

  • 相关阅读:
    Asp.net2.0 中自定义过滤器对Response内容进行处理 dodo
    自动化测试工具 dodo
    TestDriven.NET 2.0——单元测试的好助手(转) dodo
    JS弹出窗口的运用与技巧 dodo
    ElasticSearch 简介 规格严格
    修改PostgreSQL字段长度导致cached plan must not change result type错误 规格严格
    Linux系统更改时区(转) 规格严格
    mvn编译“Cannot find matching toolchain definitions for the following toolchain types“报错解决方法 规格严格
    ElasticSearch 集群 & 数据备份 & 优化 规格严格
    Elasticsearch黑鸟教程22:索引模板的详细介绍 规格严格
  • 原文地址:https://www.cnblogs.com/FunTester/p/15783212.html
Copyright © 2011-2022 走看看