zoukankan      html  css  js  c++  java
  • 5. Go 并发编程--sync/atomic


    1. 前言

    在学习 mutex后。 在读源码的时候发现里面使用了很多atomaic 包的方法来保证原子。在并发编程中,常常提到的并发安全,具体数据就是 对数据的修改读取是否是原子操作 所以也常说,并发是否能保证了原子操作。在Golang语言中,实现原子操作是在标准库实现的,即 sync/atomic


    2. 简单的使用

    2.1 mutex示例

    在Mutex示例中,使用读写锁和互斥锁实现代码如下

    package syncDemo
    
    import "sync"
    
    // RWMutexConfig 读写锁实现
    type RWMutexConfig struct {
        rw   sync.RWMutex
        data []int
    }
    
    // Get get config data
    func (c *RWMutexConfig) Get() []int {
        c.rw.RLock()
        defer c.rw.RUnlock()
        return c.data
    }
    
    // Set set config data
    func (c *RWMutexConfig) Set(n []int) {
        c.rw.Lock()
        defer c.rw.Unlock()
        c.data = n
    }
    
    // MutexConfig 互斥锁实现
    type MutexConfig struct {
        data []int
        mu   sync.Mutex
    }
    
    // Get get config data
    func (c *MutexConfig) Get() []int {
        c.mu.Lock()
        defer c.mu.Unlock()
        return c.data
    }
    
    // Set set config data
    func (c *MutexConfig) Set(n []int) {
        c.mu.Lock()
        defer c.mu.Unlock()
        c.data = n
    }
    
    

    并发基准测试代码如下

    package syncDemo
    
    import "testing"
    
    type iConfig interface {
        Get() []int
        Set([]int)
    }
    
    func bench(b *testing.B, c iConfig) {
    
        b.RunParallel(func(p *testing.PB) {
            for p.Next() {
                c.Set([]int{100})
                c.Get()
                c.Get()
                c.Get()
                c.Set([]int{100})
                c.Get()
                c.Get()
            }
        })
    }
    
    func BenchmarkMutexConfig(b *testing.B) {
        conf := &MutexConfig{data: []int{1, 2, 3}}
        bench(b, conf)
    }
    
    func BenchmarkRWMutexConfig(b *testing.B) {
        conf := &RWMutexConfig{data: []int{1, 2, 3}}
        bench(b, conf)
    }
    
    

    运行 基准测试 结果如下

    D:goProjectsrcdaily测试syncDemomutex>go test -race -bench=.
    goos: windows
    goarch: amd64
    pkg: syncDemo
    cpu: Intel(R) Core(TM) i7-10510U CPU @ 1.80GHz
    BenchmarkMutexConfig-8             85347             15611 ns/op
    BenchmarkRWMutexConfig-8           94676             11367 ns/op  # 读写锁实现性能高于互锁
    PASS
    ok      syncDemo        4.140s
    

    2.2 sync/atomic实现方式如下

    // Config atomic 实现
    type AtomaticConfig struct {
        v atomic.Value // 假设 data 就是整个 config 了
    }
    
    // Get get config data
    func (c *AtomaticConfig) Get() []int {
        // 类型断言
        return (*c.v.Load().(*[]int))
    }
    
    // Set set config data
    func (c *AtomaticConfig) Set(n []int) {
        c.v.Store(&n)
    }
    

    基准测试代码如下

    package syncDemo
    
    import "testing"
    
    type iConfig interface {
        Get() []int
        Set([]int)
    }
    
    func bench(b *testing.B, c iConfig) {
    
        b.RunParallel(func(p *testing.PB) {
            for p.Next() {
                c.Set([]int{100})
                c.Get()
                c.Get()
                c.Get()
                c.Set([]int{100})
                c.Get()
                c.Get()
            }
        })
    }
    
    func BenchmarkAtomaticConfig(b *testing.B) {
        conf := &AtomaticConfig{}
        bench(b, conf)
    }
    
    

    读写锁互斥锁atomic 基准测试放一起测试,测试性能

    package syncDemo
    
    import "testing"
    
    type iConfig interface {
        Get() []int
        Set([]int)
    }
    
    func bench(b *testing.B, c iConfig) {
    
        b.RunParallel(func(p *testing.PB) {
            for p.Next() {
                c.Set([]int{100})
                c.Get()
                c.Get()
                c.Get()
                c.Set([]int{100})
                c.Get()
                c.Get()
            }
        })
    }
    
    // 互斥锁
    func BenchmarkMutexConfig(b *testing.B) {
        conf := &MutexConfig{data: []int{1, 2, 3}}
        bench(b, conf)
    }
    
    // 读写锁
    func BenchmarkRWMutexConfig(b *testing.B) {
        conf := &RWMutexConfig{data: []int{1, 2, 3}}
        bench(b, conf)
    }
    
    // atomic 
    func BenchmarkAtomicConfig(b *testing.B) {
        conf := &AtomicConfig{}
        bench(b, conf)
    }
    

    运行基准测试结果如下,可以发现 atomic 的性能又好上了许多

    D:goProjectsrcdaily测试syncDemomutex>go test -race -bench=.
    goos: windows
    goarch: amd64
    pkg: syncDemo
    cpu: Intel(R) Core(TM) i7-10510U CPU @ 1.80GHz
    BenchmarkMutexConfig-8             75313             16914 ns/op
    BenchmarkRWMutexConfig-8           88446             11719 ns/op
    BenchmarkAtomicConfig-8           260695              4679 ns/op
    PASS
    ok      syncDemo        5.362s
    

    atomic.Value 这种适合配置文件这种读特别多,写特别少的场景,因为他是 COW(Copy On Write)写时复制的一种思想,COW 就是指我需要写入的时候我先把老的数据复制一份到一个新的对象,然后再写入新的值。


    维基百科对COW(Copy On Write)的描述如下

    写入时复制(英语:Copy-on-write,简称 COW)是一种计算机程序设计领域的优化策略。其核心思想是,如果有多个调用者(callers)同时请求相同资源(如内存或磁盘上的数据存储),他们会共同获取相同的指针指向相同的资源,直到某个调用者试图修改资源的内容时,系统才会真正复制一份专用副本(private copy)给该调用者,而其他调用者所见到的最初的资源仍然保持不变。这过程对其他的调用者都是透明的。此作法主要的优点是如果调用者没有修改该资源,就不会有副本(private copy)被创建,因此多个调用者只是读取操作时可以共享同一份资源。

    这种思路会有一个问题,就是可能有部分 goroutine 在使用老的对象,所以老的对象不会立即被回收,如果存在大量写入的话,会导致产生大量的副本,性能反而不一定好 。
    这种方式的好处就是不用加锁,所以也不会有 goroutine 的上下文切换,并且在读取的时候大家都读取的相同的副本所以性能上回好一些。

    COW 策略在 linux, redis 当中都用的很多


    3. 源码分析

    3.1 方法总结

    atomic 的函数签名有很多,但是大部分都是重复的为了不同的数据类型创建了不同的签名,这就是没有泛型的坏处了,基础库会比较麻烦

    1. 第一类 AddXXX 当需要添加的值为负数的时候,做减法,正数做加法
      // 第一类,AddXXX
      func AddInt32(addr *int32, delta int32) (new int32)
      func AddInt64(addr *int64, delta int64) (new int64)
      func AddUint32(addr *uint32, delta uint32) (new uint32)
      func AddUint64(addr *uint64, delta uint64) (new uint64)
      func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
      
    2. 第二类 CompareAndSwapXXX CAS 操作, 会先比较传入的地址的值是否是 old,如果是的话就尝试赋新值,如果不是的话就直接返回 false,返回 true 时表示赋值成功。
      func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
      func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
      func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
      func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
      func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
      func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
      
    3. 第三类 LoadXXX,从某个地址中取值
      func LoadInt32(addr *int32) (val int32)
      func LoadInt64(addr *int64) (val int64)
      func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
      func LoadUint32(addr *uint32) (val uint32)
      func LoadUint64(addr *uint64) (val uint64)
      func LoadUintptr(addr *uintptr) (val uintptr)
      
    4. 第四类 StoreXXX ,给某个地址赋值
      func StoreInt32(addr *int32, val int32)
      func StoreInt64(addr *int64, val int64)
      func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
      func StoreUint32(addr *uint32, val uint32)
      func StoreUint64(addr *uint64, val uint64)
      func StoreUintptr(addr *uintptr, val uintptr)
      
    5. 第五类 SwapXXX ,交换两个值,并且返回老的值
      func SwapInt32(addr *int32, new int32) (old int32)
      func SwapInt64(addr *int64, new int64) (old int64)
      func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
      func SwapUint32(addr *uint32, new uint32) (old uint32)
      func SwapUint64(addr *uint64, new uint64) (old uint64)
      func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
      
    6. 最后一类Value 用于任意类型的值的 Store、Load,我们开始的案例就用到了这个,这是 1.4 版本之后引入的,签名的方法都只能作用于特定的类型,引入这个方法之后就可以用于任意类型了。
      type Value
      func (v *Value) Load() (x interface{})
      func (v *Value) Store(x interface{})
      

    3.2 CAS

    CAS(Compare-and-Swap),即比较并替换,是一种实现并发算法时常用到的技术,Java并发包中的很多类都使用了CAS技术。

    CAS需要有3个操作数:内存地址V,旧的预期值A,即将要更新的目标值B。

    CAS指令执行时,当且仅当内存地址V的值与预期值A相等时,将内存地址V的值修改为B,否则就什么都不做。整个比较并替换的操作是一个原子操作。


    3.2.1 延申阅读: CAS的缺点

    CAS虽然很高效的解决了原子操作问题,但是CAS仍然存在三大问题。

    1. 循环时间长开销很大。

      循环时间长开销很大:我们可以看到getAndAddInt方法执行时,如果CAS失败,会一直进行尝试。如果CAS长时间一直不成功,可能会给CPU带来很大的开销。

    2. 只能保证一个共享变量的原子操作。

      只能保证一个共享变量的原子操作:当对一个共享变量执行操作时,我们可以使用循环CAS的方式来保证原子操作,但是对多个共享变量操作时,循环CAS就无法保证操作的原子性,这个时候就可以用锁来保证原子性。

    3. ABA问题。

    什么是ABA问题?ABA问题怎么解决?
    CAS 的使用流程通常如下:
    1)首先从地址 V 读取值 A;
    2)根据 A 计算目标值 B;
    3)通过 CAS 以原子的方式将地址 V 中的值从 A 修改为 B。

    但是在第1步中读取的值是A,并且在第3步修改成功了,我们就能说它的值在第1步和第3步之间没有被其他线程改变过了吗?

    如果在这段期间它的值曾经被改成了B,后来又被改回为A,那CAS操作就会误认为它从来没有被改变过。这个漏洞称为CAS操作的“ABA”问题。Java并发包为了解决这个问题,提供了一个带有标记的原子引用类“AtomicStampedReference”,它可以通过控制变量值的版本来保证CAS的正确性。因此,在使用CAS前要考虑清楚“ABA”问题是否会影响程序并发的正确性,如果需要解决ABA问题,改用传统的互斥同步可能会比原子类更高效。

    sync/atomic 包中的源码除了 Value 之外,其他的函数都是没有直接的源码的, 需要去 tuntime/internal/atomic 中寻找, CAS函数为例, 其他都是大同小异


    3.2.2 VALUE

    Value 是个 结构体,结构体中只有唯一的成员是个 interface类型,也就意味着value可以是个任意类型的值

    type VALUE struct {
      v interface{}
    }
    

    v 用来保存 传入的值


    store

    store方法是将值存储为x, 需要注意的是,每次传入的x 不能为nil, 并且他们的类型必须是相同的, 不让会导致 panic异常

    源码如下:

    func (v Value) Store(x interface{}){
      if x == nil{
        panic("sync/atomic: store of nil value into Value")
      }
      // ifaceWords 其实就是定义了一下 interface 的结构,包含 data 和 type 两部分
      // 这里 vp 是原有值,xp 是传入的值
    	vp := (*ifaceWords)(unsafe.Pointer(v))
    	xp := (*ifaceWords)(unsafe.Pointer(&x))
        // for 循环不断尝试
    	for {
            // 这里先用原子方法取一下老的类型值
    		typ := LoadPointer(&vp.typ)
    		if typ == nil {
          // 等于 nil 就说明原始值没有,需要存储新的值。
          // 调用 runtime 的方法禁止抢占,避免操作完成一半就被抢占了
          // 同时可以避免 GC 的时候看到 unsafe.Pointer(^uintptr(0)) 这个中间状态的值
    			runtime_procPin()
    			if !CompareAndSwapPointer(&vp.typ, nil, unsafe.Pointer(^uintptr(0))) {
    				runtime_procUnpin()
    				continue
    			}
    
    			// 分别把值和类型保存下来
    			StorePointer(&vp.data, xp.data)
    			StorePointer(&vp.typ, xp.typ)
    			runtime_procUnpin()
    			return
    		}
    
    		if uintptr(typ) == ^uintptr(0) {
          // 如果判断发现这个类型是这个固定值,说明当前第一次赋值还没有完成,所以进入自旋等待
    			continue
    		}
    		// 第一次赋值已经完成,判断新的赋值的类型和之前是否一致,如果不一致就直接 panic
    		if typ != xp.typ {
    			panic("sync/atomic: store of inconsistently typed value into Value")
    		}
        // 保存值
    		StorePointer(&vp.data, xp.data)
    		return
    	}
    }
    }
    

    复杂逻辑在第一次写入,因为第一次写入的时候有两次原子写操作,所以这个时候用 typ 值作为一个判断,通过不同值判断当前所处的状态,这个在我们业务代码中其实也经常用到。然后因为引入了这个中间状态,所以又使用了 runtime_procPin 方法避免抢占

    func sync_runtime_procPin() int {
    	return procPin()
    }
    
    func procPin() int {
        // 获取到当前 goroutine 的 m
    	_g_ := getg()
    	mp := _g_.m
    
        // unpin 的时候就是 locks--
    	mp.locks++
    	return int(mp.p.ptr().id)
    }
    

    Load

    该方法使用来 加载数据,也就是读取数据, 源码如下

    func (v *Value) Load() (x interface{}) {
      // ifaceWords 是定义了一个 interface 的结构,包含 data 和 type 两部分
    	vp := (*ifaceWords)(unsafe.Pointer(v))
      
      //从结构体中获取类型
    	typ := LoadPointer(&vp.typ)
      // 这个说明还没有第一次 store 或者是第一次 store 还没有完成
    	if typ == nil || uintptr(typ) == ^uintptr(0) {
    		// First store not yet completed.
    		return nil
    	}
      // 获取值
    	data := LoadPointer(&vp.data)
      // 构造 x 类型
    	xp := (*ifaceWords)(unsafe.Pointer(&x))
    	xp.typ = typ
    	xp.data = data
    	return
    }
    

    3. 实战:实现一个“无锁”的栈

    3.1 无锁栈

    对多线程场景下的无锁操作的研究一直是个热点,理想中的无锁操作,它应能天然地避开有锁操作的一些缺陷,比如:

    1. 减少线程切换,能够相对快速高效地读写(不使用 mutex, semaphore)
    2. 避免死锁的可能,任何操作都应能在有限的等待时间内完成,

    go实现无锁栈代码如下

    package main
    
    import (
    	"sync/atomic"
    	"unsafe"
    )
    
    // LFStack 无锁栈
    // 使用链表实现
    type LFStack struct {
    	head unsafe.Pointer // 栈顶
    }
    
    // Node 节点
    type Node struct {
    	val  int32
    	next unsafe.Pointer
    }
    
    // NewLFStack NewLFStack
    func NewLFStack() *LFStack {
    	n := unsafe.Pointer(&Node{})
    	return &LFStack{head: n}
    }
    
    // Push 入栈
    func (s *LFStack) Push(v int32) {
    	n := &Node{val: v}
    
    	for {
    		// 先取出栈顶
    		old := atomic.LoadPointer(&s.head)
    		n.next = old
    		// CAS操作
    		if atomic.CompareAndSwapPointer(&s.head, old, unsafe.Pointer(n)) {
    			return
    		}
    	}
    }
    
    // Pop 出栈,没有数据时返回 nil
    func (s *LFStack) Pop() int32 {
    	for {
    		// 先取出栈顶
    		old := atomic.LoadPointer(&s.head)
    		if old == nil {
    			return 0
    		}
    
    		oldNode := (*Node)(old)
    		// 取出下一个节点
    		next := atomic.LoadPointer(&oldNode.next)
    		// 重置栈顶
    		if atomic.CompareAndSwapPointer(&s.head, old, next) {
    			return oldNode.val
    		}
    	}
    }
    

    3.2 扩展阅读

    栈(stack)又名堆栈,它是一种运算受限的线性表。限定仅在表尾进行插入和删除操作的线性表。这一端被称为栈顶,相对地,把另一端称为栈底。向一个栈插入新元素又称作进栈入栈压栈,它是把新元素放到栈顶元素的上面,使之成为新的栈顶元素;从一个栈删除元素又称作出栈或退栈,它是把栈顶元素删除掉,使其相邻的元素成为新的栈顶元素。




    4. 参考

    1. CAS概述
    2. 百度百科栈
    3. https://lailin.xyz/post/go-training-week3-atomic.html
    ♥永远年轻,永远热泪盈眶♥
  • 相关阅读:
    一个简单的PHP登录演示(SESSION版 与 COOKIE版)
    web系统之session劫持解决
    CKFinder 1.4.3 任意文件上传漏洞
    linux服务器磁盘扩容的方法
    Linux下lvm在线扩容步骤
    Centos7使用LVM扩容磁盘(测试成功)
    CentOS7下利用init.d启动脚本实现tomcat开机自启动
    Linux tomcat安装详解(未完)
    linux下 目录(扩容)挂载磁盘
    Linux下环境变量设置
  • 原文地址:https://www.cnblogs.com/failymao/p/15492216.html
Copyright © 2011-2022 走看看