zoukankan      html  css  js  c++  java
  • go学习笔记 Go的sync.Pool源码

    Pool介绍#

    总所周知Go 是一个自动垃圾回收的编程语言,采用三色并发标记算法标记对象并回收。如果你想使用 Go 开发一个高性能的应用程序的话,就必须考虑垃圾回收给性能带来的影响。因为Go 在垃圾回收的时候会有一个STW(stop-the-world,程序暂停)的时间,并且如果对象太多,做标记也需要时间。所以如果采用对象池来创建对象,增加对象的重复利用率,使用的时候就不必在堆上重新创建对象可以节省开销。在Go中,golang提供了对象重用的机制,也就是sync.Pool对象池。 sync.Pool是可伸缩的,并发安全的。其大小仅受限于内存的大小,可以被看作是一个存放可重用对象的值的容器。 设计的目的是存放已经分配的但是暂时不用的对象,在需要用到的时候直接从pool中取。

    任何存放区其中的值可以在任何时候被删除而不通知,在高负载下可以动态的扩容,在不活跃时对象池会收缩。它对外提供了三个方法:New、Get 和 Put。下面用一个简短的例子来说明一下Pool使用:

    package main
     
    import (
        "fmt"
        "sync"
    )
     
    var pool *sync.Pool
     
    type Person struct {
        Name string
    }
     
    func init() {
        pool = &sync.Pool{
            New: func() interface{} {
                fmt.Println("creating a new person")
                return new(Person)
            },
        }
    }
     
    func main() {
     
        person := pool.Get().(*Person)
        fmt.Println("Get Pool Object1:", person)
     
        person.Name = "first"
        pool.Put(person)
     
        fmt.Println("Get Pool Object2:", pool.Get().(*Person))
        fmt.Println("Get Pool Object3:", pool.Get().(*Person))
     
    }

    结果:

    creating a new person
    Get Pool Object1: &{}
    Get Pool Object2: &{first}
    creating a new person
    Get Pool Object3: &{}

    这里我用了init方法初始化了一个pool,然后get了三次,put了一次到pool中,如果pool中没有对象,那么会调用New函数创建一个新的对象,否则会从put进去的对象中获取。

    存储在池中的任何项目都可以随时自动删除,并且不会被通知。Pool可以安全地同时使用多个goroutine。池的目的是缓存已分配但未使用的对象以供以后重用,从而减轻对gc的压力。也就是说,它可以轻松构建高效,线程安全的free列表。但是,它不适用于所有free列表。池的适当使用是管理一组默认共享的临时项,并且可能由包的并发独立客户端重用。池提供了一种在许多客户端上分摊分配开销的方法。很好地使用池的一个例子是fmt包,它维护一个动态大小的临时输出缓冲区存储。底层存储队列在负载下(当许多goroutine正在积极打印时)进行缩放,并在静止时收缩。另一方面,作为短期对象的一部分维护的空闲列表不适合用于池, 因为在该场景中开销不能很好地摊销。 使这些对象实现自己的空闲列表更有效。首次使用后不得复制池。

    pool 的两个特点
    1、在本地私有池和本地共享池均获取 obj 失败时,则会从其他p偷一个 obj 返回给调用方。
    2、obj在池中的生命周期取决于垃圾回收任务的下一次执行时间,并且从池中获取到的值可能是 put 进去的其中一个值,也可能是 newfun处 新生成的一个值,在应用时很容易入坑。

    在多个goroutine之间使用同一个pool做到高效,是因为sync.pool为每个P都分配了一个子池,
    当执行一个pool的get或者put操作的时候都会先把当前的goroutine固定到某个P的子池上面,
    然后再对该子池进行操作。每个子池里面有一个私有对象和共享列表对象,
    私有对象是只有对应的P能够访问,因为一个P同一时间只能执行一个goroutine,
    【因此对私有对象存取操作是不需要加锁的】。

    源码分析

    type Pool struct {
        // 不允许复制,一个结构体,有一个Lock()方法,嵌入别的结构体中,表示不允许复制
        // noCopy对象,拥有一个Lock方法,使得Cond对象在进行go vet扫描的时候,能够被检测到是否被复制
        noCopy noCopy
     
        //local 和 localSize 维护一个动态 poolLocal 数组
        // 每个固定大小的池, 真实类型是 [P]poolLocal
        // 其实就是一个[P]poolLocal 的指针地址
        local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
        localSize uintptr        // size of the local array
     
        victim     unsafe.Pointer // local from previous cycle
        victimSize uintptr        // size of victims array
     
        // New optionally specifies a function to generate
        // a value when Get would otherwise return nil.
        // It may not be changed concurrently with calls to Get.
        // New 是一个回调函数指针,当Get 获取到目标对象为 nil 时,需要调用此处的回调函数用于生成 新的对象
        New func() interface{}
    }
    1. Pool结构体里面noCopy代表这个结构体是禁止拷贝的,它可以在我们使用 go vet 工具的时候生效;

    local是一个poolLocal数组的指针,localSize代表这个数组的大小;同样victim也是一个poolLocal数组的指针,每次垃圾回收的时候,Pool 会把 victim 中的对象移除,然后把 local 的数据给 victim;local和victim的逻辑我们下面会详细介绍到。

    New函数是在创建pool的时候设置的,当pool没有缓存对象的时候,会调用New方法生成一个新的对象。

    下面我们对照着pool的结构图往下讲,避免找不到北:

    // Local per-P Pool appendix.
    /*
    因为poolLocal中的对象可能会被其他P偷走,
    private域保证这个P不会被偷光,至少保留一个对象供自己用。
    否则,如果这个P只剩一个对象,被偷走了,
    那么当它本身需要对象时又要从别的P偷回来,造成了不必要的开销
    */
    type poolLocalInternal struct {
        private interface{} // Can be used only by the respective P.
        shared  poolChain   // Local P can pushHead/popHead; any P can popTail.
    }
     
    type poolLocal struct {
        poolLocalInternal
     
        // Prevents false sharing on widespread platforms with
        // 128 mod (cache line size) = 0 .
        /**
        cache使用中常见的一个问题是false sharing。
        当不同的线程同时读写同一cache line上不同数据时就可能发生false sharing。
        false sharing会导致多核处理器上严重的系统性能下降。
        字节对齐,避免 false sharing (伪共享)
        */
        pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
    }

    local字段存储的是一个poolLocal数组的指针,poolLocal数组大小是goroutine中P的数量,访问时,P的id对应poolLocal数组下标索引,所以Pool的最大个数runtime.GOMAXPROCS(0)。

    通过这样的设计,每个P都有了自己的本地空间,多个 goroutine 使用同一个 Pool 时,减少了竞争,提升了性能。如果对goroutine的P、G、M有疑惑的同学不妨看看这篇文章:The Go scheduler

    poolLocal里面有一个pad数组用来占位用,防止在 cache line 上分配多个 poolLocalInternal从而造成false sharing,cache使用中常见的一个问题是false sharing。当不同的线程同时读写同一cache line上不同数据时就可能发生false sharing。false sharing会导致多核处理器上严重的系统性能下降。具体的可以参考伪共享(False Sharing)

    poolLocalInternal包含两个字段private和shared。

    private代表缓存的一个元素,只能由相应的一个 P 存取。因为一个 P 同时只能执行一个 goroutine,所以不会有并发的问题;所以无需加锁

    shared则可以由任意的 P 访问,但是只有本地的 P 才能 pushHead/popHead,其它 P 可以 popTail。因为可能有多个goroutine同时操作,所以需要加锁。

    type poolChain struct {
        // head is the poolDequeue to push to. This is only accessed
        // by the producer, so doesn't need to be synchronized.
        head *poolChainElt
     
        // tail is the poolDequeue to popTail from. This is accessed
        // by consumers, so reads and writes must be atomic.
        tail *poolChainElt
    }
     
    type poolChainElt struct {
        poolDequeue
     
        // next and prev link to the adjacent poolChainElts in this
        // poolChain.
        //
        // next is written atomically by the producer and read
        // atomically by the consumer. It only transitions from nil to
        // non-nil.
        //
        // prev is written atomically by the consumer and read
        // atomically by the producer. It only transitions from
        // non-nil to nil.
        next, prev *poolChainElt
    }
    type poolDequeue struct {
        // headTail packs together a 32-bit head index and a 32-bit
        // tail index. Both are indexes into vals modulo len(vals)-1.
        //
        // tail = index of oldest data in queue
        // head = index of next slot to fill
        //
        // Slots in the range [tail, head) are owned by consumers.
        // A consumer continues to own a slot outside this range until
        // it nils the slot, at which point ownership passes to the
        // producer.
        //
        // The head index is stored in the most-significant bits so
        // that we can atomically add to it and the overflow is
        // harmless.
        headTail uint64
     
        // vals is a ring buffer of interface{} values stored in this
        // dequeue. The size of this must be a power of 2.
        //
        // vals[i].typ is nil if the slot is empty and non-nil
        // otherwise. A slot is still in use until *both* the tail
        // index has moved beyond it and typ has been set to nil. This
        // is set to nil atomically by the consumer and read
        // atomically by the producer.
        vals []eface
    }
     
    type eface struct {
        typ, val unsafe.Pointer
    }

    poolChain是一个双端队列,里面的head和tail分别指向队列头尾;poolDequeue里面存放真正的数据,是一个单生产者、多消费者的固定大小的无锁的环状队列,headTail是环状队列的首位位置的指针,可以通过位运算解析出首尾的位置,生产者可以从 head 插入、head 删除,而消费者仅可从 tail 删除。

    这个双端队列的模型大概是这个样子:

    poolDequeue里面的环状队列大小是固定的,后面分析源码我们会看到,当环状队列满了的时候会创建一个size是原来两倍大小的环状队列。大家这张图好好体会一下,会反复用到。

    Get方法#

    // Get selects an arbitrary item from the Pool, removes it from the
    // Pool, and returns it to the caller.
    // Get may choose to ignore the pool and treat it as empty.
    // Callers should not assume any relation between values passed to Put and
    // the values returned by Get.
    //
    // If Get would otherwise return nil and p.New is non-nil, Get returns
    // the result of calling p.New.
    func (p *Pool) Get() interface{} {
        if race.Enabled {
            race.Disable()
        }
        l, pid := p.pin() //1.把当前goroutine绑定在当前的P上
        x := l.private    //2.优先从local的private中获取
        l.private = nil
        if x == nil {
            // Try to pop the head of the local shard. We prefer
            // the head over the tail for temporal locality of
            // reuse.
            x, _ = l.shared.popHead()  //3,private没有,那么从shared的头部获取
            if x == nil {
                x = p.getSlow(pid)  //4. 如果都没有,那么去别的local上去偷一个
            }
        }
        runtime_procUnpin()  //解除抢占
        if race.Enabled {
            race.Enable()
            if x != nil {
                race.Acquire(poolRaceAddr(x))
            }
        }
        //5. 如果没有获取到,尝试使用New函数生成一个新的
        if x == nil && p.New != nil {
            x = p.New()
        }
        return x
    }
    • 这一段代码首先会将当前goroutine绑定在当前的P上返回对应的local,然后尝试从local的private中获取,然后需要把private字段置空,因为已经拿到了想要的对象;

    • private中获取不到,那么就去shared的头部获取;

    • shared也没有,那么尝试遍历所有的 local,尝试从它们的 shared 弹出一个元素;

    • 最后如果还是没有,那么就直接调用预先设置好的 New 函数,创建一个出来。

    pin#

    // pin 会将当前 goroutine 订到 P 上, 禁止抢占(preemption) 并从 poolLocal 池中返回 P 对应的 poolLocal
    // 调用方必须在完成取值后调用 runtime_procUnpin() 来取消禁止抢占。
    // pin pins the current goroutine to P, disables preemption and
    // returns poolLocal pool for the P and the P's id.
    // Caller must call runtime_procUnpin() when done with the pool.
    func (p *Pool) pin() (*poolLocal, int) {
        pid := runtime_procPin()
        // In pinSlow we store to local and then to localSize, here we load in opposite order.
        // Since we've disabled preemption, GC cannot happen in between.
        // Thus here we must observe local at least as large localSize.
        // We can observe a newer/larger local, it is fine (we must observe its zero-initialized-ness).
        // 因为可能存在动态的 P(运行时调整 P 的个数)procresize/GOMAXPROCS
        // 如果 P.id 没有越界,则直接返回   PID
        /**
        具体的逻辑就是首先拿到当前的pid,
        然后以pid作为index找到local中的poolLocal,
        但是如果pid大于了localsize,
        说明当前线程的poollocal不存在,就会新创建一个poolLocal
        */
        s := atomic.LoadUintptr(&p.localSize) // load-acquire
        l := p.local                          // load-consume
        if uintptr(pid) < s {
            return indexLocal(l, pid), pid
        }
        // 没有结果时,涉及全局加锁
        // 例如重新分配数组内存,添加到全局列表
        return p.pinSlow()
    }
     
    func indexLocal(l unsafe.Pointer, i int) *poolLocal {
        lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
        return (*poolLocal)(lp)
    }

    pin方法里面首先会调用runtime_procPin方法会先获取当前goroutine,然后绑定到对应的M上,然后返回M目前绑定的P的id,因为这个pid后面会用到,防止在使用途中P被抢占,具体的细节可以看这篇:https://zhuanlan.zhihu.com/p/99710992。接下来会使用原子操作取出localSize,如果当前pid大于localSize,那么就表示Pool还没创建对应的poolLocal,那么调用pinSlow进行创建工作,否则调用indexLocal取出pid对应的poolLocal返回。

    indexLocal里面是使用了地址操作,传入的i是数组的index值,所以需要获取poolLocal{}的size做一下地址的位移操作,然后再转成转成poolLocal地址返回。

    pinSlow#

    func (p *Pool) pinSlow() (*poolLocal, int) {
        // Retry under the mutex.
        // Can not lock the mutex while pinned.
        //因为需要对全局进行加锁,pinSlow() 会首先取消 P 的不可抢占,然后使用 allPoolsMu 进行加锁
        runtime_procUnpin() // 解除pin
        allPoolsMu.Lock() // 加上全局锁
        defer allPoolsMu.Unlock()
        pid := runtime_procPin() // pin住
        // poolCleanup won't be called while we are pinned.
        s := p.localSize
        l := p.local
        // 重新对pid进行检查 再次检查是否符合条件,因为可能中途已被其他线程调用
        // 当再次固定 P 时 poolCleanup 不会被调用
     
        if uintptr(pid) < s {
            return indexLocal(l, pid), pid
        }
        // 初始化local前会将pool放入到allPools数组中
        if p.local == nil {
            allPools = append(allPools, p)
        }
        // If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
        size := runtime.GOMAXPROCS(0) // 当前P的数量
        local := make([]poolLocal, size)
        atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
        atomic.StoreUintptr(&p.localSize, uintptr(size))         // store-release
        return &local[pid], pid
    }

    因为allPoolsMu是一个全局Mutex锁,因此上锁会比较慢可能被阻塞,所以上锁前调用runtime_procUnpin方法解除pin的操作;

    在解除绑定后,pinSlow 可能被其他的线程调用过了,p.local 可能会发生变化。因此这时候需要再次对 pid 进行检查。

    最后初始化local,并使用原子操作对local和localSize设值,返回当前P对应的local。

    到这里pin方法终于讲完了。画一个简单的图描述一下这整个流程:

    下面我们再回到Get方法中,如果private中没有值,那么会调用shared的popHead方法获取值。

    popHead#

    func (c *poolChain) popHead() (interface{}, bool) {
        d := c.head // 这里头部是一个poolChainElt
        // 遍历poolChain链表
        for d != nil {  
            // 从poolChainElt的环状列表中获取值
            if val, ok := d.popHead(); ok {
                return val, ok
            }
            // There may still be unconsumed elements in the
            // previous dequeue, so try backing up.
            // load poolChain下一个对象
            d = loadPoolChainElt(&d.prev)
        }
        return nil, false
    }
     
    // popHead removes and returns the element at the head of the queue.
    // It returns false if the queue is empty. It must only be called by a
    // single producer.
    func (d *poolDequeue) popHead() (interface{}, bool) {
        var slot *eface
        for {
            ptrs := atomic.LoadUint64(&d.headTail)
            head, tail := d.unpack(ptrs) // headTail的高32位为head,低32位为tail
            if tail == head {
                // Queue is empty. // 首尾相等,那么这个队列就是空的
                return nil, false
            }
     
            // Confirm tail and decrement head. We do this before
            // reading the value to take back ownership of this
            // slot.
            head--   // 这里需要head--之后再获取slot
            ptrs2 := d.pack(head, tail)
            if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
                // We successfully took back slot.
                slot = &d.vals[head&uint32(len(d.vals)-1)]
                break
            }
        }
     
        val := *(*interface{})(unsafe.Pointer(slot))
        if val == dequeueNil(nil) {   // 说明没取到缓存的对象,返回 nil
            val = nil
        }
        // Zero the slot. Unlike popTail, this isn't racing with
        // pushHead, so we don't need to be careful here.
        *slot = eface{}  // 重置slot 
        return val, true
    }

    poolChain的popHead方法里面会获取到poolChain的头结点,不记得poolChain数据结构的同学建议往上面翻一下再回来。接着有个for循环会挨个从poolChain的头结点往下遍历,直到获取对象返回。

    • poolDequeue的popHead方法首先会获取到headTail的值,然后调用unpack解包,headTail是一个64位的值,高32位表示head,低32位表示tail。

    • 判断head和tail是否相等,相等那么这个队列就是空的;

    • 如果队列不是空的,那么将head减一之后再使用,因为head当前指的位置是空值,表示下一个新对象存放的位置;

    • CAS重新设值新的headTail,成功之后获取slot,这里因为vals大小是2的n 次幂,因此len(d.vals)-1)之后低n位全是1,和head取与之后可以获取到head的低n位的值;

    • 如果slot所对应的对象是dequeueNil,那么表示是空值,直接返回,否则将slot指针对应位置的值置空,返回val。

    如果shared的popHead方法也没获取到值,那么就需要调用getSlow方法获取了。

    getSlow#

     // 从其他P的共享缓冲区偷取 obj
    func (p *Pool) getSlow(pid int) interface{} {
        // See the comment in pin regarding ordering of the loads.
        size := atomic.LoadUintptr(&p.localSize) // load-acquire 获取当前 poolLocal 的大小
        locals := p.local                        // load-consume 获取当前 poolLocal
        // Try to steal one element from other procs.
        // 遍历locals列表,从其他的local的shared列表尾部获取对象
        for i := 0; i < int(size); i++ {
            l := indexLocal(locals, (pid+i+1)%int(size))
            if x, _ := l.shared.popTail(); x != nil {
                return x
            }
        }
     
        // Try the victim cache. We do this after attempting to steal
        // from all primary caches because we want objects in the
        // victim cache to age out if at all possible.
        size = atomic.LoadUintptr(&p.victimSize)
        if uintptr(pid) >= size {
            return nil
        }
        locals = p.victim
        l := indexLocal(locals, pid)
        // victim的private不为空则返回
        if x := l.private; x != nil {
            l.private = nil
            return x
        }
        //  遍历victim对应的locals列表,从其他的local的shared列表尾部获取对象
        for i := 0; i < int(size); i++ {
            l := indexLocal(locals, (pid+i)%int(size))
            if x, _ := l.shared.popTail(); x != nil {
                return x
            }
        }
     
        // Mark the victim cache as empty for future gets don't bother
        // with it.
        // 获取不到,将victimSize置为0
        atomic.StoreUintptr(&p.victimSize, 0)
     
        return nil
    }

    getSlow方法会遍历locals列表,这里需要注意的是,遍历是从索引为 pid+1 的 poolLocal 处开始,尝试调用shared的popTail方法获取对象;如果没有拿到,则从 victim 里找。如果都没找到,那么就将victimSize置为0,下次就不找victim了。

    poolChain&popTail#

    func (c *poolChain) popTail() (interface{}, bool) {
        d := loadPoolChainElt(&c.tail)
        if d == nil {
            return nil, false  // 如果最后一个节点是空的,那么直接返回
        }
     
        for {
            // It's important that we load the next pointer
            // *before* popping the tail. In general, d may be
            // transiently empty, but if next is non-nil before
            // the pop and the pop fails, then d is permanently
            // empty, which is the only condition under which it's
            // safe to drop d from the chain.
            // 这里获取的是next节点,与一般的双向链表是相反的
            d2 := loadPoolChainElt(&d.next)
     
            if val, ok := d.popTail(); ok {
                return val, ok
            }
     
            if d2 == nil {
                // This is the only dequeue. It's empty right
                // now, but could be pushed to in the future.
                return nil, false
            }
     
            // The tail of the chain has been drained, so move on
            // to the next dequeue. Try to drop it from the chain
            // so the next pop doesn't have to look at the empty
            // dequeue again.
            // 因为d已经没有数据了,所以重置tail为d2,并删除d2的上一个节点
            if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) {
                // We won the race. Clear the prev pointer so
                // the garbage collector can collect the empty
                // dequeue and so popHead doesn't back up
                // further than necessary.
                storePoolChainElt(&d2.prev, nil)
            }
            d = d2
        }
    }
    • 判断poolChain,如果最后一个节点是空的,那么直接返回;
    • 进入for循环,获取tail的next节点,这里需要注意的是这个双向链表与一般的链表是反向的,不清楚的可以再去看看第一张图;
    • 调用popTail获取poolDequeue列表的对象,有对象直接返回;
    • d2为空则表示已经遍历完整个poolChain双向列表了,都为空,那么直接返回;
    • 通过CAS将tail重置为d2,因为d已经没有数据了,并将d2的prev节点置为nil,然后将d置为d2,进入下一个循环;

    poolDequeue&popTail#

    // popTail removes and returns the element at the tail of the queue.
    // It returns false if the queue is empty. It may be called by any
    // number of consumers.
    func (d *poolDequeue) popTail() (interface{}, bool) {
        var slot *eface
        for {
            ptrs := atomic.LoadUint64(&d.headTail)
            head, tail := d.unpack(ptrs) // 和pophead一样,将headTail解包
            if tail == head {
                // Queue is empty. // 首位相等,表示列表中没有数据,返回
                return nil, false
            }
     
            // Confirm head and tail (for our speculative check
            // above) and increment tail. If this succeeds, then
            // we own the slot at tail.
            ptrs2 := d.pack(head, tail+1)
            // CAS重置tail位置
            if atomic.CompareAndSwapUint64(&d.headTail, ptrs, ptrs2) {
                // Success.
                // 获取tail位置对象
                slot = &d.vals[tail&uint32(len(d.vals)-1)]
                break
            }
        }
     
        // We now own slot.
        val := *(*interface{})(unsafe.Pointer(slot))
        if val == dequeueNil(nil) {
            val = nil 
        }
     
        // Tell pushHead that we're done with this slot. Zeroing the
        // slot is also important so we don't leave behind references
        // that could keep this object live longer than necessary.
        //
        // We write to val first and then publish that we're done with
        // this slot by atomically writing to typ.
        slot.val = nil
        atomic.StorePointer(&slot.typ, nil)
        // At this point pushHead owns the slot.
     
        return val, true
    }

     如果看懂了popHead,那么这个popTail方法是和它非常的相近的。

    popTail简单来说也是从队列尾部移除一个元素,如果队列为空,返回 false。但是需要注意的是,这个popTail可能会被多个消费者调用,所以需要循环CAS获取对象;在poolDequeue环状列表中tail是有数据的,不必像popHead中head--

    最后,需要将slot置空。

    大家可以再对照一下图回顾一下代码:

    Put方法#

    // Put adds x to the pool.
    func (p *Pool) Put(x interface{}) {
        if x == nil {
            return
        }
        if race.Enabled {
            if fastrand()%4 == 0 {
                // Randomly drop x on floor.
                return
            }
            race.ReleaseMerge(poolRaceAddr(x))
            race.Disable()
        }
        l, _ := p.pin() // 先获得当前P绑定的 localPool
        if l.private == nil {
            l.private = x
            x = nil
        }
        if x != nil {
            l.shared.pushHead(x)
        }
        // 调用方必须在完成取值后调用 runtime_procUnpin() 来取消禁用抢占
        runtime_procUnpin() 
        if race.Enabled {
            race.Enable()
        }
    }

    看完了Get方法,看Put方法就容易多了。同样Put方法首先会去Pin住当前goroutine和P,然后尝试将 x 赋值给 private 字段。如果private不为空,那么就调用pushHead将其放入到shared队列中。

    poolChain&pushHead#

    func (c *poolChain) pushHead(val interface{}) {
        d := c.head
       // 头节点没有初始化,那么设值一下
        if d == nil {
            // Initialize the chain.
            const initSize = 8 // Must be a power of 2
            d = new(poolChainElt)
            d.vals = make([]eface, initSize)
            c.head = d
            storePoolChainElt(&c.tail, d)
        }
        // 将对象加入到环状队列中
        if d.pushHead(val) {
            return
        }
     
        // The current dequeue is full. Allocate a new one of twice
        // the size.
        newSize := len(d.vals) * 2
        // 这里做了限制,单个环状队列不能超过2的30次方大小
        if newSize >= dequeueLimit {
            // Can't make it any bigger.
            newSize = dequeueLimit
        }
        // 初始化新的环状列表,大小是d的两倍
        d2 := &poolChainElt{prev: d}
        d2.vals = make([]eface, newSize)
        c.head = d2
        storePoolChainElt(&d.next, d2)
        // push到新的队列中
        d2.pushHead(val)
    }

    如果头节点为空,那么需要创建一个新的poolChainElt对象作为头节点,大小为8;然后调用pushHead放入到环状队列中;

    如果放置失败,那么创建一个 poolChainElt 节点,并且双端队列的长度翻倍,当然长度也不能超过dequeueLimit,即2的30次方;

    然后将新的节点d2和d互相绑定一下,并将d2设值为头节点,将传入的对象push到d2中;

    poolDequeue&pushHead#

    // pushHead adds val at the head of the queue. It returns false if the
    // queue is full. It must only be called by a single producer.
    func (d *poolDequeue) pushHead(val interface{}) bool {
        ptrs := atomic.LoadUint64(&d.headTail)
        head, tail := d.unpack(ptrs) // 解包headTail
        // 判断队列是否已满
        if (tail+uint32(len(d.vals)))&(1<<dequeueBits-1) == head {
            // Queue is full.
            return false
        }
        slot := &d.vals[head&uint32(len(d.vals)-1)] // 找到head的槽位
     
        // Check if the head slot has been released by popTail.
        typ := atomic.LoadPointer(&slot.typ) // 检查slot是否和popTail有冲突
        if typ != nil {
            // Another goroutine is still cleaning up the tail, so
            // the queue is actually still full.
            return false
        }
     
        // The head slot is free, so we own it.
        if val == nil {
            val = dequeueNil(nil)
        }
       // 将 val 赋值到 slot,并将 head 指针值加 1
        *(*interface{})(unsafe.Pointer(slot)) = val
     
        // Increment head. This passes ownership of slot to popTail
        // and acts as a store barrier for writing the slot.
        atomic.AddUint64(&d.headTail, 1<<dequeueBits)
        return true
    }

    首先通过位运算判断队列是否已满,也就是将尾部指针加上 len(d.vals) ,因为head指向的是将要被填充的位置,所以head和tail位置是相隔len(d.vals),然后再取低 31 位,看它是否和 head 相等。如果队列满了,直接返回 false;

    然后找到找到head的槽位slot,并判断typ是否为空,因为popTail 是先设置 val,再将 typ 设置为 nil,所以如果有冲突,那么直接返回;

    最后设值slot,并将head加1返回;

    GC#

    在pool.go文件的 init 函数里,注册了 GC 发生时,如何清理 Pool 的函数:

     
    func init() {
        runtime_registerPoolCleanup(poolCleanup)
    }
    // 当 stop the world  (STW) 来临,在 GC 之前会调用该函数
    func poolCleanup() {
        // This function is called with the world stopped, at the beginning of a garbage collection.
        // It must not allocate and probably should not call any runtime functions.
     
        // Because the world is stopped, no pool user can be in a
        // pinned section (in effect, this has all Ps pinned).
     
        // Drop victim caches from all pools.
        for _, p := range oldPools {
            p.victim = nil
            p.victimSize = 0
        }
     
        // Move primary cache to victim cache.
        for _, p := range allPools {
            p.victim = p.local
            p.victimSize = p.localSize
            p.local = nil
            p.localSize = 0
        }
     
        // The pools with non-empty primary caches now have non-empty
        // victim caches and no pools have primary caches.
        oldPools, allPools = allPools, nil
    }

     poolCleanup 会在 STW 阶段被调用。主要是将 local 和 victim 作交换,那么不至于GC 把所有的 Pool 都清空了,而是需要两个 GC 周期才会被释放。如果 sync.Pool 的获取、释放速度稳定,那么就不会有新的池对象进行分配。存在Pool中的对象会在没有任何通知的情况下被自动移除掉。实际上,这个清理过程是在每次垃圾回收之前做的。垃圾回收是固定两分钟触发一次。而且每次清理会将Pool中的所有对象都清理掉!

    总结#

    整个设计充分利用了go.runtime的调度器优势:一个P下goroutine竞争的无锁化;

    一个goroutine固定在一个局部调度器P上,从当前 P 对应的 poolLocal 取值, 若取不到,则从对应的 shared 数组上取,若还是取不到;则尝试从其他 P 的 shared 中偷。 若偷不到,则调用 New 创建一个新的对象。池中所有临时对象在一次 GC 后会被全部清空。

    通过以上的解读,我们可以看到,Get方法并不会对获取到的对象值做任何的保证,因为放入本地池中的值有可能会在任何时候被删除,但是不通知调用者。放入共享池中的值有可能被其他的goroutine偷走。 所以对象池比较适合用来存储一些临时且状态无关的数据,但是不适合用来存储数据库连接的实例,因为存入对象池重的值有可能会在垃圾回收时被删除掉,这违反了数据库连接池建立的初衷。

    根据上面的说法,Golang的对象池严格意义上来说是一个临时的对象池,适用于储存一些会在goroutine间分享的临时对象。主要作用是减少GC,提高性能。在Golang中最常见的使用场景是fmt包中的输出缓冲区。

    在Golang中如果要实现连接池的效果,可以用container/list来实现,开源界也有一些现成的实现,比如go-commons-pool,具体的读者可以去自行了解。

    Go语言的goroutine虽然可以创建很多,但是真正能物理上并发运行的goroutine数量是有限的,是由runtime.GOMAXPROCS(0)设置的。所以这个Pool高效的设计的地方就在于将数据分散在了各个真正并发的线程中,每个线程优先从自己的poolLocal中获取数据,很大程度上降低了锁竞争。 

    Reference#

    https://www.cnblogs.com/luozhiyun/p/14194872.html

    https://blog.csdn.net/u010853261/article/details/90647884

    https://www.cnblogs.com/wsw-seu/p/12402267.html

    https://blog.csdn.net/qq_25870633/article/details/83448234

  • 相关阅读:
    【转】PowerDesigner数据库视图同时显示Code和Name
    [转]BT原理分析
    异常机制及throw与throws的区别(转)
    BS与CS的联系与区别。
    ASP.NET和C#的区别/
    上百例Silverlight网站及演示汇总,供友参考
    Bing Maps进阶系列九:使用MapCruncher进行地图切片并集成进Bing Maps
    【Silverlight】Bing Maps学习系列(八):使用Bing Maps Silverlight Control加载自己部署的Google Maps
    学习MAP 地图好地址
    Bing必应地图中国API
  • 原文地址:https://www.cnblogs.com/majiang/p/14200165.html
Copyright © 2011-2022 走看看