zoukankan      html  css  js  c++  java
  • goroutine和channel

    一、goroutine

    1、并发和并行:

    多线程程序在单核上运行就是并发。

    多线程程序在多核上运行就是并行。

    2、Go协程和Go主线程

    Go主线程(有人直接称为线程/也可以理解成进程):一个Go线程上,可以起多个协程,协程是轻量级的线程[编译器做优化]。

    Go协程的特点:有独立的栈空间;共享程序堆空间;调度由用户控制;协程是轻量级的线程。

    请编写一个程序,完成如下功能:
    在主线程(可以理解成进程)中,开启一个goroutine, 该协程每隔1秒输出 "hello,world"
    在主线程中也每隔一秒输出"hello,golang", 输出10次后,退出程序
    要求主线程和goroutine同时执行.
    画出主线程和协程执行流程图

    package main
    
    import (
    	"fmt"
    	"strconv"
    	"time"
    )
    
    func test() {
    	for i := 1; i <= 10; i++ {
    		fmt.Println("test() hello,world " + strconv.Itoa(i))
    		time.Sleep(time.Second)
    	}
    }
    
    func main() {
    	go test() //开协启一个协程
    
    	for i := 1; i <= 10; i++ {
    		fmt.Println("  main() hello,golang " + strconv.Itoa(i))
    		time.Sleep(time.Second)
    	}
    }
    

    主线程是一个物理线程,直接作用在cpu上的。是重量级的,非常耗费cpu资源。
    协程从主线程开启的,是轻量级的线程,是逻辑态。对资源消耗相对小。
    Golang的协程机制是重要的特点,可以轻松的开启上万个协程。其它编程语言的并发机制是一般基于线程的,开启过多的线程,资源耗费大,这里就突显Golang在并发上的优势了。

    3、goroutine的调度模型MPG

    M指的是Machine,一个M直接关联了一个内核线程。由操作系统管理。 P指的是”processor”,代表了M所需的上下文环境,也是处理用户级代码逻辑的处理器。它负责衔接M和G的调度上下文,将等待执行的G与M对接。 G指的是Goroutine,其实本质上也是一种轻量级的线程。包括了调用栈,重要的调度信息,例如channel等。
    P的数量由环境变量中的GOMAXPROCS决定,通常来说它是和核心数对应,例如在4Core的服务器上回启动4个线程。G会有很多个,每个P会将Goroutine从一个就绪的队列中做Pop操作,为了减小锁的竞争,通常情况下每个P会负责一个队列。
    三者关系如下图所示: 

    以上这个图讲的是两个线程(内核线程)的情况。一个M会对应一个内核线程,一个M也会连接一个上下文P,一个上下文P相当于一个“处理器”,一个上下文连接一个或者多个Goroutine。为了运行goroutine,线程必须保存上下文。
    上下文P(Processor)的数量在启动时设置为GOMAXPROCS环境变量的值或通过运行时函数GOMAXPROCS()。通常情况下,在程序执行期间不会更改。上下文数量固定意味着只有固定数量的线程在任何时候运行Go代码。可以使用它来调整Go进程到个人计算机的调用,例如4核PC在4个线程上运行Go代码。
    图中P正在执行的Goroutine为蓝色的;处于待执行状态的Goroutine为灰色的,灰色的Goroutine形成了一个队列runqueues。
    Go语言里,启动一个goroutine很容易:go function就行,所以每有一个go语句被执行,runqueue队列就在其末尾加入一个goroutine,一旦上下文运行goroutine直到调度点,它会从其runqueue中弹出goroutine,设置堆栈和指令指针并开始运行goroutine。

    能否抛弃P(Processor),让Goroutine的runqueues挂到M上呢?答案是不行,需要上下文的目的是:当遇到内核线程阻塞的时候可以直接放开其他线程。

    一个很简单的例子就是系统调用sysall,一个线程肯定不能同时执行代码和系统调用被阻塞,这个时候,此线程M需要放弃当前的上下文环境P,以便可以让其他的Goroutine被调度执行。

    如上图左图所示,M0中的G0执行了syscall,然后就创建了一个M1(也有可能来自线程缓存),(转向右图)然后M0丢弃了P,等待syscall的返回值,M1接受了P,将继续执行Goroutine队列中的其他Goroutine。
    当系统调用syscall结束后,M0会“偷”一个上下文,如果不成功,M0就把它的Gouroutine G0放到一个全局的runqueue中,将自己置于线程缓存中并进入休眠状态。全局runqueue是各个P在运行完自己的本地的Goroutine runqueue后用来拉取新goroutine的地方。P也会周期性的检查这个全局runqueue上的goroutine,否则,全局runqueue上的goroutines可能得不到执行而饿死。
    均衡的分配工作:按照以上的说法,上下文P会定期的检查全局的goroutine队列中的goroutine,以便自己在消费掉自身Goroutine队列的时候有事可做。假如全局goroutine队列中的goroutine也没了呢?就从其他运行的中的P的runqueue里偷。
    每个P中的Goroutine不同导致他们运行的效率和时间也不同,在一个有很多P和M的环境中,不能让一个P跑完自身的Goroutine就没事可做了,因为或许其他的P有很长的goroutine队列要跑,得需要均衡。 该如何解决呢?Go的做法倒也直接,从其他P中偷一半!

    4、设置golang运行的cpu数

    为了充分利用多cpu的优势,在golang程序中可以设置运行cpu数目。 go1.8后,默认让程序运行在多个核上,可以不用设置。go1.8之前,需要设置一下,可以更高效的利用cpu。

    package main
    
    import (
    	"fmt"
    	"runtime"
    )
    
    func main() {
    	//获取当前系统cpu的数量
    	num := runtime.NumCPU()
    
    	//设置运行go程序的cpu数量
    	runtime.GOMAXPROCS(num)
    	fmt.Println("cpu number = ", num)
    }
    

    二、channel

    计算1-200的各个数的阶乘,并且把各个数的阶乘放入到map中。最后显示出来。要求使用goroutine完成。 

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    var (
    	myMap = make(map[int]int, 10)
    )
    
    func fac(n int) {
    	res := 1
    	for i := 1; i <= n; i++ {
    		res *= i
    	}
    
    	//将阶乘的计算结果放到map中
    	myMap[n] = res
    }
    
    func main() {
    	for i := 1; i <= 200; i++ {
    		go fac(i)
    	}
    
    	time.Sleep(time.Second * 10)
    
    	for i, v := range myMap {
    		fmt.Printf("map[%d]=%d
    ", i, v)
    	}
    }
    

    上述代码因为没有对全局变量myMap加锁,因此会出现资源争夺问题,代码会出现错误,提示 concurrent map writes

    不同goroutine之间如何通讯:(1)、全局变量加入互斥锁;(2)、使用管道channel来解决。

    为了解决上述代码中存在的资源竞争问题,全局变量myMap加入互斥锁。

    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    var (
    	myMap = make(map[int]int, 10)
    
    	//声明一个全局的互斥锁,
    	lock sync.Mutex
    )
    
    func fac(n int) {
    	res := 1
    	for i := 1; i <= n; i++ {
    		res *= i
    	}
    
    	//将阶乘的计算结果放到map中
    	//加锁
    	lock.Lock()
    	myMap[n] = res
    	//释放锁
    	lock.Unlock()
    }
    
    func main() {
    	for i := 1; i <= 20; i++ {
    		go fac(i)
    	}
    
    	time.Sleep(time.Second * 10)
    
    	lock.Lock()
    	for i, v := range myMap {
    		fmt.Printf("map[%d]=%d
    ", i, v)
    	}
    	lock.Unlock()
    }

    1、channel的基本介绍

    channle本质就是一个数据结构-队列。
    数据是先进先出【FIFO : first in first out】。
    线程安全,多 goroutine 访问时,不需要加锁,就是说channel本身就是线程安全的。
    channel有类型的,一个string的channel只能存放string类型数据。

    2、声明channel

    var 变量名 chan 数据类型

    var intChan chan int (intChan 用于存放 int 数据)
    var mapChan chan map[int]string (mapChan 用于存放 map[int]string 类型)
    var perChan chan Person
    var perChanPtr chan *Person

    channel是引用类型
    channel必须初始化才能写入数据, 即make后才能使用
    管道是有类型的,intChan只能写入整数int

    3、管道的初始化及读写数据

    package main
    
    import "fmt"
    
    func main() {
    	//创建一个可以存放3个int类型的管道
    	var intChan chan int
    	intChan = make(chan int, 3)
    
    	fmt.Printf("intChan的值=%v intChan本身的地址=%p
    ", intChan, &intChan)
    
    	//向管道写入数据
    	intChan <- 10
    	num := 211
    	intChan <- num
    	intChan <- 50
    
    	//向管道写入数据时不能超过其容量
    	//intChan <- 80
    
    	//查看管道的长度和容量
    	fmt.Printf("channel len=%v cap=%v
    ", len(intChan), cap(intChan))
    
    	//从管道中读取数据
    	var n int
    	n = <-intChan
    	fmt.Println("n=", n)
    	fmt.Printf("channel len=%v cap=%v
    ", len(intChan), cap(intChan))
    
    	//在没有使用协程的情况下,如果管道的数据已经全部取出,再取就会报告deadlock
    	num1 := <-intChan
    	num2 := <-intChan
    	//num3 := <-intChan
    	fmt.Println("num1=", num1, "num2=", num2)
    }
    

    channel中只能存放指定的数据类型

    channle的数据放满后,就不能再放入了
    如果从channel取出数据后,可以继续放入
    在没有使用协程的情况下,如果channel数据取完了,再取就会报dead lock

    4、练习题

    (1)、创建一个intChan,最多可以存放3个int,存3个数据到intChan中,然后再取出这三个int。

    package main
    
    import "fmt"
    
    func main() {
    	var intChan chan int
    	intChan = make(chan int, 10)
    
    	//intChan容量是3,再存放会报告deadlock
    	intChan <- 10
    	intChan <- 20
    	intChan <- 30
    
    	num1 := <-intChan
    	num2 := <-intChan
    	num3 := <-intChan
    	//intChany已经没有数据了,再取数据会报告deadlock
    
    	fmt.Printf("num1=%v num2=%v num3=%v", num1, num2, num3)
    }
    

    (2)、创建一个mapChan,最多可以存放10个map[string]string的key-value,对这个chan进行写入和读取。

    package main
    
    import "fmt"
    
    func main() {
    	var mapChan chan map[string]string
    	mapChan = make(chan map[string]string, 10)
    
    	m1 := make(map[string]string, 20)
    	m1["city1"] = "北京"
    	m1["city2"] = "天津"
    
    	m2 := make(map[string]string, 20)
    	m2["hero1"] = "宋江"
    	m2["hero2"] = "武松"
    
    	mapChan <- m1
    	mapChan <- m2
    
    	mo1 := <-mapChan
    	mo2 := <-mapChan
    
    	fmt.Printf("mo1=%v
    mo2=%v", mo1, mo2)
    }
    

    (3)、创建一个catChan,最多可以存放10个Cat结构体变量,对这个chan进行写入和读取。

    package main
    
    import "fmt"
    
    type Cat struct {
    	Name string
    	Age  int
    }
    
    func main() {
    	var catChan chan Cat
    	catChan = make(chan Cat, 10)
    
    	cat1 := Cat{Name: "tom", Age: 10,}
    	cat2 := Cat{Name: "nancy", Age: 78,}
    
    	catChan <- cat1
    	catChan <- cat2
    
    	c1 := <-catChan
    	c2 := <-catChan
    
    	fmt.Printf("c1=%v
    c2=%v", c1, c2)
    }
    

    (4)、创建一个catChanPtr,最多可以存放10个*Cat变量,对这个chan进行写入和读取。

    package main
    
    import "fmt"
    
    type Cat struct {
    	Name string
    	Age  int
    }
    
    func main() {
    	var catChan chan *Cat
    	catChan = make(chan *Cat, 10)
    
    	cat1 := Cat{Name: "tom", Age: 10,}
    	cat2 := Cat{Name: "nancy", Age: 78,}
    
    	catChan <- &cat1
    	catChan <- &cat2
    
    	c1 := <-catChan
    	c2 := <-catChan
    
    	fmt.Printf("c1=%p
    c2=%p", c1, c2)
    }
    

    (5)、创建一个allChan,最多可以存放10个任意数据类型变量,对这个chan写入和读取。

    package main
    
    import "fmt"
    
    type Cat struct {
    	Name string
    	Age  int
    }
    
    func main() {
    	var allChan chan interface{}
    	allChan = make(chan interface{}, 10)
    
    	cat1 := Cat{Name: "tom", Age: 10,}
    	cat2 := Cat{Name: "nancy", Age: 78,}
    
    	allChan <- &cat1
    	allChan <- &cat2
    	allChan <- 10
    	allChan <- "jack"
    
    	c1 := <-allChan
    	c2 := <-allChan
    	v1 := <-allChan
    	v2 := <-allChan
    
    	fmt.Println(v1, v2, c1, c2)
    }
    

    5、channel的遍历和关闭

    使用内置函数close可以关闭channel, 当channel关闭后,就不能再向channel写数据了,但是仍然可以从该channel读取数据。

    package main
    
    import "fmt"
    
    func main() {
    	intChan := make(chan int, 3)
    	intChan <- 100
    	intChan <- 200
    
    	//关闭后不能再写数据
    	close(intChan)
    
    	//管道关闭之后,读取数据时可以的
    	n1 := <-intChan
    	fmt.Println("n1=", n1)
    }
    

    channel的遍历

    channel支持for--range的方式进行遍历,请注意两个细节
    (1)、在遍历时,如果channel没有关闭,则回出现deadlock的错误
    (2)、在遍历时,如果channel已经关闭,则会正常遍历数据,遍历完后,就会退出遍历。

    package main
    
    import "fmt"
    
    func main() {
    	intChan := make(chan int, 100)
    	for i := 0; i < 100; i++ {
    		intChan <- i * 2
    	}
    
    	close(intChan)
    	for v := range intChan {
    		fmt.Println("v=", v)
    	}
    }
    

     使用goroutine和channel协调完成如下需求:

    开启一个writeData协程,向管道intChan中写入50个整数;开启一个readData协程,从管道intChan中读取writeData写入的数据。writeData和readData操作的是同一个管道,主线程需要等待writeData和readData协程都完成才能退出。

    package main
    
    import "fmt"
    
    func writeData(intChan chan int) {
    	for i := 1; i <= 50; i++ {
    		intChan <- i
    		fmt.Println("writeData ", i)
    	}
    
    	close(intChan)
    }
    
    func readData(intChan chan int, exitChan chan bool) {
    	for {
    		v, ok := <-intChan
    		if !ok {
    			break
    		}
    
    		fmt.Printf("readData 读到数据=%v
    ", v)
    	}
    
    	exitChan <- true
    	close(exitChan)
    }
    
    func main() {
    	intChan := make(chan int, 50)
    	exitChan := make(chan bool, 1)
    
    	go writeData(intChan)
    	go readData(intChan, exitChan)
    
    	for {
    		_, ok := <-exitChan
    		if !ok {
    			break
    		}
    	}
    }
    

    统计1-8000的数字中,哪些是素数?

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func putNum(intChan chan int) {
    	for i := 1; i <= 8000; i++ {
    		intChan <- i
    	}
    
    	close(intChan)
    }
    
    func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
    	var flag bool
    	for {
    		time.Sleep(time.Millisecond * 10)
    		num, ok := <-intChan
    
    		if !ok {
    			break
    		}
    
    		flag = true
    
    		for i := 2; i < num; i++ {
    			if num%i == 0 {
    				flag = false
    				break
    			}
    		}
    
    		if flag {
    			primeChan <- num
    		}
    	}
    
    	fmt.Println("有一个primeNum协程因为取不到数据退出")
    
    	exitChan <- true
    }
    
    func main() {
    	intChan := make(chan int, 1000)
    	primeChan := make(chan int, 2000)
    	exitChan := make(chan bool, 4)
    
    	go putNum(intChan)
    
    	for i := 0; i < 4; i++ {
    		go primeNum(intChan, primeChan, exitChan)
    	}
    
    	go func() {
    		for i := 0; i < 4; i++ {
    			<-exitChan
    		}
    
    		close(primeChan)
    	}()
    
    	for {
    		res, ok := <-primeChan
    		if !ok {
    			break
    		}
    
    		fmt.Printf("素数=%d
    ", res)
    	}
    
    	fmt.Println("main线程退出")
    }
    

    6、channel使用细节

    (1)、默认情况下,管道是双向的,可读写的。channel可以声明为只读,或者只写性质。

    package main
    
    import "fmt"
    
    func main() {
    	//声明为只写
    	var intChan chan<- int
    	intChan = make(chan int, 3)
    	intChan <- 20
    
    	fmt.Println("intChan=", intChan)
    
    	//声明为只读
    	var stringChan <-chan string
    	str := <-stringChan
    
    	fmt.Println("str=", str)
    }
    

    (2)、channel只读和只写的最佳实践

    package main
    
    import "fmt"
    
    func send(ch chan<- int, exitChan chan struct{}) {
    	for i := 0; i < 10; i++ {
    		ch <- i
    	}
    	close(ch)
    
    	var a struct{}
    	exitChan <- a
    }
    
    func recv(ch <-chan int, exitChan chan struct{}) {
    	for {
    		v, ok := <-ch
    		if !ok {
    			break
    		}
    		fmt.Println(v)
    	}
    	var a struct{}
    	exitChan <- a
    }
    
    func main() {
    	var ch chan int
    	ch = make(chan int, 10)
    	exitChan := make(chan struct{}, 2)
    
    	go send(ch, exitChan)
    	go recv(ch, exitChan)
    
    	var total = 0
    
    	for _ = range exitChan {
    		total++
    		if total == 2 {
    			break
    		}
    	}
    	fmt.Println("结束")
    }
    

    (3)、使用select解决从管道中取数据的阻塞问题

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	intChan := make(chan int, 10)
    	for i := 0; i < 10; i++ {
    		intChan <- i
    	}
    
    	stringChan := make(chan string, 5)
    	for i := 0; i < 5; i++ {
    		stringChan <- "hello" + fmt.Sprintf("%d", i)
    	}
    
    	//传统的方法在遍历管道时,如果不关闭管道会阻塞而导致deadlock
    	//在实际开发中,不好确定什么时候关闭管道。可以使用select方式解决
    	for {
    		select {
    			//如果intChan一直没有关闭,不会一直阻塞而deadlock,会自动到下一个case匹配
    			case v := <-intChan:
    				fmt.Printf("从intChan读取的数据%d
    ", v)
    				time.Sleep(time.Second)
    			case v := <-stringChan:
    				fmt.Printf("从stringChan读取的数据%s
    ", v)
    				time.Sleep(time.Second)
    			default:
    				fmt.Printf("都取不到数据")
    				time.Sleep(time.Second)
    				return
    		}
    
    	}
    }
    

    (4)、goroutine中使用recover,解决协程中出现panic导致程序崩溃问题

    如果开启一个协程,但是这个协程出现了panic,如果没有捕获这个panic,就会造成整个程序崩溃,这时可以在goroutine中使用recover来捕获panic进行处理。这样即使这个协程发生问题,主线程仍然不受影响,可以继续执行。

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func sayHello() {
    	for i := 0; i < 10; i++ {
    		time.Sleep(time.Second)
    		fmt.Println("hello,world")
    	}
    }
    
    func test() {
    	defer func() {
    		if err := recover(); err != nil {
    			fmt.Println("test() 发生错误", err)
    		}
    	}()
    
    	var myMap map[int]string
    	//error,没有为map申请内存
    	myMap[0] = "golang"
    }
    func main() {
    	go sayHello()
    	go test()
    
    	for i := 0; i < 10; i++ {
    		fmt.Println("main() ok=", i)
    		time.Sleep(time.Second)
    	}
    }
    
  • 相关阅读:
    MarkDown使用教程
    B+树详解
    B-树(B树)详解
    SQL优化之limit 1
    mysql explain用法和结果的含义
    MySQL 常用内置函数与所有内置函数
    Mac os 相关查找命令
    数据库——自然连接、内连接、外连接(左外连接、右外连接,全连接)、交叉连接
    sql语句执行顺序
    有三个线程,怎么让他们按顺序执行?
  • 原文地址:https://www.cnblogs.com/xidian2014/p/10679452.html
Copyright © 2011-2022 走看看