zoukankan      html  css  js  c++  java
  • Golang 并发编程

    并行和并发

    • 并行:在同一时刻,有多条指令在多个 CPU 处理器上同时执行
    • 并发:在同一时刻,只能有一条指令执行,但多个进程指令被快速地轮换执行

    go 语言并发优势

    • go 从语言层面就支持了并发
    • 简化了并发程序的编写

    goroutine 是什么

    • 它是 go 并发设计的核心
    • goroutine 就是协程,它比线程更小,十几个 goroutine 在底层可能就是五六个线程
    • go 语言内部实现了 goroutine 的内存共享,执行 goroutine 只需极少的栈内存(大概是 4~5KB)

    Go 语言的并发是基于 goroutine 的,goroutine 类似于线程,但并非线程。可以将 goroutine 理解为一种虚拟线程。Go 语言运行时会参与调度 goroutine,并将 goroutine 合理地分配到每个 CPU 中,最大限度地使用 CPU 性能。开启一个 goroutine 的消耗非常小(大约 2KB 的内存),你可以轻松创建数百万个 goroutine。

    goroutine 的特点:

    • goroutine 具有可增长的分段堆栈。这意味着它们只在需要时才会使用更多内存。
    • goroutine 的启动时间比线程快。
    • goroutine 原生支持利用 channel 安全地进行通信。
    • goroutine 共享数据结构时无需使用互斥锁。

    创建 goroutine

    • 只需要在语句前添加 go 关键字,就可以创建并发执行单元
    • 开发⼈员无需了解任何执⾏细节,调度器会自动将其安排到合适的系统线程上执行
    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    // 测试协程
    func newTask() {
    	i := 0
    	for {
    		i++
    		fmt.Printf("new goroutine: i = %d
    ", i)
    		time.Sleep(time.Second)
    	}
    }
    
    func main() {
    	// 子协程调用方法
    	go newTask()
    	i := 0
    	for {
    		i++
    		fmt.Printf("main goroutine: i = %d
    ", i)
    		time.Sleep(time.Second)
    	}
    }
    

    如果主协程退出了,其他任务还执行吗?

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	go func() {
    		i := 0
    		for {
    			i++
    			fmt.Printf("new froutine: i = %d
    ", i)
    			time.Sleep(time.Second)
    		}
    	}()
    	// 主程序退出子协程也会退出
    	for {}
    }
    

    runtime 包

    runtime.Gosched():让出 CPU 时间片,重新等待安排任务

    package main
    
    import (
    	"fmt"
    	"runtime"
    )
    
    func main() {
    	go func(s string) {
    		for i := 0; i < 2; i++ {
    			fmt.Println(s)
    		}
    	}("world")
    	// 主协程
    	for i := 0; i < 2; i++ {
    		// 切一下,再次分配任务
    		runtime.Gosched()
    		fmt.Println("hello")
    	}
    }
    

    runtime.Goexit(),退出当前协程

    package main
    
    import (
    	"fmt"
    	"runtime"
    )
    
    func main() {
    	go func() {
    		defer fmt.Println("A.defer")
    		func() {
    			defer fmt.Println("B.defer")
    			// 结束协程
    			runtime.Goexit()
    			defer fmt.Println("C.defer")
    			fmt.Println("B")
    		}()
    		fmt.Println("A")
    	}()
    	for {
    	}
    }
    

    runtime.GOMAXPROCS(),设置跑程序的 CPU 核数

    package main
    
    import (
    	"fmt"
    	"runtime"
    )
    
    func main() {
    	//runtime.GOMAXPROCS(1)
    	runtime.GOMAXPROCS(3)
    	for {
    		go fmt.Println(0)
    		fmt.Println(1)
    	}
    }
    

    channel 是什么

    • goroutine 运行在相同的地址空间,因此访问共享内存必须做好同步,处理好线程安全问题
    • goroutine 奉行通过通信来共享内存,而不是共享内存来通信
    • channel 是一个引用类型,用于多个 goroutine 通讯,其内部实现了同步,确保并发安全

    channel 的基本使用

    • channel 可以用内置 make()函数创建
    • 定义一个 channel 时,也需要定义发送到 channel 的值的类型
    make(chan 类型)
    make(chan 类型, 容量)
    
    • 当 capacity= 0 时,channel 是无缓冲阻塞读写的,当 capacity> 0 时,channel 有缓冲、是非阻塞的,直到写满 capacity 个元素才阻塞写入
    • channel 通过操作符<-来接收和发送数据,发送和接收数据语法:
    channel <- value      //数据存入管道
    <-channel             //管道取数据
    x := <-channel        //接收管道数据
    x, ok := <-channel    //功能根上面类似,OK是布尔值,检查管道是否为空或已经关闭
    
    package main
    
    import "fmt"
    
    func main() {
    	// 创建一个存储int类型的channel
    	c := make(chan int)
    	// 一个协程写入数据
    	go func() {
    		defer fmt.Println("子协程结束")
    		fmt.Println("子协程正在运行")
    		// 存数据
    		c <- 666
    	}()
    
    	// 外面读数据
    	num := <- c
    	fmt.Println("num = ", num)
    	fmt.Println("main 结束")
    }
    

    无缓冲的 channel

    • 无缓冲的通道是指在接收前没有能力保存任何值的通道

    有缓冲的 channel

    • 有缓冲的通道是一种在被接收前能存储一个或者多个值的通道
    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	// 0 代表无缓冲通道
    	//c := make(chan int, 0)
    	c := make(chan int, 3)
    	fmt.Printf("len(c)=%d, cap(c)=%d
    ", len(c), cap(c))
    	// 子协程去写数据
    	go func() {
    		defer fmt.Printf("子协程结束了")
    		for i := 0; i < 3; i++ {
    			c <- i
    			fmt.Printf("子协程在运行[%d], len(c)=%d, cap(c)=%d
    ", i, len(c), cap(c))
    		}
    	}()
    
    	time.Sleep(2 * time.Second)
    	for i := 0; i < 3; i++ {
    		num := <-c
    		fmt.Println("num=", num)
    	}
    	fmt.Println("main 结束")
    }
    

    close()

    • 可以通过内置的 close()函数关闭 channel
    package main
    
    import "fmt"
    
    func main() {
    	c := make(chan int)
    	go func() {
    		for i := 0; i < 5; i++ {
    			c <- i
    		}
    		close(c)
    	}()
    
    	for {
    		if data, ok := <-c; ok {
    			fmt.Println(data)
    		} else {
    			fmt.Println("结束了")
    			break
    		}
    	}
    	fmt.Println("main 结束")
    }
    

    单方向的 channel

    • 默认情况下,通道是双向的,也就是,既可以往里面发送数据也可以接收数据
    • go 可以定义单方向的通道,也就是只发送数据或者只接收数据,声明如下
    var ch1 chan int         //正常的,可以读,可以写
    var ch2 chan<- float64   //只写float64的管道
    var ch3 <-chan int        //只读int的管道
    
    • 可以将 channel 隐式转换为单向队列,只收或只发,不能将单向 channel 转换为普通 channel
    package main
    
    import "fmt"
    
    func main() {
    	// 定义通道
    	c := make(chan int, 3)
    	// chan转为只写的
    	var send chan<- int = c
    	// chan 转为只读
    	var recv <-chan int = c
    
    	send <- 1
    	fmt.Println(<-recv)
    	// 不能从单向转换回去
    }
    
    package main
    
    import "fmt"
    
    // 生产者 只写
    func producter(out chan<- int) {
    	defer close(out)
    	for i := 0; i < 5; i++ {
    		out <- i
    	}
    }
    // 消费者 只读
    func cunsumer(in <-chan int) {
    	for num := range in {
    		fmt.Println(num)
    	}
    }
    
    func main() {
    	// 定义通道
    	c := make(chan int, 3)
    	// chan转为只写的
    	var send chan<- int = c
    	// chan 转为只读
    	var recv <-chan int = c
    
    	go producter(send)
    	cunsumer(recv)
    
    	fmt.Println("main 结束")
    }
    

    Workpool 模型

    • 本质上是生产者消费者模型
    • 可以有效控制 goroutine 数量,防止暴涨
    • 需求:
      • 计算一个数字的各个位数之和,例如数字 123,结果为 1+2+3=6
      • 随机生成数字进行计算
    package main
    
    import (
    	"fmt"
    	"math/rand"
    )
    
    type Job struct {
    	Id      int
    	RandNum int
    }
    
    type Result struct {
    	job *Job
    	sum int
    }
    
    // 创建任务池
    func createPoll(poolSize int, jobChan chan *Job, resultChan chan *Result) {
    	// 根据指定的poolSize去启动协程个数
    	for i := 0; i < poolSize; i++ {
    		go func(jobChan chan *Job, resultChan chan *Result) {
    			for job := range jobChan {
    				r_num := job.RandNum
    				var sum int
    				for r_num != 0 {
    					tmp := r_num % 10
    					sum += tmp
    					r_num /= 10
    				}
    				r := &Result{
    					job: job,
    					sum: sum,
    				}
    				resultChan <- r
    			}
    		}(jobChan, resultChan)
    	}
    }
    
    func main() {
    	// 初始化管道
    	jobChan := make(chan *Job, 128)
    	resultChan := make(chan *Result, 128)
    
    	// 创建任务
    	createPoll(64, jobChan, resultChan)
    
    	// 开启协程去读数据
    	go func(resultChan chan *Result) {
    		for result := range resultChan {
    			fmt.Printf("job id: %d, randNum: %d, result: %d
    ",
    				result.job.Id, result.job.RandNum, result.sum)
    		}
    	}(resultChan)
    
    	// 循环生成随机数
    	var id int
    	for {
    		id++
    		rand_num := rand.Int()
    		job := &Job{
    			Id:      id,
    			RandNum: rand_num,
    		}
    		jobChan <- job
    	}
    }
    

    定时器

    Timer:时间到了,执行只执行 1 次

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	// 1. timer基本使用
    	//timer01 := time.NewTimer(2 * time.Second)
    	//t1 := time.Now()
    	//fmt.Printf("t1:%v
    ", t1)
    	//t2 := <- timer01.C
    	//fmt.Printf("t2:%v
    ", t2)
    
    	// 2. 验证timer只能响应1次
    	//timer02 := time.NewTimer(time.Second)
    	//for {
    	//	<- timer02.C
    	//	fmt.Println("时间到了")
    	//}
    
    	// 3. 实现延时的功能
    	// 1)
    	//time.Sleep(time.Second)
    	// 2)
    	//timer03 := time.NewTimer(2*time.Second)
    	//<- timer03.C
    	//fmt.Println("2秒到")
    	// 3)
    	//<- time.After(2*time.Second)
    	//fmt.Println("2秒到")
    
    	// 4. 停止定时器
    	//timer04 := time.NewTimer(2*time.Second)
    	//go func() {
    	//	<-timer04.C
    	//	fmt.Println("定时器执行力")
    	//}()
    	//b := timer04.Stop()
    	//if b {
    	//	fmt.Println("timer04定时器关闭了")
    	//}
    
    	// 5. 重置定时器
    	timer05 := time.NewTimer(3*time.Second)
    	timer05.Reset(1*time.Second)
    	fmt.Println(time.Now())
    	fmt.Println(<-timer05.C)
    }
    

    Ticker:时间到了,多次执行

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	// 1. 获取ticker对象
    	ticker := time.NewTicker(1*time.Second)
    	i := 0
    	go func() {
    		for {
    			i++
    			fmt.Println(<-ticker.C)
    			if i == 5 {
    				ticker.Stop()
    			}
    		}
    	}()
    
    	for{}
    }
    

    select

    • go 语言提供了 select 关键字,可以监听 channel 上的数据流动
    • 语法与 switch 类似,区别是 select 要求每个 case 语句里必须是一个 IO 操作
    select {
    case <-chan1:
       // 如果chan1成功读到数据,则进行该case处理语句
    case chan2 <- 1:
       // 如果成功向chan2写入数据,则进行该case处理语句
    default:
       // 如果上面都没有成功,则进入default处理流程
    }
    

    select 可以同时监听一个或多个 channel,直到其中一个 channel ready

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func test01(ch chan string) {
    	time.Sleep(2 * time.Second)
    	ch <- "hello"
    }
    
    func test02(ch chan string) {
    	time.Sleep(5 * time.Second)
    	ch <- "world"
    }
    
    func main() {
    	chan01 := make(chan string)
    	chan02 := make(chan string)
    
    	go test01(chan01)
    	go test02(chan02)
    
    	select {
    	case s1 := <-chan01:
    		fmt.Println("s1=", s1)
    	case s2 := <-chan02:
    		fmt.Println("s2=", s2)
    	}
    }
    

    ** 如果多个 channel 同时 ready,则随机选择一个执行**

    package main
    
    import "fmt"
    
    func main() {
    	ch01 := make(chan int)
    	ch02 := make(chan int)
    
    	go func() {
    		ch01 <- 1
    	}()
    
    	go func() {
    		ch02 <- 2
    	}()
    
    	select {
    	case val := <-ch01:
    		fmt.Println("ch01, ->", val)
    	case val := <-ch02:
    		fmt.Println("ch02, ->", val)
    	}
    }
    

    可以用于判断管道是否存满

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func writeChan(ch chan int) {
    	var i int
    	for {
    		i++
    		select {
    		case ch <- i:
    			fmt.Println("set val: ", i)
    		default:
    			fmt.Println("channel full")
    		}
    		time.Sleep(500*time.Millisecond)
    	}
    }
    
    func main() {
    	// 定义管道
    	ch := make(chan int, 10)
    	// 开启协程去写数据
    	go writeChan(ch)
    
    	// 取数据
    	for i := range ch {
    		fmt.Println("get val: ", i)
    		time.Sleep(2*time.Second)
    	}
    }
    

    等待组

    • sync.WaitGroup:用来等待一组子协程的结束,需要设置等待的个数,每个子协程结束后要调用 Done(),最后在主协程中 Wait()即可
    • 有 3 个方法
      • Add():添加计数
      • Done():操作结束,计数减 1
      • Wait():等待所有操作结束
    package main
    
    import "fmt"
    
    // 手动实现协程等待
    
    func main() {
    	// 创建管道
    	ch := make(chan int)
    	// 计数,代表子协程个数
    	count := 2
    	go func() {
    		fmt.Println("子协程01")
    		ch <- 1
    	}()
    
    	go func() {
    		fmt.Println("子协程02")
    		ch <- 1
    	}()
    
    	for range ch {
    		count --
    		if count == 0 {
    			close(ch)
    		}
    	}
    }
    
    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    // 等待组
    
    func main() {
    	// 声明等待组
    	var wg sync.WaitGroup
    	wg.Add(2)
    	go func() {
    		fmt.Println("子协程01")
    		// 协程完成执行done操作
    		wg.Done()
    	}()
    
    	go func(){
    		defer wg.Done()
    		fmt.Println("子协程02")
    	} ()
    
    	wg.Wait()
    }
    

    互斥锁

    • go 中 channel 实现了同步,确保并发安全,同时也提供了锁的操作方式,确保多个协程的安全问题
    • go 中 sync 包提供了锁相关的支持
    • Mutex 互斥锁:以加锁方式解决并发安全问题
    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    // 没有对全局变量加锁,多个修改操作同时对x修改会产生数据错误
    var x int
    var wg sync.WaitGroup
    
    func add() {
    	defer wg.Done()
    	for i := 0; i < 5000; i++ {
    		x += 1
    	}
    }
    
    func main() {
    	wg.Add(3)
    	go add()
    	go add()
    	go add()
    	wg.Wait()
    	fmt.Println("x->", x)
    }
    
    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    // 没有对全局变量加锁,多个修改操作同时对x修改会产生数据错误
    var x int
    var wg sync.WaitGroup
    // 声明锁
    var mutex sync.Mutex
    
    func add() {
    	defer wg.Done()
    	for i := 0; i < 5000; i++ {
    		// 加锁
    		mutex.Lock()
    		x += 1
    		// 解锁
    		mutex.Unlock()
    	}
    }
    
    func main() {
    	wg.Add(3)
    	go add()
    	go add()
    	go add()
    	wg.Wait()
    	fmt.Println("x->", x)
    }
    

    读写锁

    • 分为读锁和写锁
    • 当读操作多时,不涉及数据修改,应该允许程序同时去读,写的时候再加锁
    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    // 声明读写锁
    var rwLock sync.RWMutex
    var wg sync.WaitGroup
    // 全局变量
    var x int
    
    func write() {
    	rwLock.Lock()
    	fmt.Println("write rwlock")
    	x += 1
    	time.Sleep(2 * time.Second)
    	fmt.Println("write rwunlock")
    	rwLock.Unlock()
    	wg.Done()
    }
    
    func read(i int) {
    	rwLock.RLock()
    	fmt.Println("read rwlock")
    	fmt.Printf("gorountine: %d x=%d
    ", i, x)
    	time.Sleep(5 * time.Second)
    	fmt.Println("read rwunlock")
    	rwLock.RUnlock()
    	wg.Done()
    }
    
    func main() {
    	wg.Add(1)
    	go write()
    	time.Sleep(time.Millisecond * 50)
    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go read(i)
    	}
    	wg.Wait()
    }
    

    读写锁与互斥锁性能对比

    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    // 声明读写锁
    var rwLock sync.RWMutex
    // 声明互斥锁
    var mutex sync.Mutex
    // 声明等待组
    var wg sync.WaitGroup
    // 全局变量
    var x int
    
    // 写数据
    func write() {
    	for i := 0; i < 100; i++ {
    		//mutex.Lock()
    		rwLock.Lock()
    		x += 1
    		time.Sleep(10 * time.Millisecond)
    		//mutex.Unlock()
    		rwLock.Unlock()
    	}
    	wg.Done()
    }
    
    func read(i int) {
    	for i := 0; i < 100; i++ {
    		//mutex.Lock()
    		rwLock.RLock()
    		time.Sleep(time.Millisecond)
    		//mutex.Unlock()
    		rwLock.RUnlock()
    	}
    	wg.Done()
    }
    
    // 互斥锁运行时间: 14551861000
    // 读写锁运行时间: 1321229000
    func main() {
    	start := time.Now().UnixNano()
    
    	wg.Add(1)
    	go write()
    	for i := 0; i < 100; i++ {
    		wg.Add(1)
    		go read(i)
    	}
    	wg.Wait()
    	end := time.Now().UnixNano()
    	fmt.Println("运行时间: ", end-start)
    }
    

    原子操作

    • 加锁操作比较耗时,整数可以使用原子操作保证线程安全
    • 原子操作在用户态就可以完成,因此性能比互斥锁高
    • 原子操作方法在 doc.go 文件中
      • AddXxx():加减操作
      • CompareAndSwapXxx():比较并交换
      • LoadXxx():读取操作
      • StoreXxx():写入操作
      • SwapXxx():交换操作

    原子操作与互斥锁性能对比

    package main
    
    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    	"time"
    )
    
    // 声明互斥锁
    var mutex sync.Mutex
    // 声明等待组
    var wg sync.WaitGroup
    // 全局变量
    var x int32
    
    // 互斥锁操作
    func add01() {
    	for i := 0; i < 500; i++ {
    		mutex.Lock()
    		x += 1
    		mutex.Unlock()
    	}
    	wg.Done()
    }
    
    // 原子操作
    func add02() {
    	for i := 0; i < 500; i++ {
    		atomic.AddInt32(&x, 1)
    	}
    	wg.Done()
    }
    
    // 互斥锁运行时间:   1896256000
    // 原子操作运行时间: 89723000
    func main() {
    	start := time.Now().UnixNano()
    
    	for i := 0; i < 10000; i++ {
    		wg.Add(1)
    		//go add01()
    		go add02()
    	}
    
    	wg.Wait()
    	end := time.Now().UnixNano()
    	fmt.Println("x: ", x)
    	fmt.Println("运行时间: ", end-start)
    }
    

    Map 的并发操作

    Map 不是并发安全的,但并发读是没问题的

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    func main() {
    	wg := sync.WaitGroup{}
    	m := make(map[int]int)
    
    	// 添加一些数据
    	for i := 0; i < 10; i++ {
    		m[i] = i
    	}
    
    	// 并发去读
    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go func(k int) {
    			fmt.Println(m[k])
    			wg.Done()
    		}(i)
    	}
    	wg.Wait()
    	fmt.Println(m)
    }
    

    Map 并发写是有问题的

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    func main() {
    	wg := sync.WaitGroup{}
    	m := make(map[int]int)
    	mutex := sync.Mutex{}
    
    	// 并发写
    	for i := 0; i < 10; i++ {
    		wg.Add(1)
    		go func(k int) {
    			defer wg.Done()
    			mutex.Lock()
    			m[k] = k
    			mutex.Unlock()
    		}(i)
    	}
    	wg.Wait()
    	fmt.Println(m)
    }
    

    Golang 为什么这么块: Go 为什么这么“快”-腾讯技术

  • 相关阅读:
    XO Wave-数字音频编纂软件
    LSS:撰写 LaTeX 的扶直对象
    Ubuntu 推出能主动装置编码器、Flash、Java、MS 字体的新包
    目前国内主要有4家“播客”网站
    开始换用 Delphi 2009
    关于 Delphi 中流的使用(10): 压缩与解压缩进度 回复 "ilst" 的问题
    试试带参数的 Exit
    在 Delphi 中调用 JavaScript(2)
    在 Delphi 中调用 JavaScript(1) 回复 "fancy" 的问题
    如何获取重载函数的地址 回复 "flq_00000" 的问题
  • 原文地址:https://www.cnblogs.com/zhichaoma/p/12509725.html
Copyright © 2011-2022 走看看