zoukankan      html  css  js  c++  java
  • Golang 之 sync.Map揭秘

    Question: 

    1. sync.Map实现原理及适用的场景?

    Preface:

    在Go 1.6之前, 内置的map类型是部分goroutine安全的,并发的读没有问题,并发的写可能有问题。自go 1.6之后, 并发地读写map会报错

    func main() {
        a := map[int]int{1: 1}
        go func() {
            for {
                a[1] = 2
            }
        }()
        go func() {
            // 不停地对map进行读取
            for {
                _ = a[1]
            }
        }()
        for {
        }
    }
    #fatal error: concurrent map read and map write

    go 1.9之前的解决方案是额外绑定一个锁,封装成一个新的struct或者单独使用锁都可以

    func main() {
        var counter = struct {
            sync.RWMutex
            m map[string]int
        }{m: map[string]int{"a": 1}}
        ch_write := make(chan string, 1)
        ch_read := make(chan string, 1)
        go func() {
            for i := 0; i < 3; i++ {
                counter.Lock()
                counter.m["a"]++
                fmt.Println("write:", counter.m["a"])
                counter.Unlock()
            }
            ch_write <- "write_finish"
        }()
        go func() {
            // 不停地对map进行读取
            for i := 0; i < 3; i++ {
                counter.RLock()
                fmt.Println("get:", counter.m["a"])
                counter.RUnlock()
            }
            ch_read <- "read_finish"
        }()
        fmt.Println(<-ch_write)
        fmt.Println(<-ch_read)
    }
    get: 1
    write: 2
    get: 2
    write: 3
    get: 3
    write: 4
    write_finish
    read_finish

     用syns.Map来同时读写:打印的顺序可能各种各样

    func main() {
        var counter sync.Map
        counter.Store("a", -1)
        ch_write := make(chan string, 1)
        ch_read := make(chan string, 1)
        go func() {
            for i := 0; i < 3; i++ {
                counter.Store("a", i)
                fmt.Println("write:", i)
            }
            ch_write <- "write_finish"
        }()
        go func() {
            // 不停地对map进行读取
            for i := 0; i < 3; i++ {
                if v, ok := counter.Load("a"); ok {
                    fmt.Println("read:", v)
                }
            }
            ch_read <- "read_finish"
        }()
        fmt.Println(<-ch_write)
        fmt.Println(<-ch_read)
    }
    read: -1
    read: -1
    read: -1
    write: 0
    write: 1
    write: 2
    write_finish
    read_finish

    针对sync.Map和1.9之前解决的方法,有同学做过性能比较,随着cpu核心数的增加、并发加剧,读写锁+map的方式性能在不停的衰减,并且在核数为4的时候出现了性能的拐点;而sync.Map虽然性能不是特别好,但是相对比较平稳,但sync.Map也相对使用起来更简单

     sync.Map 原理:

    空间换取时间,通过两个数据结构(read,dirty)实现加锁,实现对性能的提升,通常的读只会使用read结构

     sync.Map 底层结构:

    type Map struct {
        // 当涉及到dirty数据的操作的时候,需要使用这个锁
        mu Mutex
        // 底层也是一个map,和dirty一样,底层指向的是同一个数据,但增加了一个amended字段,表示和dirty数据不一致时为true
        read atomic.Value // readOnly
        // dirty数据包含当前的map包含的entries,它包含最新的entries(包括read中未删除的数据,虽有冗余,但是提升dirty字段为read的时候非常快,不用一个一个的复制,而是直接将这个数据结构作为read字段的一部分),有些数据还可能没有移动到read字段中。
        // 对于dirty的操作需要加锁,因为对它的操作可能会有读写竞争。
        // 当dirty为空的时候, 比如初始化或者刚提升完,下一次的写操作会复制read字段中未删除的数据到这个数据中。
        dirty map[interface{}]*entry
        // 当从Map中读取entry的时候,如果read中不包含这个entry,会尝试从dirty中读取,这个时候会将misses加一,
        // 当misses累积到 dirty的长度的时候, 就会将dirty提升为read,避免从dirty中miss太多次。因为操作dirty需要加锁。
        misses int
    }

    对sync.Map 的主要操作主要四个步骤,增 改(store),删(delete),查(load),store是最复杂的

    Load:通过key查找对应的值value,是否存在,通过ok反映

    func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
        // 1.首先从m.read中得到只读readOnly,从它的map中查找,不需要加锁
        read, _ := m.read.Load().(readOnly)
        e, ok := read.m[key]
        // 2. 如果没找到,并且m.dirty中有新数据,需要从m.dirty查找,这个时候需要加锁
        if !ok && read.amended {
            m.mu.Lock()
            // 3. 双检查,避免加锁的时候m.dirty提升为m.read,dirty可能为nil
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            // 如果m.read中还是不存在,并且m.dirty中有新数据
            if !ok && read.amended {
                // 4. 从m.dirty查找
                e, ok = m.dirty[key]
                // 5. 不管m.dirty中存不存在,都将misses计数加一,并判断是否需要提升dirty
                m.missLocked()
            }
            m.mu.Unlock()
        }
        if !ok {
            return nil, false
        }
        return e.load()
    }
    // dirty的提升为read,amended默认为false
    func (m *Map) missLocked() {
        m.misses++
        if m.misses < len(m.dirty) {
            return
        }
        m.read.Store(readOnly{m: m.dirty})
        m.dirty = nil
        m.misses = 0
    }

    Delete

    func (m *Map) Delete(key interface{}) {
        read, _ := m.read.Load().(readOnly)
        e, ok := read.m[key]
        if !ok && read.amended {
            m.mu.Lock()
            // 2. double检查
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            if !ok && read.amended {
           //  3. 如果还是没有找到就去dirty中删除这个key,这里才是真正把key给删调,
                delete(m.dirty, key)
            }
            m.mu.Unlock()
        }
        // 1. 如果read中有数据直接删除,在read中置entry的p为nil,dirty指向的应该是同一个entry
        if ok {
            e.delete()
        }
    }
    func (e *entry) delete() (hadValue bool) {
        for {
            p := atomic.LoadPointer(&e.p)
            // 已标记为删除
            if p == nil || p == expunged {
                return false
            }
            // 原子操作,e.p标记为nil
            if atomic.CompareAndSwapPointer(&e.p, p, nil) {
                return true
            }
        }
    }

    Store:更新或者增加一个entry

    func (m *Map) Store(key, value interface{}) {
        // 1. 如果read存在该key 而且没有被标记为删除,可以直接存储
        read, _ := m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok && e.tryStore(&value) {
            return
        }
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok {
        // 2. dirty中可能不存在这个key
            if e.unexpungeLocked() {
                m.dirty[key] = e //m.dirty中不存在这个键,所以加入m.dirty
            }
            e.storeLocked(&value) //更新
        } else if e, ok := m.dirty[key]; ok {
        // 3. dirty中还存在这个key的话就直接更新
            e.storeLocked(&value)
        } else { 
        // 4. 新增一个entry
            if !read.amended { 
                m.dirtyLocked() //如果dirty为空,复制read中不为空的数据到dirty中
                m.read.Store(readOnly{m: read.m, amended: true})
            }
            m.dirty[key] = newEntry(value) //将这个entry加入到m.dirty中
        }
        m.mu.Unlock()
    }
    
    func (e *entry) tryStore(i *interface{}) bool {
        p := atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
        for {
            if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
                return true
            }
            p = atomic.LoadPointer(&e.p)
            if p == expunged {
                return false
            }
        }
    }
    
    func (e *entry) unexpungeLocked() (wasExpunged bool) {
        return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
    }
    
    func (m *Map) dirtyLocked() {
        if m.dirty != nil {
            return
        }
        read, _ := m.read.Load().(readOnly)
        m.dirty = make(map[interface{}]*entry, len(read.m))
        for k, e := range read.m {
            if !e.tryExpungeLocked() {
                m.dirty[k] = e
            }
        }
    }
    func (e *entry) tryExpungeLocked() (isExpunged bool) {
        p := atomic.LoadPointer(&e.p)
        for p == nil {
            // 将已经删除标记为nil的数据标记为expunged
            if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
                return true
            }
            p = atomic.LoadPointer(&e.p)
        }
        return p == expunged
    }

    额外:

    在遍历sync.Map不能用常见的range方法,需要通过sync.map自己的方法,实际上在调用Range方法之前,会做一个dirty提升

    func := func(key, value interface{}) bool {
            fmt.Printf("Range: k, v = %v, %v
    ", key, value)
            return true
    }   
    map.Range(func)

    总结:

    观察源码,发现在改,删,查,都可以优先对read进行操作不用加锁,而在有冲突的情况下需要加锁操作dirty。sync.Map对特定场景做了性能优化,一种是读多写少的场景,另外一种多个goroutine读/写/修改的key集合没有交集。也就避免了读多写少情况下的加锁性能损耗。

    Tips:

    判断自己是否真正理解了,可以问自己entry一共有哪几个状态?

    1.正常情况下,entry的point指向具体的值

    2.当删除操作时,entry的point为nil

    3.当新插入一个值时,如果dirty为空(dirty刚刚提升后),会先把read中存在的值复制到dirty中(只会复制不为expunged和nil的entry),并把read中为entry的point为nil的改为expunged,代表entry于不存在dirty中。在查找的时候expunged是有作用的。

    参考:

    1.https://colobu.com/2017/07/11/dive-into-sync-Map/

  • 相关阅读:
    HDU 4435
    、输入某人出生日期(以字符串方式输入,如198741)使用DateTime和TimeSpan类,(1)计算其人的年龄;(2)计算从现在到其60周岁期间,总共多少天。
    NetBeans IDE 7.0 Release Candidate 2 Now Available
    NetBeans 时事通讯(刊号 # 142 Apr 14, 2011)
    B3log Solo & B3log Symphony/Rhythm
    B3log Solo & B3log Symphony/Rhythm
    GAE SDK 1.4.3 发布了!
    NetBeans IDE 7.0 Release Candidate 2 Now Available
    Guice 3.0 正式版发布
    NetBeans 时事通讯(刊号 # 141 Mar 30, 2011)
  • 原文地址:https://www.cnblogs.com/peterleee/p/14172132.html
Copyright © 2011-2022 走看看