zoukankan      html  css  js  c++  java
  • Golang并发和并行(子gorutine)

    前言

    CPU最小执行单位是线程,后台开发人员一直在费尽心思得解决大并发问题 从单线程----->多线程(切换)-------->协程(上下文开销小),无非是在寻找1种相对完美的方案当1个线程遇到IO阻塞时可以让OS以最小的开销把另1个线程调度到CPU上继续执行。规避IO、最大限度地把所有物理CPU利用起来。

    gorutine就是来自Goole的免费解决方案。

    gorutine不是被os调度的线程而是由golang的rutime调度的用户态的微线程(协程)。因为它是微线程所以它的上下文切换开销小。执行速度也快!

    当然CPU最小的执行单位还是线程,golang的rutime最终也会把这些协程自己调度到线程池中的线程上。

    Goroutines are part of making concurrency easy to use.

    1.让程序员可以轻松得使用实现大并发效果。

    The idea, which has been around for a while, is to multiplex independently executing functions—coroutines—onto a set of threads.

    2.Gorutine是建立在线程池之上来实现的

    When a coroutine blocks, such as by calling a blocking system call, the run-time automatically moves other coroutines on the same operating system thread

    to a different, runnable thread so they won't be blocked.

    3.当1个Gorutine遇到IO阻塞时Golang的run-time 会自动得把另一个Gorutine调度到另1个没有被阻塞的线程里面。(永远不会阻塞)

    The programmer sees none of this, which is the point. The result, which we call goroutines, can be very cheap: unless they spend a lot of time in long-running system calls, they cost little more than the memory for the stack, which is just a few kilobytes.

    5.Gorutin开销非常小,非常轻量级(can be very cheap)大概几千字节。

    To make the stacks small, Go's run-time uses segmented stacks. A newly minted goroutine is given a few kilobytes, which is almost always enough.When it isn't, the run-time allocates (and frees) extension segments automatically.

    6.为了使Gorutin的stack更小,Golang的runtime使用了segmented stacks(把1个栈划分成一小块一小块的)当Gorutine的栈空间不足时还能自动扩展/缩容。666

    The overhead(开销) averages about three cheap instructions per function call. It is practical to create hundreds of thousands of goroutines in the same address space. If goroutines were just threads, system resources would run out at a much smaller number.

    7.创建几千个Gorutine也是可行的!

    并发与并行

    并发:同一时间段内执行多个任务(你在用微信和两个女朋友聊天)。

    并行:同一时刻执行多个任务(你和你朋友都在用微信和女朋友聊天)。

    Go语言的并发通过goroutine实现。goroutine类似于线程,属于用户态的线程,比较轻量级所以我们可以根据需要创建成千上万个goroutine并发工作。

    goroutine是由Go语言的运行时(runtime)调度完成,而线程是由操作系统调度完成的,所以它的执行时间也比较快。

    Go语言还提供channel在多个goroutine间进行通信

    goroutinechannel是 Go 语言秉承的 CSP(Communicating Sequential Process)并发模式的重要实现基础。

     

    goroutine简介

    在java/c++中我们要实现并发编程的时候,我们通常需要自己维护一个线程池,并且需要自己去包装一个又一个的任务。

    同时需要自己去调度线程执行任务并维护上下文切换,这一切通常会耗费程序员大量的心智。

    那么能不能有一种机制,程序员只需要定义很多个任务,让系统去帮助我们把这些任务分配到CPU上实现并发执行呢?

    我们原来的实现使用线程实现并发的方案流程是:

    程序----》os线程池-----》os调度线程----->cpu

    在Golang中

    程序---》gorutine------》go's runtime调度gorutines--------》线程池-------》os线程接口-----》os调度线程----->cpu

    Go语言中的goroutine就是这样一种线程封装机制gorutine是用户态的线程。

    但 goroutine是由Go的运行时(runtime)调度和管理的。Go程序中的runtime会智能地将 goroutine 中的任务合理地分配给每个CPU

    Go语言之所以被称为现代化的编程语言,就是因为Golang在语言层面已经内置了调度和上下文切换的机制

    在Go语言编程中你不需要去自己写进程、线程、协程,你的技能包里只有一个技能–goroutine,当你需要让某个任务并发执行的时候,你只需要把这个任务包装成一个函数,开启一个goroutine去执行这个函数就可以了,就是这么简单。

    使用gorutine

    使用gorutine非常简单就是在定义的函数前加go关键字就行

    需要注意的是当main函数结束后,由main函数启动的gorutine也会全部消失。

    package main
    
    import "fmt"
    
    func hello() {
    	fmt.Println("hello")
    }
    //程序启动之后会主动创建1个main gorutine
    func main() {
    	go hello() //开启1个独立的gorutine
    	fmt.Println("main")
    	//main函数结束之后由main函数启动的gorutine也全部结束
    }
    

    但是由gorutine启动的子gorutine并不会随着父gorutine的结束而结束!

    // package main
    
    // import "fmt"
    
    // func hello() {
    // 	fmt.Println("hello")
    // }
    // //程序启动之后会主动创建1个main gorutine
    // func main() {
    // 	go hello() //开启1个独立的gorutine
    // 	fmt.Println("main")
    // 	//main函数结束之后由main函数启动的gorutine也全部结束
    // }
    
    // package main
    
    // import "fmt"
    // func main() {
    // 	for i := 0; i < 1000; i++ {
    // 		//如何证明goritine是异步的
    // 		go func() {
    // 			fmt.Println(i)
    // 		}()
    // 	}
    
    // }
    /*
    14
    16
    104
    40
    40
    40
    154
    164
    171
    40
    40
    40
    415
    464
    44
    44
    44
    44
    为什么会有3个444呢?因为for循环也在执行,gorutine也在执行
    */
    
    package main
    
    import (
    	"fmt"
    	"runtime"
    	"time"
    )
    
    func child() {
    	for {
    		time.Sleep(time.Second * 2)
    		fmt.Println("我是child gorutine")
    	}
    
    }
    
    func main() {
    	//main函数启动的gorutine会随着main函数的结束而结束
    	go func() {
    		fmt.Println("------我是parent gorutine开始")
    		//gorutine启动的子gorutine并不会随着gorutine的结束而结束
    		go child()
    		fmt.Println("------我是parent gorutine结束")
    		return
    	}()
    
    	for {
    		runtime.GC()
    		//GC runs a garbage collection and blocks the caller until the garbage collection is complete. It may also block the entire program.
    	}
    }
    

      

    sync.WaitGroup保证多个gorutine执行顺序

    sync.WaitGroup实现main gorutin等待所有goritines结束
    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"sync"
    	"time"
    )
    
    //waitGroup协调gortines顺序
    var wg sync.WaitGroup
    
    func f() {
    	//在go中生成随机数字(要加seed种子)
    	rand.Seed(time.Now().UnixNano())
    	for i := 0; i < 5; i++ {
    		n1 := rand.Intn(11)
    		fmt.Println(n1)
    	}
    
    }
    
    func f1(i int) {
    	// goroutine结束就登记-1
    	defer wg.Done()
    	//开启1个gorutine:睡300毫秒
    	time.Sleep(time.Millisecond * time.Duration(rand.Intn(300)))
    	fmt.Printf("goroutine%d
    ",i)
    }
    
    func main() {
    	for i := 0; i < 10; i++ {
    		// 启动一个goroutine就登记+1
    		wg.Add(1)
    		go f1(i)
    	}
    	//如何等待10个gorutines全部完成,main函数再结束。
    	wg.Wait()//wg.Wait()等待计数器减为0
    
    }
    

      

     

    GOMAXPROCS 并行

    Go运行时的调度器使用GOMAXPROCS参数来确定需要使用多少个OS线程来同时执行Go的gorutine。

    GOMAXPROCS默认值是机器上的CPU核心数。(跑满所有cup)例如在一个8核心的机器上,调度器会把Go代码同时调度到8个OS线程上(GOMAXPROCS是m:n调度中的n)。

    也就是说你使用的是Go1.5之后的版本,开启多个gorutine之后,它们的执行顺序 默认就是并行的    go可没有Python中的GIL锁啊! 多个cpu可同1时刻跑多个os线程。

    但是有时候我们也需要玩得佛系一点!

    比如我们写得 go程序 主要是采集一些监控信息入库.....一些IO密集型操作,跑满所有cup反而会增加rutine的调度频率,影响到服务器上其他的业务!

    Go语言中可以通过runtime.GOMAXPROCS()函数设置当前程序并发时占用的CPU逻辑核心数。

    package main
    
    import (
    	"fmt"
    	"runtime"
    	"sync"
    )
    
    var wg sync.WaitGroup
    
    func f1() {
    	defer wg.Done()
    	for i := 0; i < 10; i++ {
    		fmt.Printf("f1:%d
    ", i)
    	}
    }
    
    func f2() {
    	defer wg.Done()
    	for i := 0; i < 10; i++ {
    		fmt.Printf("f2:%d
    ", i)
    	}
    }
    
    func main() {
    	//默认go的runtime会把gorutine调度到机器所有cpu上
    	//不过我们可设置gorutine可以被调度到几个CUP上
    	runtime.GOMAXPROCS(8)
    	wg.Add(2)
    	go f1()
    	go f2()
    	wg.Wait()
    }
    

      

    Go语言中os线程和goroutine的关系

    cpu执行的最小单位是os线程,所有的gorutine最终都需要被runtime调度、映射到真正的os线程上,才能被CPU执行。

    1个os线程对应用户态N个goroutine。

    1个go程序可以同时使用N个os线程。

    goroutine和os线程是多对多的映射关系,即m:n。

    gorutine的调用模型(GMP)

    GPM是Go语言运行时(runtime)层面的实现的,其目的是把M个gorutine映射成N个os线程,然后再被os调度到cpu执行。

    • G(goroutine)里面除了存放本goroutine信息外 还有与所在P的绑定等信息。
    • P(processor) 管理着一组goroutine队列,P里面会存储当前goroutine运行的上下文环境(函数指针,堆栈地址及地址边界),P会对自己管理的goroutine队列做一些调度(比如把占用CPU时间较长的goroutine暂停、运行后续的goroutine等等)当自己的队列消费完了就去全局队列里取,如果全局队列里也消费完了会去其他P的队列里抢任务。
    • M(machine)是Go运行时(runtime)对操作系统内核线程的虚拟, M与内核线程一般是1:1映射的关系, 一个groutine最终是要放到M上执行的;

    P与M一般也是1:1对应的。

    他们关系是:

    P管理着一组G挂载在M上运行。

    Processor里其中1个Gorutine长久阻塞在一个Machine上时,runtime会新建一个Machine,阻塞Gorutine所在的Processor会把剩余的Gorutine挂载在新建的Mechine上。

    当被阻塞的Gorutie阻塞完成或者认为其已经死掉时 回收旧的Machine。

    推理得出:1Processor管理N个gorutines-------->1个processor又被挂载1个mechine运行----------》1个mechine映射到1个os线程--------->被cpu执行掉。

    P的个数是通过runtime.GOMAXPROCS设定(最大256),Go1.5版本之后默认为物理线程数。 在并发量大的时候会增加一些P和M,但不会太多,切换太频繁的话得不偿失。

    单从线程调度讲,Go语言相比起其他语言的优势在于OS线程是由OS内核来调度的,goroutine则是由Go运行时(runtime)自己的调度器调度的,这个调度器使用一个称为m:n调度的技术(复用/调度m个goroutine到n个OS线程)。 其一大特点是goroutine的调度是在用户态下完成的, 不涉及内核态与用户态之间的频繁切换,包括内存的分配与释放,都是在用户态维护着一块大的内存池, 不直接调用系统的malloc函数(除非内存池需要改变),成本比调度OS线程低很多。 另一方面充分利用了多核的硬件资源,近似的把若干goroutine均分在物理线程上, 再加上本身goroutine的超轻量,以上种种保证了go调度方面的性能。

    点我了解更多

    channel

    Go语言的并发模型是CSP(Communicating Sequential Processes)通信顺序进程,就是通过通信的方式实现程序(gorutine)之间的顺序执行

    当每个独立的gorutine并发执行之后,它们之间的如何交换数据?多个gorurine协作起来系统得完成1个系统?

    channel是golang中的1种数据类型,它专用于gorutine之间数据交换,充当队列角色。

    特点

    单双工通信、数据流向为先进先出

    数据不能反复读取读完了就没有了

    有缓冲区channel实现同步通信

    无缓冲区channel有了存储数据的功能实现了异步通信

    定义channel

    package main
    
    import "fmt"
    
    //需要指定channel中存放元素的类型
    var channel chan int
    
    func main() {
    	//channel是引用类型需要make()开辟内存
    	fmt.Println(channel)//<nil>
    	//初始化channel1设置它的缓存取大小为5个int
    	channel=make(chan int,5)
    	fmt.Println(channel)
    }
    

    channel使用注意细节

    channel的capacity是固定了,length是动态的。

    只要在初始化阶段固定了channel的长度(N)。

    生产者只能投放N个数据,不能让channel超载数据。

    同理channel中的(N)个数据读取(消费)完了,消费者也无法再消费。

     计划经济:不能产能过剩、不能超前消费。否则会引起panic deadlock(死锁)

    1.channel不支持在线扩容

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    var wg sync.WaitGroup
    
    //需要指定channel中存放元素的类型
    var channel chan int
    func main() {
    	//channel是引用类型需要make()开辟内存
    	// fmt.Println(channel) //<nil>
    	//初始化channel1设置它的缓存取大小为2个int
    	channel = make(chan int,2)
    	channel <- 1
    	fmt.Println("1发送到了通道channel1中")
    	channel <- 2
    	fmt.Println("2发送到了通道channel1中")
    	//fatal error: all goroutines are asleep - deadlock!
    	//你给我定义时说让我存2个int,我不能超载!!!!
    	channel <- 3
    	fmt.Println("3发送到了通道channel1中")
    	close(channel)
    
    
    }
    

      

    2.channel里面的元素读取完了就不能再获取了

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    var wg sync.WaitGroup
    
    //需要指定channel中存放元素的类型
    var channel chan int
    func main() {
    	//channel是引用类型需要make()开辟内存
    	// fmt.Println(channel) //<nil>
    	//初始化channel1设置它的缓存取大小为2个int
    	channel = make(chan int,2)
    	channel <- 1
    	fmt.Println("1发送到了通道channel1中")
    	channel <- 2
    	fmt.Println("2发送到了通道channel1中")
    	v1,_:=<-channel
    	fmt.Printf("从管道channel1中获取到了%d
    ",v1)
    	v2,_:=<-channel
    	fmt.Printf("从管道channel1中获取到了%d
    ",v2)
    	//fatal error: all goroutines are asleep - deadlock!
    	//管道不超载,所以不要多取值
    	v3,_:=<-channel
    	fmt.Printf("从管道channel1中获取到了%d
    ",v3)
    	
    	
    
    }
    

    3.close(channel)

    channel关闭之后有1个好处就是 可以随便从channel中读取数据不会引起pannic
    但是写入数据会报错。
    package main
    
    import "fmt"
    
    var intchannel chan int
    
    func main() {
    	intchannel = make(chan int, 3)
    	intchannel <- 1
    	intchannel <- 2
    	//channel关闭之后:可以随便从channel中读取数据不会引起pannic,但是写入数据会报错。
    	close(intchannel)
    	//可以随便读
    	<-intchannel
    	<-intchannel
    	<-intchannel
    	<-intchannel
    	//值读完了之后返回:false 0(默认值)
    	v1,ok := <-intchannel
    	fmt.Println(ok,v1)
    	//不能写panic: send on closed channel
    	// intchannel <- 3
    
    }
    

    4.channel的遍历 

    我们可以使用for range 循环遍历已经关闭的channel。

    package main
    
    import "fmt"
    
    var intchannel chan int
    
    func main() {
    	intchannel = make(chan int, 3)
    	intchannel <- 1
    	intchannel <- 2
    	intchannel <- 3
    	//channel关闭之后:可以随便从channel中读取数据不会引起pannic,但是写入数据会报错。
    	close(intchannel)
    	//遍历已经close的channel不会报错
    	for v := range intchannel {
    		fmt.Println(v)
    	}
    
    }
    

    5.生产者和消费者步调不一致时

    只要在channel的两端有供(生产者)消(消费者)存在,程序会阻塞,但不永远会造成程序死锁! 

     当生产者在channel中send的数据小于/大于消费者可消费的数目时或者消费者消费的数据量大于生产者生产的数目都不会引起程序死锁!

    看到这样的报错心里也就有数了!

    fatal error: all goroutines are asleep - deadlock!

    无非是你写的代码哪个环节?channel中有值但是没人消费!或者有消费但是没值了! 

    你只能在那里一直等导致main gorutine无法退出,一旦主 gorutine无法退出,所以的子gorutines也全部会被阻塞,从而导致deadlock.

    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"time"
    )
    
    func senNum(ch chan<- int) {
    	for {
    		n := rand.Intn(10)
    		time.Sleep(5 * time.Second)
    		ch <- n
    	}
    
    }
    
    func reciveNum(ch <-chan int) {
    	for {
    		time.Sleep(1 * time.Second)
    		ok, n := <-ch 
    		/*
    		等(阻塞)4秒才能接受到值,因为senNum()每隔5秒钟才会给chanek中send1个值
    		*/
    		fmt.Println(ok, n)
    	}
    
    }
    
    func main() {
    	ch := make(chan int, 1)
    	go senNum(ch)
    	reciveNum(ch)
    }
    

      

    操作channel

    channel 解决了多个并发的gorutines之间有序传送数据的问题,但是使用它需要注意一下几点。

     那些我使用channel遇到的坑

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    var ch1 chan int
    
    // var once sync.Once
    
    func readChannle(channel chan int) {
    
    	for v := range channel {
    		fmt.Println(v)
    	}
    
    }
    
    func main() {
    	ch1 = make(chan int, 2)
    	ch1 <- 1
    	ch1 <- 2
    	//ch1 <- 3
    	/* bug 1
    	会阻塞:因为ch1的缓冲区大小为2个int,你却还给它send值,
    	它已经装不下第3个值了!(除非有消费者在另一端消费)
    	*/
    	<-ch1
    	<-ch1
    	//<-ch1 //deadlock死锁
    	/*
    		bug2
    		会阻塞:因为ch1里面的值全部被获取完了
    		除非有生产者给channel中写值 main函数不能能一直在这里等值 或者你是gorutine可以一直等
    	*/
    
    	close(ch1)
    	readChannle(ch1)
    	<-ch1
    	/*
    		benefit 1
    		关闭了channel之后即便channel的值被取光了,再从channel中获取值
    		程序也不会阻塞造成死锁
    	*/
    
    	//ch1<-1
    	/*
    		bug2
    		send on closed channel
    		但是channel关闭之后就不能在send数据了
    	*/
    
    	//close(ch1)
    	/*
    		bug3 channel不能被close两次
    	*/
    
    	/*
    	Warnning 如何在gorutine中close(channel)
    	让gorutine外面的main函数可以获取值呢?
    	gorutine可是并行的,g1关闭了 g1就不能写和再关闭了
    	解决:once.Do(func() { close(ch2) })互斥锁
    
    	*/
    	go readChannle(ch1)
    	go readChannle(ch1)
    	go readChannle(ch1)
    	go readChannle(ch1)
    	go readChannle(ch1)
    	time.Sleep(2 * time.Second)
    
    }
    

      

    发送

    channel <- 1
    

      

    接收

    value, ok := <-channel
    

    练习1 

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    var wg sync.WaitGroup
    
    //生产者生产10个包子
    func producer(channel1 chan string) {
    	defer wg.Done()
    	for i := 0; i < 10; i++ {
    		channel1 <- fmt.Sprintf("包子%d
    ", i)
    	}
    	//关闭channel以便于别人来取chanel1取值值时不会报错啊!
    	close(channel1)
    }
    
    //消费者
    func consumer(channel1 chan string) {
    	defer wg.Done()
    	for v := range channel1 {
    		fmt.Println(v)
    	}
    }
    
    
    func main() {
    	wg.Add(4)
    	channal1 := make(chan string, 10)
    	go producer(channal1) 
    	go consumer(channal1)//消费者1
    	go consumer(channal1)//
    	go consumer(channal1)//
    	wg.Wait()
    	fmt.Println("main结束")
    }
    

      

    练习2

    /*
    1.启动1个gorutine1,生成100个数字写入channel1中
    2.启动1个gorutine2,从gorutine中获取到数字,计算其平方写入channel2。
    3.main函数从channel2中查询结果
    */
    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    var wg sync.WaitGroup
    var once sync.Once
    
    func producer(chanel1 chan int) {
    	defer wg.Done()
    	for i := 0; i < 100; i++ {
    		chanel1 <- i
    	}
    	close(chanel1)
    
    }
    
    func consumer(ch1 chan int, ch2 chan int) {
    	defer wg.Done()
    	for v := range ch1 {
    		ch2 <- v * v
    	}
    	//确保某个close操作被gorutines抢到后只被 close 1次
    	once.Do(func() { close(ch2) })
    
    }
    
    func main() {
    	wg.Add(6)
    	ch1 := make(chan int, 100)
    	ch2 := make(chan int, 100)
    	go producer(ch1)
    	go consumer(ch1, ch2)
    	go consumer(ch1, ch2)
    	go consumer(ch1, ch2)
    	go consumer(ch1, ch2)
    	go consumer(ch1, ch2)
    	wg.Wait()
    	for v := range ch2 {
    		fmt.Println(v)
    
    	}
    
    }
    

      

    单向通道 

    我们默认使用的channel是双向通道(既可以向channel中 send值,也可以向通道中read值),单向通道就是限制channel仅能sen/read。

    单向通道明确了通道的使用功能,限制了用户操作通道的权限。可以避免在并发场景下读写channel冲突导致deadlock。

    应用场景

    单向通道多出现于函数的参数、返回值起到限制函数操作channel的权限。

    只可以读的channel

    <-chan int
    

    只写的channel

    chan<- int

      

    func Tick(d Duration) <-chan Time {
    	if d <= 0 {
    		return nil
    	}
    	return NewTicker(d).C
    }
    

      

    gorutine pool

    在go可以轻松启动多个gorutine物极必反,无论我们启动多少个gorutine最终干活的还是os线程。

    1个8核的服务器可以同时启动16个线程,但是golang中启动了1000个goritine。16个os线程划分1000个gorutine无疑是增加了go runtime调度频率。并没有加速程序执行速度。

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    
    //worker
    func worker(id int, jobs <-chan int, results chan<- int) {
    	for v := range jobs {
    		fmt.Printf("work:%d satrt,job:%d
    ", id, v)
    		time.Sleep(time.Second)
    		results <- v * 2
    		fmt.Printf("work:%d end,job:%d
    ", id, v)
    
    	}
    }
    
    func main() {
    	jobs := make(chan int, 100)
    	results := make(chan int, 100)
    	//开启3个gorutines
    	for i := 0; i < 3; i++ {
    		go worker(i, jobs, results)
    	}
    	//消费50个任务
    	for i := 0; i < 5; i++ {
    		jobs <- i
    	}
    	close(jobs)
    	
    	// 输出5个results结果
    	for j := 1; j <= 5; j++ {
    		<-results
    	}
    
    }
    

      

    练习 使用struct作为channel的存放的元素

    一般在golang的channel里面存储的数据体量比较大的数据时,可以使用struct  pointer类型节省存储空间。

    ps:

    如果某1个文件的的权限是755,如 计算该文件 属主+属组+其他用户权限的总和。(7+5+5)

    package main
    import "fmt"
    func main() {
    	n := 755
    	sum := 0
    	for n > 0 {
    		//每次除以10
    		sum += n % 10 
    		n = n / 10    
    	}
    	fmt.Println(sum)
    
    }
    

     

     使用goroutinechannel实现一个计算int64随机数各位数和的程序。

    1. 开启一个goroutine循环生成int64类型的随机数,发送到jobChan
    2. 开启24个goroutinejobChan中取出随机数计算各位数的和,将结果发送到resultChan
    3. goroutineresultChan取出结果并打印到终端输出
    //
    package main
    
    import (
    	"fmt"
    	"math/rand"
    	"sync"
    	"time"
    )
    
    /*
    开启一个goroutine循环生成int64类型的随机数,发送到jobChan
    开启24个goroutine从jobChan中取出随机数计算各位数的和,将结果发送到resultChan
    主goroutine从resultChan取出结果并打印到终端输出
    
    */
    
    type job struct {
    	value int64
    }
    
    type result struct {
    	job *job
    	sum int64
    }
    
    var jobsChannel = make(chan *job, 100)
    var resultChanel = make(chan *result, 100)
    var wg sync.WaitGroup
    
    func producer(jobs chan<- *job) {
    	defer wg.Done()
    	for {
    		v := rand.Int63()
    		newJob := &job{
    			value: v,
    		}
    		jobs <- newJob
    		//休眠500毫秒
    		time.Sleep(time.Millisecond * 500)
    
    	}
    
    }
    
    func consumer(jobs <-chan *job, results chan<- *result) {
    	defer wg.Done()
    	for {
    		job := <-jobs
    		sum := int64(0)
    		jobValue := job.value
    		for jobValue > 0 {
    			sum += jobValue % int64(10)
    			jobValue = jobValue / int64(10)
    		}
    		newResult := &result{
    			job: job,
    			sum: sum,
    		}
    		results <- newResult
    
    	}
    }
    
    func main() {
    	wg.Add(1)
    	go producer(jobsChannel)
    	wg.Add(24)
    	for i := 0; i < 24; i++ {
    		go consumer(jobsChannel, resultChanel)
    	}
    
    	for v := range resultChanel {
    		fmt.Printf("%d---->%d
    ",v.job.value, v.sum)
    	}
    	wg.Wait()
    
    }
    

      

    select关键字

    在某些场景下我们需要同时从多个通道接收数据。

    通道在接收数据时,如果没有数据可以接收将会发生阻塞。

    Go内置了select关键字,可以循环监听通道的可读和可写的状态, 同时响应多个通道的操作。

    package main
    
    import "fmt"
    
    func main() {
    	ch := make(chan int, 1)
    	for i := 0; i < 10; i++ {
    		//select自动监听到channel是否为可读/可写状态!
    		select {
    		case x := <-ch:
    			fmt.Printf("第%d循环:ch通道能读值啦!获取%d
    ",i, x)
    		case ch <- i:
    			fmt.Printf("第%d循环:ch通道能写值啦!写入%d
    ", i, i)
    
    		}
    	}
    }
    

      

    sync包

    golang除了提供channel 这种CSP机制达到gorutines并发控制目的之外,还提供了1个sync包。

    sync包中提供了Mutex(互斥锁)、once(一次性操作)、waigroup(主线程等待所有gorutine结束再推出)等功能,帮助我们实现并发安全

    sync.Mutex

    Mutex防止同1时刻,同1资源被多个gorutine操作。

    // A Mutex must not be copied after first use.
    type Mutex struct {
    	state int32
    	sema  uint32
    }
    

    Mextex是使用struct实现的而在golang中struct属于value类型。

    需要注意的是在使用sync.Mutex时如果把它当成参数传入到函数里面,mutax就会被copy生成2把不同的mutex。

    互斥锁

    互斥锁 不区分是读、写操作,只要有1个goruitne拿到Mutax,其余的所有gorutines,无论是读还写,只能等待。

    var lock sync.Mutex
    lock.Lock()   //加锁
    lock.Lock()   //加锁
    

      

     

    1个公共资源被N个gorutines 操作引发的问题 

    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    //锁
    var x = 0
    var wg sync.WaitGroup
    
    //每次执行add增加5000
    func add() {
    	defer wg.Done()
    	for i := 0; i < 5000; i++ {
    		x++
    	}
    
    }
    
    func main() {
    	wg.Add(2)
    	//开启2个gorutines同时对x+1
    	go add()
    	go add()
    	/*2个gorutines如果同1时刻都去获取公共变量x=50,
    	然后在独自的栈中对x+1改变了x都=51
    	就少+了1次,导致结果计算不准!
    	*/
    	wg.Wait()
    	fmt.Println(x)
    }
    
    使用互斥锁
    A Mutex must not be copied after first use.
    在使用互斥锁一定要确保该锁不是复制品(作为参数传递时一定要传指针
    package main
    
    import (
    	"fmt"
    	"sync"
    )
    
    //锁
    var x = 0
    var wg sync.WaitGroup
    
    /*
    A Mutex must not be copied after first use.
    使用互斥锁一定要确保该锁不是复制品(作为参数传递时一定要传指针)
    */
    
    //互斥锁
    var lock sync.Mutex
    
    //每次执行add增加5000
    func add() {
    	defer wg.Done()
    	for i := 0; i < 5000; i++ {
    		lock.Lock()   //加锁
    		x++           //操作同1资源
    		lock.Unlock() //释放锁
    	}
    
    }
    
    func main() {
    	wg.Add(2)
    	//开启2个gorutines同时对x+1
    	go add()
    	go add()
    	wg.Wait()
    	fmt.Println(x)
    }
    

      

    RWMutex(读/写互斥锁)(mysql读写分离)

    var rwlock sync.RWMutex
    //读锁
    rwlock.RLock()
    rwlock.RUnlock()
    //写锁
    rwlock.Lock()
    rwlock.Unlock()
    

      

    Rwmutex区分gorutine读、写操作,仅在写时资源被lock,读的gorutines等(读并发、写串行)

    应用场景:所以使用RWMutex之后,在读操作大于写操作次数的场景下并发执行效率会比Mutex更快。

    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    var x = 0
    var lock sync.Mutex
    var rwlock sync.RWMutex
    var wg sync.WaitGroup
    
    //rwlock
    func read() {
    	defer wg.Done()
    	//加普通互斥锁
    	// lock.Lock()
    	//加读锁
    	rwlock.RLock()
    	fmt.Println(x)
    	time.Sleep(time.Millisecond)
    	//释放普通互斥锁
    	// lock.Unlock()
    	//释放读锁
    	rwlock.RUnlock()
    }
    
    func write() {
    	defer wg.Done()
    	// lock.Lock()
    	//加写锁
    	rwlock.Lock()
    	x++
    	time.Sleep(10 * time.Millisecond)
    	// lock.Unlock()
    	//释放写锁
    	rwlock.Unlock()
    }
    
    func main() {
    
    	start := time.Now()
    
    	for i := 0; i < 10; i++ {
    		go write()
    		wg.Add(1)
    	}
    	//读的次数一定要大于写的次数
    	for i := 0; i < 1000; i++ {
    		go read()
    		wg.Add(1)
    	}
    	wg.Wait()
    	fmt.Println(time.Now().Sub(start))
    
    	//Mutex:1.205s
    	//RWMutex 194ms
    }
    

      

    sync.WaitGroup

    var wg sync.WaitGroup
    wg.Add(2)
    wg.Done(2)
    wg.Wait()
    

      

    主gorutine结束之后,又它开启的其他gorutines会自动结束!!     

    如何做到让main gorutine等待它开启的gorutines结束之后,再结束呢?

    main gorutine执行time.Sleep(duration)肯定是不合适,因为我们无法精确预测出 gorutines到底会执行多久?

    方法名功能
    (wg * WaitGroup) Add(delta int) 计数器+delta
    (wg *WaitGroup) Done() 计数器-1
    (wg *WaitGroup) Wait() 阻塞直到计数器变为0
    var wg sync.WaitGroup
    
    func hello() {
    	defer wg.Done()
    	fmt.Println("Hello Goroutine!")
    }
    func main() {
    	wg.Add(1)
    	go hello() // 启动另外一个goroutine去执行hello函数
    	fmt.Println("main goroutine done!")
    	wg.Wait()
    }
    

      

    sync.Once

     如何确保某些操作在并发的场景下只执行1次,例如只加载一次配置文件、只执行1次close(channel)等。

    func (o *Once) Do(f func()) {}
    

    Onece的Do方法只能接受1个没有参数的函数作为它的参数,  如果要传递的func参数是有参数的func, 就需要搭配闭包来使用。  

    下面是借助sync.Once实现的并发安全的单例模式:

    package singleton
    
    import (
        "sync"
    )
    
    type singleton struct {}
    
    var instance *singleton
    var once sync.Once
    
    func GetInstance() *singleton {
        once.Do(func() {
            instance = &singleton{}
        })
        return instance
    }
    

      

    sync.Map

    var syncMap sync.Map
    //新增
    syncMap.Store(key, n)
    //删除
    syncMap.Delete(key)
    //改
    syncMap.LoadOrStore(key)
    //遍历
    syncMap.Range(walk)
    

      

    golang中的map在并发情况下: 只读是线程安全的,但是写线程不安全,所以为了并发安全 & 高效,官方帮我们实现了另1个sync.map。

    fatal error: concurrent map writes  //go内置的map只能支持20个并发写!
    
    package main
    
    import (
    	"fmt"
    	"strconv"
    	"sync"
    )
    
    var m = make(map[string]int)
    
    func get(key string) int {
    	return m[key]
    }
    
    func set(key string, value int) {
    	m[key] = value
    }
    
    func main() {
    	wg := sync.WaitGroup{}
    	for i := 0; i < 20; i++ {
    		wg.Add(1)
    		go func(n int) {
    			key := strconv.Itoa(n)
    			//设置1个值
    			set(key, n)
    			//获取1个值
    			fmt.Printf("k=:%v,v:=%v
    ", key, get(key))
    			wg.Done()
    		}(i)
    	}
    	wg.Wait()
    }
    

      

    就支持20个并发也太少了!

    Go语言的sync包中提供了一个开箱即用的并发安全版map–sync.Map。开箱即用表示不用像内置的map一样使用make函数初始化就能直接使用。

    同时sync.Map内置了诸如StoreLoadLoadOrStoreDeleteRange等操作方法。

    package main
    
    import (
    	"fmt"
    	"strconv"
    	"sync"
    )
    
    var syncMap sync.Map
    var wg sync.WaitGroup
    
    
    
    func walk(key, value interface{}) bool {
    	fmt.Println("即将删除Key =", key, "Value =", value)
    	syncMap.Delete(key)
    	return true
    }
    
    func main() {
    	for i := 0; i < 200; i++ {
    		//开启20个协程去syncMap并发写操作,也是可以顺利写进去的的!
    		key := strconv.Itoa(i)
    		wg.Add(1)
    		go func(n int) {
    			//设置key
    			syncMap.Store(key, n)
    			//通过key获取value
    			value, ok := syncMap.Load(key)
    			if !ok {
    				fmt.Println("没有该key", key)
    			}
    			fmt.Println(value)
    			wg.Done()
    		}(i)
    
    	}
    	//使用for 循环或者 for range 循环无法遍历所有syncMap只能使用syncMap.Range()
    	//不幸运的Go没有提供sync.Map的Length的方法,需要自己实现!!
    	syncMap.Range(walk)
    	wg.Wait()
    }
    

      

      

    atomic包

    代码中的加锁操作因为涉及内核态的上下文切换会比较耗时、代价比较高

    针对基本数据类型我们还可以使用原子操作来保证并发安全,因为原子操作是Go语言提供的方法它在用户态就可以完成,因此性能比加锁操作更好

    Go语言中原子操作由内置的标准库sync/atomic提供

    方法解释
    func LoadInt32(addr *int32) (val int32)
    func LoadInt64(addr *int64) (val int64)
    func LoadUint32(addr *uint32) (val uint32)
    func LoadUint64(addr *uint64) (val uint64)
    func LoadUintptr(addr *uintptr) (val uintptr)
    func LoadPointer(addr *unsafe.Pointer) (val unsafe.Pointer)
    读取操作
    func StoreInt32(addr *int32, val int32)
    func StoreInt64(addr *int64, val int64)
    func StoreUint32(addr *uint32, val uint32)
    func StoreUint64(addr *uint64, val uint64)
    func StoreUintptr(addr *uintptr, val uintptr)
    func StorePointer(addr *unsafe.Pointer, val unsafe.Pointer)
    写入操作
    func AddInt32(addr *int32, delta int32) (new int32)
    func AddInt64(addr *int64, delta int64) (new int64)
    func AddUint32(addr *uint32, delta uint32) (new uint32)
    func AddUint64(addr *uint64, delta uint64) (new uint64)
    func AddUintptr(addr *uintptr, delta uintptr) (new uintptr)
    修改操作
    func SwapInt32(addr *int32, new int32) (old int32)
    func SwapInt64(addr *int64, new int64) (old int64)
    func SwapUint32(addr *uint32, new uint32) (old uint32)
    func SwapUint64(addr *uint64, new uint64) (old uint64)
    func SwapUintptr(addr *uintptr, new uintptr) (old uintptr)
    func SwapPointer(addr *unsafe.Pointer, new unsafe.Pointer) (old unsafe.Pointer)
    交换操作
    func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
    func CompareAndSwapInt64(addr *int64, old, new int64) (swapped bool)
    func CompareAndSwapUint32(addr *uint32, old, new uint32) (swapped bool)
    func CompareAndSwapUint64(addr *uint64, old, new uint64) (swapped bool)
    func CompareAndSwapUintptr(addr *uintptr, old, new uintptr) (swapped bool)
    func CompareAndSwapPointer(addr *unsafe.Pointer, old, new unsafe.Pointer) (swapped bool)
    比较并交换操作
    ackage main
    
    import (
    	"fmt"
    	"sync"
    	"sync/atomic"
    	"time"
    )
    
    type Counter interface {
    	Inc()
    	Load() int64
    }
    
    // 普通版
    type CommonCounter struct {
    	counter int64
    }
    
    func (c CommonCounter) Inc() {
    	c.counter++
    }
    
    func (c CommonCounter) Load() int64 {
    	return c.counter
    }
    
    // 互斥锁版
    type MutexCounter struct {
    	counter int64
    	lock    sync.Mutex
    }
    
    func (m *MutexCounter) Inc() {
    	m.lock.Lock()
    	defer m.lock.Unlock()
    	m.counter++
    }
    
    func (m *MutexCounter) Load() int64 {
    	m.lock.Lock()
    	defer m.lock.Unlock()
    	return m.counter
    }
    
    // 原子操作版
    type AtomicCounter struct {
    	counter int64
    }
    
    func (a *AtomicCounter) Inc() {
    	atomic.AddInt64(&a.counter, 1)
    }
    
    func (a *AtomicCounter) Load() int64 {
    	return atomic.LoadInt64(&a.counter)
    }
    
    func test(c Counter) {
    	var wg sync.WaitGroup
    	start := time.Now()
    	for i := 0; i < 1000; i++ {
    		wg.Add(1)
    		go func() {
    			c.Inc()
    			wg.Done()
    		}()
    	}
    	wg.Wait()
    	end := time.Now()
    	fmt.Println(c.Load(), end.Sub(start))
    }
    
    func main() {
    	c1 := CommonCounter{} // 非并发安全
    	test(c1)
    	c2 := MutexCounter{} // 使用互斥锁实现并发安全
    	test(&c2)
    	c3 := AtomicCounter{} // 并发安全且比互斥锁效率更高
    	test(&c3)
    }
    

      

    https://morsmachine.dk/go-scheduler

    参考

  • 相关阅读:
    Apache Mahout
    ganglia安装
    编译出错 recompile with -fPIC
    centos6.4 基本安装nagios
    HDU1285 确定比赛名次
    HDU2888 Check Corners(二维RMQ)
    HDU3183 A Magic Lamp
    HDU1711 Number Sequence(KMP模版题)
    HDU3694 Fermat Point in Quadrangle(求四边形费马点)
    POJ2155 Matrix(二维树状数组)
  • 原文地址:https://www.cnblogs.com/sss4/p/12755374.html
Copyright © 2011-2022 走看看