zoukankan      html  css  js  c++  java
  • Golang并发编程原理解析

    go并发编程

    一.背景知识介绍

    1.进程和线程

    • 进程是程序在操作系统中一次执行的过程,系统进行资源分配和调度的基本单位
    • 线程是进程的一个执行实体,是CPU调度和分派的基本单位,它是比进程更小的能独立运行的基本单位
    • 一个进程可以创建和撤销多个线程,同一个进程中的多个线程之间可以并发执行

    2.并发和并行

    • 多线程程序在单核CPU上运行,就是并发
    • 多线程程序在多核CPU上运行,就是并行
    • 并发不是并行,并发主要是由切换时间片来实现同时运行,并行则是直接利用多核实现多线程的运行,go可以设置使用的核数,以发挥多核计算机的能力

    3.协程和线程

    • 协程:独立的栈空间,共享堆空间,调度由用户自己控制,本质上有点类似于用户级线程,这些用户级线程的调度也是自己实现的
    • 线程,一个线程上可以跑多个协程,协程是轻量级的线程

    4.goroutine

    • 每个goroutine实例只有4-5KB的内存占用(可伸缩),和由于实现机制而大幅度减少的创建和销毁开销是go高并发的根本原因
    • goroutine 奉行通过通信来共享内存,而不是通过共享内存来通信

    二.Channel

    • Go语言的并发模型是CSP,提倡通过通信来进行内存共享,而不是通过共享内存来实现通信
    • goroutine是Go程序并发的实体,channel就是他们之间的连接,channel是可以让一个goroutine发送特定值到另一个goroutine的通信机制
    • Go语言中的channel是一种特殊的类型,channel像是一个传送带或者队列,总是遵循FIFO先入先出的原则,保证收发数据的顺序
    • 每一个Channel都有其具体的传输数据类型,也就是说声明的需要为其指定元素类型
    • 声明的Channel需要make初始化之后才能使用
    • 声明并初始化channel
      ch := make(chan int)
      

    1.channle的操作

    channel有三种操作,接收,发送,关闭。发送和接收都使用 <- 符号

    • 接收
    x := <- ch // 从ch中接收值并赋值给变量x
    
    • 发送
    ch <- 10 // 把10发送到ch中
    
    • 关闭
    close(ch)
    

    注意事项:

    • 关于关闭通道,只有在需要通知接收方的goroutine,所有的数据都已经发送完毕的时候,才需要关闭通道,也就是说,关闭通道,可以告诉接收方的goroutine,所有的数据都已经发送完毕了
    • 通道是可以被GC回收的,关闭通道不是必须的,不像文件那样,在结束操作后必须关闭文件
    • 对一个关闭的通道,再发送数据,就会导致panic
    • 对一个关闭的通道进行接收会一直获取值,直到通道为空为止
    • 对一个关闭的,并且没有值的通道,进行接收操作,会得到对应类型的零值
    • 关闭一个已经关闭的通道会导致panic

    2.无缓冲的Channel

    • 无缓冲通道,又叫做阻塞通道

    • 无缓冲通道只有在有人接收值的时候才能发送值,否则就会一直阻塞住,同样的,只有在有人发送值的时候,才能接收值
    func main() {
        ch := make(chan int)
        ch <- 10
        fmt.Println("发送成功")
        
        // fatal error: all goroutines are asleep - deadlock!
    }
    
    func recv(c chan int) {
        ret := <-c
        fmt.Println("接收成功", ret)
    }
    func main() {
        ch := make(chan int)
        go recv(ch) // 启用goroutine从通道接收值
        ch <- 10
        fmt.Println("发送成功")
    }
    
    • 使用无缓冲通道进行通信将导致发送和接收的goroutine同步化,因此,无缓冲通道也被称为同步通道

    3.有缓冲的Channel

    • 在通道使用make函数初始化的时候可以指定缓冲容量
    ch := make(chan int, 10) // 创建一个容量为10的有缓冲区通道
    

    • 只要通道的容量大于0,那么该通道就是有缓冲通道
    • 通道的容量表示通道中能最多存放元素的数量
    • 通过len函数获取当前通道内元素数量,通过cap函数获取通道的容量

    4.单向channel

    • 限制通道在函数中只能发送或者接收
    • chan <- int是一个只能发送的通道,可以发送但是不能接收;
    • <-chan int是一个只能接收的通道,可以接收但是不能发送。
    • 在函数传参以及任何赋值操作中,将双向通道转为单向通道是可以的,但反过来不行
    func counter(out chan<- int) {
        for i := 0; i < 100; i++ {
            out <- i
        }
        close(out)
    }
    
    func squarer(out chan<- int, in <-chan int) {
        for i := range in {
            out <- i * i
        }
        close(out)
    }
    func printer(in <-chan int) {
        for i := range in {
            fmt.Println(i)
        }
    }
    
    func main() {
        ch1 := make(chan int)
        ch2 := make(chan int)
        go counter(ch1)
        go squarer(ch2, ch1)
        printer(ch2)
    }
    

    5.优雅的从通道循环取值

    • 通过通道发送有限的数据时,可以通过close函数关闭通道来告知接收该通道值的goroutine停止等待
    • 当通道关闭时,再往该通道发送值会导致panic,并且从该通道接收到值,全部接收完了之后再接收的话,会接收到类型零值
    • 那我们应该如何判断通道被关闭了呢?有下面两种方法可供参考
    func main() {
        ch1 := make(chan int)
        ch2 := make(chan int)
        // 开启goroutine将0~100的数发送到ch1中
        go func() {
            for i := 0; i < 100; i++ {
                ch1 <- i
            }
            close(ch1)
        }()
        // 开启goroutine从ch1中接收值,并将该值的平方发送到ch2中
        go func() {
            for {
                i, ok := <-ch1 // 通道关闭后再取值ok=false
                if !ok {
                    break
                }
                ch2 <- i * i
            }
            close(ch2)
        }()
        // 在主goroutine中从ch2中接收值打印
        for i := range ch2 { // 通道关闭后会退出for range循环
            fmt.Println(i)
        }
    }
    

    6.channel异常情况总结

    三.select多路复用

    • 在某些场景下,我们需要从多个通道接收数据,通道在接收数据时,如果没有数据可以接收得阻塞住,等待数据过来,为了应对这种场景,GO内置了select关键字,可以同时响应多个通道的操作
    • go select的思想来源于网络IO模型中的select,本质上也是IO多路复用,只不过这里的IO是基于channel而不是基于网络
    • select的使用类似switch,有一系列case和default,每个case对应一个通道的接收或发送过程
        select {
        case <-chan1:
           // 如果chan1成功读到数据,则进行该case处理语句
        case chan2 <- 1:
           // 如果成功向chan2写入数据,则进行该case处理语句
        default:
           // 如果上面都没有成功,则进入default处理流程
        }
    
    • select会一直等待,直到某个case的通信操作完成时,就会执行case分支对应的语句
    • select可以同时监听多个channel,直到其中一个channel 就绪,如果多个channel同时就绪,则随机选择一个执行
    • select可以用来判断管道是否已满,比如仅有一个case和default,该case是往管道写,当管道满了之后,该case会失败,因仅有一个case,所以会走default,那就可以在default中通知该通道已满
    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    // 判断管道有没有存满
    func main() {
    	// 创建管道
    	output1 := make(chan string, 10)
    	// 子协程写数据
    	go write(output1)
    	// 取数据
    	for s := range output1 {
    		fmt.Println("res:", s)
    		time.Sleep(time.Second)
    	}
    }
    
    func write(ch chan string) {
    	for {
    		select {
    		// 写数据
    		case ch <- "hello":
    			fmt.Println("write hello")
    		default:  // 因为通道满了,往执行不了写数据了,就会走default
    			fmt.Println("channel full")
    		}
    		time.Sleep(time.Millisecond * 500)
    	}
    }
    

    四.goroutine池

    • 本质是生产者消费者模型
    • 可以有效的控制goroutine数量,防止暴涨
    • 需求:
      • 计算一个数字的各个数位之和,例如数字123,结果是1+2+3=6
      • 随机生成数字进行计算
    package main
    
    import (
        "fmt"
        "math/rand"
    )
    
    type Job struct {
        // id
        Id int
        // 需要计算的随机数
        RandNum int
    }
    
    type Result struct {
        // 这里必须传对象实例
        job *Job
        // 求和
        sum int
    }
    
    func main() {
        // 需要2个管道
        // 1.job管道
        jobChan := make(chan *Job, 128)
        // 2.结果管道
        resultChan := make(chan *Result, 128)
        // 3.创建工作池
        createPool(64, jobChan, resultChan)
        // 4.开个打印的协程
        go func(resultChan chan *Result) {
            // 遍历结果管道打印
            for result := range resultChan {
                fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id,
                    result.job.RandNum, result.sum)
            }
        }(resultChan)
        var id int
        // 循环创建job,输入到管道
        for {
            id++
            // 生成随机数
            r_num := rand.Int()
            job := &Job{
                Id:      id,
                RandNum: r_num,
            }
            jobChan <- job
        }
    }
    
    // 创建工作池
    // 参数1:开几个协程
    func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
        // 根据开协程个数,去跑运行
        for i := 0; i < num; i++ {
            go func(jobChan chan *Job, resultChan chan *Result) {
                // 执行运算
                // 遍历job管道所有数据,进行相加
                for job := range jobChan {
                    // 随机数接过来
                    r_num := job.RandNum
                    // 随机数每一位相加
                    // 定义返回值
                    var sum int
                    for r_num != 0 {
                        tmp := r_num % 10
                        sum += tmp
                        r_num /= 10
                    }
                    // 想要的结果是Result
                    r := &Result{
                        job: job,
                        sum: sum,
                    }
                    //运算结果扔到管道
                    resultChan <- r
                }
            }(jobChan, resultChan)
        }
    }
    

    五.并发安全和锁

    • 当多个goroutine操作同一个临界区的资源时,可能会有数据竞态问题,导致非预期结果

    样例如下,我们启动两个goroutine去累加x的值,但这两个goroutine访问x时会存在数据竞争

    var x int64
    var wg sync.WaitGroup
    
    func add() {
        for i := 0; i < 5000; i++ {
            x = x + 1
        }
        wg.Done()
    }
    func main() {
        wg.Add(2)
        go add()
        go add()
        wg.Wait()
        fmt.Println(x)
    }
    

    1.互斥锁

    • 互斥锁是一种常用的控制共享资源访问的方法
    • Go语言中使用sync包的Mutex类型来实现互斥锁
    • 使用互斥锁可以保证同一时间有且只有一个goroutine进入临界区,其他goroutine则在等待锁
    • 当互斥锁释放后,等待的goroutine才可以获取锁进入临界区
    • 多个goroutine同时等待一个锁时,唤醒的策略是随机的
    var x int64
    var wg sync.WaitGroup
    var lock sync.Mutex
    
    func add() {
        for i := 0; i < 5000; i++ {
            lock.Lock() // 加锁
            x = x + 1
            lock.Unlock() // 解锁
        }
        wg.Done()
    }
    func main() {
        wg.Add(2)
        go add()
        go add()
        wg.Wait()
        fmt.Println(x)
    }
    

    2.读写锁

    • 很多场景下是读多写少的,当我们并发去读取一个资源不涉及资源修改时,完全没有必要使用互斥锁,因为互斥锁是完全互斥的
    • 读多写少的情况下,使用读写锁是一个更好的选择
    • Go语言中使用sync包的RWMutex来实现读写锁
    • 读写锁分为两种:读锁和写锁
    • 当一个goroutine获取读锁之后,其他的goroutine如果是获取读锁则会获得读锁,如果是获取写锁就会等待
    • 当一个goroutine获取写锁之后,其他的goroutine无论是获取读锁还是获取写锁,都会等待
    • 读写锁非常适合读多写少的场景,如果读和写差别不大,则读写锁的优势就发挥不出来

    样例如下:

    var (
        x      int64
        wg     sync.WaitGroup
        lock   sync.Mutex
        rwlock sync.RWMutex
    )
    
    func write() {
        // lock.Lock()   // 加互斥锁
        rwlock.Lock() // 加写锁
        x = x + 1
        time.Sleep(10 * time.Millisecond) // 假设读操作耗时10毫秒
        rwlock.Unlock()                   // 解写锁
        // lock.Unlock()                     // 解互斥锁
        wg.Done()
    }
    
    func read() {
        // lock.Lock()                  // 加互斥锁
        rwlock.RLock()               // 加读锁
        time.Sleep(time.Millisecond) // 假设读操作耗时1毫秒
        rwlock.RUnlock()             // 解读锁
        // lock.Unlock()                // 解互斥锁
        wg.Done()
    }
    
    func main() {
        start := time.Now()
        for i := 0; i < 10; i++ {
            wg.Add(1)
            go write()
        }
    
        for i := 0; i < 1000; i++ {
            wg.Add(1)
            go read()
        }
    
        wg.Wait()
        end := time.Now()
        fmt.Println(end.Sub(start))
    }
    

    3.sync.Map

    • Go语言中内置的map并不是并发安全的

    样例如下:

    var m = make(map[string]int)
    
    func get(key string) int {
    	return m[key]
    }
    
    func set(key string, value int) {
    	m[key] = value
    }
    
    func main() {
    	wg := sync.WaitGroup{}
    	for i := 0; i < 20; i++ {
    		wg.Add(1)
    		go func(n int) {
    			key := strconv.Itoa(n)
    			set(key, n)
    			fmt.Printf("k=:%v,v:=%v\n", key, get(key))
    			wg.Done()
    		}(i)
    	}
    	wg.Wait()
    	
    	// fatal error: concurrent map writes
    }
    
    • Go语言的sync包中提供了一个开箱即用的并发安全版Map,sync.Map,开箱即用表示不用像内置的Map一样使用make函数初始化就能用,同时sync.Map内置了诸如Store,Load,LoadOrStore,Delete,Range等操作方法。
    var m = sync.Map{}
    
    func main() {
        wg := sync.WaitGroup{}
        for i := 0; i < 20; i++ {
            wg.Add(1)
            go func(n int) {
                key := strconv.Itoa(n)
                m.Store(key, n)
                value, _ := m.Load(key)
                fmt.Printf("k=:%v,v:=%v\n", key, value)
                wg.Done()
            }(i)
        }
        wg.Wait()
    }
    

    六.并发同步

    • Go语言中使用sync包的WaitGroup来实现并发任务的同步

    1.sync.WaitGroup

    sync.WaitGroup有以下三种方法:

    • (wg * WaitGroup) Add(delta int) : 计数器 + delta
    • (wg *WaitGroup) Done() : 计数器-1
    • (wg *WaitGroup) Wait() : 阻塞知道计数器变为0

    sync.WaitGroup内部维护着一个计数器,计数器的值可以增加和减少,例如当我们启动了N个并发任务时,就将计数器增加N,每个任务通过调用Done方法将计数器减1,通过调用Wait()来等待并发任务执行完,当计数器的值为0时间,表示所有并发任务都已经完成

    var wg sync.WaitGroup
    
    func hello() {
        defer wg.Done()
        fmt.Println("Hello Goroutine!")
    }
    func main() {
        wg.Add(1)
        go hello() // 启动另外一个goroutine去执行hello函数
        fmt.Println("main goroutine done!")
        wg.Wait()
    }
    

    2.sync.Once

    • 很多场景下,我们需要确保某些操作在高并发时只执行一次,例如只加载一次配置文件
    • Go语言中的sync包提供了一个针对只执行一次场景的解决方案-sync.Once
    • sync.Once只有一个Do方法,Do(f func())

    样例如下:

    • 延迟一个开销很大的初始化操作到真正用到它的时候再执行
    var icons map[string]image.Image
    
    var loadIconsOnce sync.Once
    
    func loadIcons() {
        icons = map[string]image.Image{
            "left":  loadIcon("left.png"),
            "up":    loadIcon("up.png"),
            "right": loadIcon("right.png"),
            "down":  loadIcon("down.png"),
        }
    }
    
    // Icon 是并发安全的
    func Icon(name string) image.Image {
        loadIconsOnce.Do(loadIcons)
        return icons[name]
    }
    

    sync.Once内部包含一个互斥锁和一个布尔值,互斥锁保证布尔值和数据的安全,而布尔值用来记录初始化是否完成,这样设计就能保证初始化操作的时候是并发安全的,并且初始化操作也不会执行多次

    七.原子操作

    • 代码中加锁操作因为设计到内核态的上下文切换,耗时比较高,代价也大
    • 针对基本数据类型我们可以使用原子操作来保证并发安全
    • Go语言中原子操作由内置的标准库sync/atomic提供

    样例如下:

    • 比较互斥锁和原子操作的性能
    var x int64
    var l sync.Mutex
    var wg sync.WaitGroup
    
    // 普通版加函数
    func add() {
        // x = x + 1
        x++ // 等价于上面的操作
        wg.Done()
    }
    
    // 互斥锁版加函数
    func mutexAdd() {
        l.Lock()
        x++
        l.Unlock()
        wg.Done()
    }
    
    // 原子操作版加函数
    func atomicAdd() {
        atomic.AddInt64(&x, 1)
        wg.Done()
    }
    
    func main() {
        start := time.Now()
        for i := 0; i < 10000; i++ {
            wg.Add(1)
            // go add()       // 普通版add函数 不是并发安全的
            // go mutexAdd()  // 加锁版add函数 是并发安全的,但是加锁性能开销大
            go atomicAdd() // 原子操作版add函数 是并发安全,性能优于加锁版
        }
        wg.Wait()
        end := time.Now()
        fmt.Println(x)
        fmt.Println(end.Sub(start))
    }
    
    • atomic包提供了底层的原子级内存操作,对于同步算法的实现很有帮助,但是除了某些特殊的底层应用,使用通道或者sync包实现同步更好!

    八.总结

    Go语言的并发模型是CSP,提倡通过通信来进行内存共享,而不是通过共享内存来实现通信

    心之所向,素履以往
  • 相关阅读:
    对话系统综述
    3.738. 单调递增的数字
    3.765-情侣牵手
    2.135-分发糖果
    1.312-戳气球
    4.BN推导
    3.CNN-卷积神经网络推导
    2.DNN-神经网络推导
    联系人
    DS博客作业05--查找
  • 原文地址:https://www.cnblogs.com/yinbiao/p/15752295.html
Copyright © 2011-2022 走看看