zoukankan      html  css  js  c++  java
  • Golang 之 sync.Map揭秘

    Question: 

    1. sync.Map实现原理及适用的场景?

    Preface:

    在Go 1.6之前, 内置的map类型是部分goroutine安全的,并发的读没有问题,并发的写可能有问题。自go 1.6之后, 并发地读写map会报错

    func main() {
        a := map[int]int{1: 1}
        go func() {
            for {
                a[1] = 2
            }
        }()
        go func() {
            // 不停地对map进行读取
            for {
                _ = a[1]
            }
        }()
        for {
        }
    }
    #fatal error: concurrent map read and map write

    go 1.9之前的解决方案是额外绑定一个锁,封装成一个新的struct或者单独使用锁都可以

    func main() {
        var counter = struct {
            sync.RWMutex
            m map[string]int
        }{m: map[string]int{"a": 1}}
        ch_write := make(chan string, 1)
        ch_read := make(chan string, 1)
        go func() {
            for i := 0; i < 3; i++ {
                counter.Lock()
                counter.m["a"]++
                fmt.Println("write:", counter.m["a"])
                counter.Unlock()
            }
            ch_write <- "write_finish"
        }()
        go func() {
            // 不停地对map进行读取
            for i := 0; i < 3; i++ {
                counter.RLock()
                fmt.Println("get:", counter.m["a"])
                counter.RUnlock()
            }
            ch_read <- "read_finish"
        }()
        fmt.Println(<-ch_write)
        fmt.Println(<-ch_read)
    }
    get: 1
    write: 2
    get: 2
    write: 3
    get: 3
    write: 4
    write_finish
    read_finish

     用syns.Map来同时读写:打印的顺序可能各种各样

    func main() {
        var counter sync.Map
        counter.Store("a", -1)
        ch_write := make(chan string, 1)
        ch_read := make(chan string, 1)
        go func() {
            for i := 0; i < 3; i++ {
                counter.Store("a", i)
                fmt.Println("write:", i)
            }
            ch_write <- "write_finish"
        }()
        go func() {
            // 不停地对map进行读取
            for i := 0; i < 3; i++ {
                if v, ok := counter.Load("a"); ok {
                    fmt.Println("read:", v)
                }
            }
            ch_read <- "read_finish"
        }()
        fmt.Println(<-ch_write)
        fmt.Println(<-ch_read)
    }
    read: -1
    read: -1
    read: -1
    write: 0
    write: 1
    write: 2
    write_finish
    read_finish

    针对sync.Map和1.9之前解决的方法,有同学做过性能比较,随着cpu核心数的增加、并发加剧,读写锁+map的方式性能在不停的衰减,并且在核数为4的时候出现了性能的拐点;而sync.Map虽然性能不是特别好,但是相对比较平稳,但sync.Map也相对使用起来更简单

     sync.Map 原理:

    空间换取时间,通过两个数据结构(read,dirty)实现加锁,实现对性能的提升,通常的读只会使用read结构

     sync.Map 底层结构:

    type Map struct {
        // 当涉及到dirty数据的操作的时候,需要使用这个锁
        mu Mutex
        // 底层也是一个map,和dirty一样,底层指向的是同一个数据,但增加了一个amended字段,表示和dirty数据不一致时为true
        read atomic.Value // readOnly
        // dirty数据包含当前的map包含的entries,它包含最新的entries(包括read中未删除的数据,虽有冗余,但是提升dirty字段为read的时候非常快,不用一个一个的复制,而是直接将这个数据结构作为read字段的一部分),有些数据还可能没有移动到read字段中。
        // 对于dirty的操作需要加锁,因为对它的操作可能会有读写竞争。
        // 当dirty为空的时候, 比如初始化或者刚提升完,下一次的写操作会复制read字段中未删除的数据到这个数据中。
        dirty map[interface{}]*entry
        // 当从Map中读取entry的时候,如果read中不包含这个entry,会尝试从dirty中读取,这个时候会将misses加一,
        // 当misses累积到 dirty的长度的时候, 就会将dirty提升为read,避免从dirty中miss太多次。因为操作dirty需要加锁。
        misses int
    }

    对sync.Map 的主要操作主要四个步骤,增 改(store),删(delete),查(load),store是最复杂的

    Load:通过key查找对应的值value,是否存在,通过ok反映

    func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
        // 1.首先从m.read中得到只读readOnly,从它的map中查找,不需要加锁
        read, _ := m.read.Load().(readOnly)
        e, ok := read.m[key]
        // 2. 如果没找到,并且m.dirty中有新数据,需要从m.dirty查找,这个时候需要加锁
        if !ok && read.amended {
            m.mu.Lock()
            // 3. 双检查,避免加锁的时候m.dirty提升为m.read,dirty可能为nil
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            // 如果m.read中还是不存在,并且m.dirty中有新数据
            if !ok && read.amended {
                // 4. 从m.dirty查找
                e, ok = m.dirty[key]
                // 5. 不管m.dirty中存不存在,都将misses计数加一,并判断是否需要提升dirty
                m.missLocked()
            }
            m.mu.Unlock()
        }
        if !ok {
            return nil, false
        }
        return e.load()
    }
    // dirty的提升为read,amended默认为false
    func (m *Map) missLocked() {
        m.misses++
        if m.misses < len(m.dirty) {
            return
        }
        m.read.Store(readOnly{m: m.dirty})
        m.dirty = nil
        m.misses = 0
    }

    Delete

    func (m *Map) Delete(key interface{}) {
        read, _ := m.read.Load().(readOnly)
        e, ok := read.m[key]
        if !ok && read.amended {
            m.mu.Lock()
            // 2. double检查
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            if !ok && read.amended {
           //  3. 如果还是没有找到就去dirty中删除这个key,这里才是真正把key给删调,
                delete(m.dirty, key)
            }
            m.mu.Unlock()
        }
        // 1. 如果read中有数据直接删除,在read中置entry的p为nil,dirty指向的应该是同一个entry
        if ok {
            e.delete()
        }
    }
    func (e *entry) delete() (hadValue bool) {
        for {
            p := atomic.LoadPointer(&e.p)
            // 已标记为删除
            if p == nil || p == expunged {
                return false
            }
            // 原子操作,e.p标记为nil
            if atomic.CompareAndSwapPointer(&e.p, p, nil) {
                return true
            }
        }
    }

    Store:更新或者增加一个entry

    func (m *Map) Store(key, value interface{}) {
        // 1. 如果read存在该key 而且没有被标记为删除,可以直接存储
        read, _ := m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok && e.tryStore(&value) {
            return
        }
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok {
        // 2. dirty中可能不存在这个key
            if e.unexpungeLocked() {
                m.dirty[key] = e //m.dirty中不存在这个键,所以加入m.dirty
            }
            e.storeLocked(&value) //更新
        } else if e, ok := m.dirty[key]; ok {
        // 3. dirty中还存在这个key的话就直接更新
            e.storeLocked(&value)
        } else { 
        // 4. 新增一个entry
            if !read.amended { 
                m.dirtyLocked() //如果dirty为空,复制read中不为空的数据到dirty中
                m.read.Store(readOnly{m: read.m, amended: true})
            }
            m.dirty[key] = newEntry(value) //将这个entry加入到m.dirty中
        }
        m.mu.Unlock()
    }
    
    func (e *entry) tryStore(i *interface{}) bool {
        p := atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
        for {
            if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
                return true
            }
            p = atomic.LoadPointer(&e.p)
            if p == expunged {
                return false
            }
        }
    }
    
    func (e *entry) unexpungeLocked() (wasExpunged bool) {
        return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
    }
    
    func (m *Map) dirtyLocked() {
        if m.dirty != nil {
            return
        }
        read, _ := m.read.Load().(readOnly)
        m.dirty = make(map[interface{}]*entry, len(read.m))
        for k, e := range read.m {
            if !e.tryExpungeLocked() {
                m.dirty[k] = e
            }
        }
    }
    func (e *entry) tryExpungeLocked() (isExpunged bool) {
        p := atomic.LoadPointer(&e.p)
        for p == nil {
            // 将已经删除标记为nil的数据标记为expunged
            if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
                return true
            }
            p = atomic.LoadPointer(&e.p)
        }
        return p == expunged
    }

    额外:

    在遍历sync.Map不能用常见的range方法,需要通过sync.map自己的方法,实际上在调用Range方法之前,会做一个dirty提升

    func := func(key, value interface{}) bool {
            fmt.Printf("Range: k, v = %v, %v
    ", key, value)
            return true
    }   
    map.Range(func)

    总结:

    观察源码,发现在改,删,查,都可以优先对read进行操作不用加锁,而在有冲突的情况下需要加锁操作dirty。sync.Map对特定场景做了性能优化,一种是读多写少的场景,另外一种多个goroutine读/写/修改的key集合没有交集。也就避免了读多写少情况下的加锁性能损耗。

    Tips:

    判断自己是否真正理解了,可以问自己entry一共有哪几个状态?

    1.正常情况下,entry的point指向具体的值

    2.当删除操作时,entry的point为nil

    3.当新插入一个值时,如果dirty为空(dirty刚刚提升后),会先把read中存在的值复制到dirty中(只会复制不为expunged和nil的entry),并把read中为entry的point为nil的改为expunged,代表entry于不存在dirty中。在查找的时候expunged是有作用的。

    参考:

    1.https://colobu.com/2017/07/11/dive-into-sync-Map/

  • 相关阅读:
    python 基础2.5 循环中continue与breake用法
    python 基础 2.4 while 循环
    python 基础 2.3 for 循环
    python 基础 2.2 if流程控制(二)
    python 基础 2.1 if 流程控制(一)
    python 基础 1.6 python 帮助信息及数据类型间相互转换
    python 基础 1.5 python数据类型(四)--字典常用方法示例
    Tornado Web 框架
    LinkCode 第k个排列
    LeetCode 46. Permutations
  • 原文地址:https://www.cnblogs.com/peterleee/p/14172132.html
Copyright © 2011-2022 走看看