zoukankan      html  css  js  c++  java
  • golang一个例子引出的几个问题

    这个例子是从go源码src/pkg/net/rpc/server_test.go截取出来的

    func benchmarkEndToEndAsync(dial func() (*Client, error), b *testing.B) {
         const MaxConcurrentCalls = 100
         b.StopTimer()
         once.Do(startServer)
         client, err := dial()
         if err != nil {
              b.Fatal("error dialing:", err)
         }
    
         // Asynchronous calls
         args := &Args{7, 8}
         procs := 4 * runtime.GOMAXPROCS(-1)
         send := int32(b.N)
         recv := int32(b.N)
         var wg sync.WaitGroup
         wg.Add(procs)
         gate := make(chan bool, MaxConcurrentCalls)
         res := make(chan *Call, MaxConcurrentCalls)
         b.StartTimer()
    
         for p := 0; p < procs; p++ {
              go func() {
                   for atomic.AddInt32(&send, -1) >= 0 {
                        gate <- true
                        reply := new(Reply)
                        client.Go("Arith.Add", args, reply, res)
                   }
              }()
              go func() {
                   for call := range res {
                        A := call.Args.(*Args).A
                        B := call.Args.(*Args).B
                        C := call.Reply.(*Reply).C
                        if A+B != C {
                             b.Fatalf("incorrect reply: Add: expected %d got %d", A+B, C)
                        }
                        <-gate
                        if atomic.AddInt32(&recv, -1) == 0 {
                             close(res)
                        }
                   }
                   wg.Done()
              }()
         }
         wg.Wait()
    }

    这个代码用来对rpc的客户端Go函数进行压力测试。

    这里有几个地方值得揣摩下:

    1 如何测试客户端服务端?

    先使用startServer(这个函数里面具体是开启了一个routine)进行服务器服务。然后在每个测试用例中启动server,如果是benchTest的话记得这里的Timer要在启动服务器行为之后再开启。

    2 这里的wg变量有什么用?

    wg变量是sync.WaitGroup类型,Add增加计数,Done减少计数,Wait进行阻塞等待,等计数减为0的时候再停止阻塞。

    这里如果不使用WaitGroup进行wait阻塞的话,主routine会先于次routine先结束。会导致程序提早退出。

    因此这里也给出了一个测试用例中测试异步函数的方法。就是使用WaitGroup

    3 为什么要有gate这个channel buffer?

    看起来gate好像是没什么用啊,如果去掉gate呢?有可能会出现“rpc: discarding Call reply due to insufficient Done chan capacity”

    这个gate完全是因为client.Go这个函数,rpc包的client.Go是异步的调用,虽然是异步调用,这个异步调用的最后一个done参数是一个channel buffer。

    当client.Go进行完rpc调用后,将信号传入这个channel buffer。但是这个channel buffer却是不会阻塞的。

    具体看源码:

    Image(7)

    这里select加了个default分支,说明了done是非阻塞的。看注释,作者认为这个buffer的大小容量应该由调用者来保证。rpc包并不保证容量大小。

    在并发情况下,我们使用Client.Go的时候就要自己保证channel buffer大小。

    方法有个两个:

    1 使用一个同样大小的channel buffer来进行阻塞保证。

    这个方法就是gate的使用原因了。只有gate容量有剩余的时候才会容许调用client.Go

    2 调大channel buffer大小

    在这个例子中,bench的channel最大只会是b.N,所以,如果我们分配的res的channel buffer大小为b.N也能解决这个问题。

    这个方法导致的效果就是bench的时间变快了,但是mem分配增加了。

    4 这里的atomic什么作用?

    因为这里会有多个routine会对send和recive进行操作,这里就需要保证原子性。

    多个并发routine对一个共享变量进行操作有两种方法,channel和锁。

    这里当然使用channel也能起到原子操作的效果。sync包的atomic和sync的mutex都是锁的方式。

    所以说这里其实可以使用channel,mutex,atomic三种方法。

    5 procs的作用?

    bench test在运行前自身会调用runtime.GOMAXPROCS进行多核的设置,然后再每个处理器中并行运行测试。

    这里的runtime.GOMAXPROCS(-1)是获取你要跑的cpu核数,这个核数是根据bench test的 -test.cpu设置的。具体可以看下src/testing/testing.go parseCpuList。在没有设置过GOMAXPROCS和test.cpu的情况下,这里的runtime.GoMAXPROCS就默认是1。

    你可以使用-test.cpu 1,2,4来设置你的压力测试用例是有几个cpu,每个cpu是几核的。

    这里的procs设置为处理器核数的4倍就是为了测试routine能分配远大于核数的个数,这样每个核承担的goroutine能大于1。

    上面的for循环就是保证起的routine数是足够的。

    实时了解作者更多技术文章,技术心得,请关注微信公众号“轩脉刃的刀光剑影”

    本文基于署名-非商业性使用 3.0许可协议发布,欢迎转载,演绎,但是必须保留本文的署名叶剑峰(包含链接http://www.cnblogs.com/yjf512/),且不得用于商业目的。如您有任何疑问或者授权方面的协商,请与我联系

  • 相关阅读:
    <海量数据库解决方案>2011041201
    <海量数据库解决方案>2011040801
    <汇编语言(第2版)>2011041701
    makefile实践三为多目录源文件建立makefile
    <海量数据库解决方案>2011042501
    <海量数据库解决方案>2011041101
    <海量数据库解决方案>2011042901
    <海量数据库解决方案>2011042601
    <海量数据库解决方案>2011050301
    <iPhone开发秘籍>温度转换器实践
  • 原文地址:https://www.cnblogs.com/yjf512/p/2882570.html
Copyright © 2011-2022 走看看