zoukankan      html  css  js  c++  java
  • 由浅入深聊聊Golang的sync.Map

    前言

    今天在技术群中有小伙伴讨论并发安全的东西,其实之前就有写过map相关文章:由浅入深聊聊Golang的map。但是没有详细说明sync.Map是怎么一回事。 回想了一下,竟然脑中只剩下“两个map、一个只读一个读写,xxxxx”等,关键词。有印象能扯,但是有点乱,还是写一遍简单记录一下吧。

    1.为什么需要sync.Map? 2.sync.Map如何使用? 3.理一理sync.Map源码实现? 4.sync.Map的优缺点? 5.思维扩散?

    正文

    1.为什么需要sync.Map?

    关于map可以直接查看由浅入深聊聊Golang的map,不再赘述。

    为什么需要呢? 原因很简单,就是:map在并发情况虚啊,只读是线程安全的,同时写线程不安全,所以为了并发安全 & 高效,官方实现了一把。

    1.1 并发写map会有什么问题?

    来看看不使用sync.Map的map是如何实现并发安全的:

    func main() {
    	m := map[int]int {1:1}
    	go do(m)
    	go do(m)
    
    	time.Sleep(1*time.Second)
    	fmt.Println(m)
    }
    
    func do (m map[int]int) {
    	i := 0
    	for i < 10000 {
    		m[1]=1
    		i++
    	}
    }
    复制代码

    输出:

    fatal error: concurrent map writes
    复制代码

    oh,no。 报错说的很明显,这哥们不能同时写。

    1.2 低配版解决方案

    加一把大锁

    // 大家好,我是那把大锁
    var s sync.RWMutex
    func main() {
    	m := map[int]int {1:1}
    	go do(m)
    	go do(m)
    
    	time.Sleep(1*time.Second)
    	fmt.Println(m)
    }
    
    func do (m map[int]int) {
    	i := 0
    	for i < 10000 {
    	    // 加锁
    		s.Lock()
    		m[1]=1
    		// 解锁
    		s.Unlock()
    		i++
    	}
    }
    复制代码

    输出:

    map[1:1]
    复制代码

    这回终于正常了,但是会有什么问题呢? 加大锁大概率都不是最优解,一般都会有效率问题。 通俗说就是加大锁影响其他的元素操作了。

    解决思路:减少加锁时间。
    方法: 1.空间换时间。  2.降低影响范围。
    复制代码

    sync.Map就是用了以上的思路。继续往下看。

    在这里插入图片描述

    2.sync.Map如何使用?

    上代码:

    func main() {
        // 关键人物出场
    	m := sync.Map{}
    	m.Store(1,1)
    	go do(m)
    	go do(m)
    
    	time.Sleep(1*time.Second)
    	fmt.Println(m.Load(1))
    }
    
    func do (m sync.Map) {
    	i := 0
    	for i < 10000 {
    		m.Store(1,1)
    		i++
    	}
    }
    复制代码

    输出:

    1 true
    复制代码

    运行ok。这把秀了。

    3.理一理sync.Map源码实现?

    先白话文说下大概逻辑。让下文看的更快。(大概只有是这样流程就好) 写:直写。 读:先读read,没有再读dirty。

    在这里插入图片描述

    从“基础结构 + 增删改查”的思路来详细过一遍源码。

    3.1 基础结构

    sync.Map的核心数据结构:

    type Map struct {
    	mu Mutex
    	read atomic.Value // readOnly
    	dirty map[interface{}]*entry
    	misses int
    }
    复制代码
    说明类型作用
    mu Mutex 加锁作用。保护后文的dirty字段
    read atomic.Value 存读的数据。因为是atomic.Value类型,只读,所以并发是安全的。实际存的是readOnly的数据结构。
    misses int 计数作用。每次从read中读失败,则计数+1。
    dirty map[interface{}]*entry 包含最新写入的数据。当misses计数达到一定值,将其赋值给read。

    这里有必要简单描述一下,大概的逻辑,

    readOnly的数据结构:

    type readOnly struct {
        m  map[interface{}]*entry
        amended bool 
    }
    复制代码
    说明类型作用
    m map[interface{}]*entry 单纯的map结构
    amended bool Map.dirty的数据和这里的 m 中的数据不一样的时候,为true

    entry的数据结构:

    type entry struct {
        //可见value是个指针类型,虽然read和dirty存在冗余情况(amended=false),但是由于是指针类型,存储的空间应该不是问题
        p unsafe.Pointer // *interface{}
    }
    复制代码

    这个结构体主要是想说明。虽然前文read和dirty存在冗余的情况,但是由于value都是指针类型,其实存储的空间其实没增加多少。

    3.2 查询

    func (m *Map) Load(key interface{}) (value interface{}, ok bool) {
        // 因read只读,线程安全,优先读取
        read, _ := m.read.Load().(readOnly)
        e, ok := read.m[key]
        
        // 如果read没有,并且dirty有新数据,那么去dirty中查找
        if !ok && read.amended {
            m.mu.Lock()
            // 双重检查(原因是前文的if判断和加锁非原子的,害怕这中间发生故事)
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            
            // 如果read中还是不存在,并且dirty中有新数据
            if !ok && read.amended {
                e, ok = m.dirty[key]
                // m计数+1
                m.missLocked()
            }
            
            m.mu.Unlock()
        }
        
        if !ok {
            return nil, false
        }
        return e.load()
    }
    
    func (m *Map) missLocked() {
        m.misses++
        if m.misses < len(m.dirty) {
            return
        }
        
        // 将dirty置给read,因为穿透概率太大了(原子操作,耗时很小)
        m.read.Store(readOnly{m: m.dirty})
        m.dirty = nil
        m.misses = 0
    }
    复制代码

    流程图:

    在这里插入图片描述 这边有几个点需要强调一下:

    如何设置阀值?

    这里采用miss计数和dirty长度的比较,来进行阀值的设定。

    为什么dirty可以直接换到read?

    因为写操作只会操作dirty,所以保证了dirty是最新的,并且数据集是肯定包含read的。 (可能有同学疑问,dirty不是下一步就置为nil了,为何还包含?后文会有解释。)

    为什么dirty置为nil?

    我不确定这个原因。猜测:一方面是当read完全等于dirty的时候,读的话read没有就是没有了,即使穿透也是一样的结果,所以存的没啥用。另一方是当存的时候,如果元素比较多,影响插入效率。

    3.3 删

    func (m *Map) Delete(key interface{}) {
        // 读出read,断言为readOnly类型
        read, _ := m.read.Load().(readOnly)
        e, ok := read.m[key]
        // 如果read中没有,并且dirty中有新元素,那么就去dirty中去找。这里用到了amended,当read与dirty不同时为true,说明dirty中有read没有的数据。
        
        if !ok && read.amended {
            m.mu.Lock()
            // 再检查一次,因为前文的判断和锁不是原子操作,防止期间发生了变化。
            read, _ = m.read.Load().(readOnly)
            e, ok = read.m[key]
            
            if !ok && read.amended {
                // 直接删除
                delete(m.dirty, key)
            }
            m.mu.Unlock()
        }
        
        if ok {
        // 如果read中存在该key,则将该value 赋值nil(采用标记的方式删除!)
            e.delete()
        }
    }
    
    func (e *entry) delete() (hadValue bool) {
        for {
        	// 再次再一把数据的指针
            p := atomic.LoadPointer(&e.p)
            if p == nil || p == expunged {
                return false
            }
            
            // 原子操作
            if atomic.CompareAndSwapPointer(&e.p, p, nil) {
                return true
            }
        }
    }
    复制代码

    流程图:

    在这里插入图片描述

    这边有几个点需要强调一下:

    1.为什么dirty是直接删除,而read是标记删除?

    read的作用是在dirty前头优先度,遇到相同元素的时候为了不穿透到dirty,所以采用标记的方式。 同时正是因为这样的机制+amended的标记,可以保证read找不到&&amended=false的时候,dirty中肯定找不到

    2.为什么dirty是可以直接删除,而没有先进行读取存在后删除?

    删除成本低。读一次需要寻找,删除也需要寻找,无需重复操作。

    3.如何进行标记的?

    将值置为nil。(这个很关键)

    3.4 增(改)

    func (m *Map) Store(key, value interface{}) {
        // 如果m.read存在这个key,并且没有被标记删除,则尝试更新。
        read, _ := m.read.Load().(readOnly)
        if e, ok := read.m[key]; ok && e.tryStore(&value) {
            return
        }
        
        // 如果read不存在或者已经被标记删除
        m.mu.Lock()
        read, _ = m.read.Load().(readOnly)
       
        if e, ok := read.m[key]; ok { // read 存在该key
        // 如果entry被标记expunge,则表明dirty没有key,可添加入dirty,并更新entry。
            if e.unexpungeLocked() { 
                // 加入dirty中,这儿是指针
                m.dirty[key] = e
            }
            // 更新value值
            e.storeLocked(&value) 
            
        } else if e, ok := m.dirty[key]; ok { // dirty 存在该key,更新
            e.storeLocked(&value)
            
        } else { // read 和 dirty都没有
            // 如果read与dirty相同,则触发一次dirty刷新(因为当read重置的时候,dirty已置为nil了)
            if !read.amended { 
                // 将read中未删除的数据加入到dirty中
                m.dirtyLocked() 
                // amended标记为read与dirty不相同,因为后面即将加入新数据。
                m.read.Store(readOnly{m: read.m, amended: true})
            }
            m.dirty[key] = newEntry(value) 
        }
        m.mu.Unlock()
    }
    
    // 将read中未删除的数据加入到dirty中
    func (m *Map) dirtyLocked() {
        if m.dirty != nil {
            return
        }
        
        read, _ := m.read.Load().(readOnly)
        m.dirty = make(map[interface{}]*entry, len(read.m))
        
        // 遍历read。
        for k, e := range read.m {
            // 通过此次操作,dirty中的元素都是未被删除的,可见标记为expunged的元素不在dirty中!!!
            if !e.tryExpungeLocked() {
                m.dirty[k] = e
            }
        }
    }
    
    // 判断entry是否被标记删除,并且将标记为nil的entry更新标记为expunge
    func (e *entry) tryExpungeLocked() (isExpunged bool) {
        p := atomic.LoadPointer(&e.p)
        
        for p == nil {
            // 将已经删除标记为nil的数据标记为expunged
            if atomic.CompareAndSwapPointer(&e.p, nil, expunged) {
                return true
            }
            p = atomic.LoadPointer(&e.p)
        }
        return p == expunged
    }
    
    // 对entry尝试更新 (原子cas操作)
    func (e *entry) tryStore(i *interface{}) bool {
        p := atomic.LoadPointer(&e.p)
        if p == expunged {
            return false
        }
        for {
            if atomic.CompareAndSwapPointer(&e.p, p, unsafe.Pointer(i)) {
                return true
            }
            p = atomic.LoadPointer(&e.p)
            if p == expunged {
                return false
            }
        }
    }
    
    // read里 将标记为expunge的更新为nil
    func (e *entry) unexpungeLocked() (wasExpunged bool) {
        return atomic.CompareAndSwapPointer(&e.p, expunged, nil)
    }
    
    // 更新entry
    func (e *entry) storeLocked(i *interface{}) {
        atomic.StorePointer(&e.p, unsafe.Pointer(i))
    }
    复制代码

    流程图:

    在这里插入图片描述 这边有几个点需要强调一下:
    1. read中的标记为已删除的区别?

    标记为nil,说明是正常的delete操作,此时dirty中不一定存在 a. dirty赋值给read后,此时dirty不存在 b. dirty初始化后,肯定存在

    标记为expunged,说明是在dirty初始化的时候操作的,此时dirty中肯定不存在。

    1. 可能存在性能问题?

    初始化dirty的时候,虽然都是指针赋值,但read如果较大的话,可能会有些影响。

    4.sync.Map的优缺点?

    先说结论,后来证明。

    优点:是官方出的,是亲儿子;通过读写分离,降低锁时间来提高效率; 缺点:不适用于大量写的场景,这样会导致read map读不到数据而进一步加锁读取,同时dirty map也会一直晋升为read map,整体性能较差。 适用场景:大量读,少量写

    这里主要证明一下,为什么适合大量读,少量写。 代码的大概思路:通过比较单纯的map和sync.Map,在并发安全的情况下,只写和读写的效率

    var s sync.RWMutex
    var w sync.WaitGroup
    func main() {
    	mapTest()
    	syncMapTest()
    }
    func mapTest() {
    	m := map[int]int {1:1}
    	startTime := time.Now().Nanosecond()
    	w.Add(1)
    	go writeMap(m)
    	w.Add(1)
    	go writeMap(m)
    	//w.Add(1)
    	//go readMap(m)
    
    	w.Wait()
    	endTime := time.Now().Nanosecond()
    	timeDiff := endTime-startTime
    	fmt.Println("map:",timeDiff)
    }
    
    func writeMap (m map[int]int) {
    	defer w.Done()
    	i := 0
    	for i < 10000 {
    		// 加锁
    		s.Lock()
    		m[1]=1
    		// 解锁
    		s.Unlock()
    		i++
    	}
    }
    
    func readMap (m map[int]int) {
    	defer w.Done()
    	i := 0
    	for i < 10000 {
    		s.RLock()
    		_ = m[1]
    		s.RUnlock()
    		i++
    	}
    }
    
    func syncMapTest() {
    	m := sync.Map{}
    	m.Store(1,1)
    	startTime := time.Now().Nanosecond()
    	w.Add(1)
    	go writeSyncMap(m)
    	w.Add(1)
    	go writeSyncMap(m)
    	//w.Add(1)
    	//go readSyncMap(m)
    
    	w.Wait()
    	endTime := time.Now().Nanosecond()
    	timeDiff := endTime-startTime
    	fmt.Println("sync.Map:",timeDiff)
    }
    
    func writeSyncMap (m sync.Map) {
    	defer w.Done()
    	i := 0
    	for i < 10000 {
    		m.Store(1,1)
    		i++
    	}
    }
    
    func readSyncMap (m sync.Map) {
    	defer w.Done()
    	i := 0
    	for i < 10000 {
    		m.Load(1)
    		i++
    	}
    }
    复制代码
    情况结果
    只写 map: 1,022,000 sync.Map: 2,164,000
    读写 map: 8,696,000 sync.Map: 2,047,000

    会发现大量写的场景下,由于sync.Map里头操作更多其实,所以效率没有单纯的map+metux高。

    5.思维扩散?

    想一想,mysql加锁,是不是有表级锁、行级锁,前文的sync.RWMutex加锁方式相当于表级锁。

    而sync.Map其实也是相当于表级锁,只不过多读写分了两个map,本质还是一样的。 既然这样,那就自然知道优化方向了:就是把锁的粒度尽可能降低来提高运行速度。

    思路:对一个大map进行hash,其内部是n个小map,根据key来来hash确定在具体的那个小map中,这样加锁的粒度就变成1/n了。 网上找了下,真有大佬实现了:点这里


    作者:咖啡色的羊驼
    链接:https://juejin.im/post/5d36a7cbf265da1bb47da444
    来源:掘金
    著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。
  • 相关阅读:
    Web 设计与开发终极资源大全(下)
    任务失败,原因是未找到“LC.exe”,或未安装正确的 Microsoft Windows SDK
    NET开发常用DLL资源下载
    sqlserver2005 sqlserver2000连接字符串的区别(NET)
    创业大讲座观后感
    工欲善其事必先利其器搭建Android平台
    Java学习很好的笔记
    Java调用jama实现矩阵运算
    MySQL的安装及使用教程
    eval解析JSON中的注意点
  • 原文地址:https://www.cnblogs.com/ExMan/p/12421725.html
Copyright © 2011-2022 走看看