zoukankan      html  css  js  c++  java
  • goroutine和channel

    一、goroutine

    1、并发和并行:

    多线程程序在单核上运行就是并发。

    多线程程序在多核上运行就是并行。

    2、Go协程和Go主线程

    Go主线程(有人直接称为线程/也可以理解成进程):一个Go线程上,可以起多个协程,协程是轻量级的线程[编译器做优化]。

    Go协程的特点:有独立的栈空间;共享程序堆空间;调度由用户控制;协程是轻量级的线程。

    请编写一个程序,完成如下功能:
    在主线程(可以理解成进程)中,开启一个goroutine, 该协程每隔1秒输出 "hello,world"
    在主线程中也每隔一秒输出"hello,golang", 输出10次后,退出程序
    要求主线程和goroutine同时执行.
    画出主线程和协程执行流程图

    package main
    
    import (
    	"fmt"
    	"strconv"
    	"time"
    )
    
    func test() {
    	for i := 1; i <= 10; i++ {
    		fmt.Println("test() hello,world " + strconv.Itoa(i))
    		time.Sleep(time.Second)
    	}
    }
    
    func main() {
    	go test() //开协启一个协程
    
    	for i := 1; i <= 10; i++ {
    		fmt.Println("  main() hello,golang " + strconv.Itoa(i))
    		time.Sleep(time.Second)
    	}
    }
    

    主线程是一个物理线程,直接作用在cpu上的。是重量级的,非常耗费cpu资源。
    协程从主线程开启的,是轻量级的线程,是逻辑态。对资源消耗相对小。
    Golang的协程机制是重要的特点,可以轻松的开启上万个协程。其它编程语言的并发机制是一般基于线程的,开启过多的线程,资源耗费大,这里就突显Golang在并发上的优势了。

    3、goroutine的调度模型MPG

    M指的是Machine,一个M直接关联了一个内核线程。由操作系统管理。 P指的是”processor”,代表了M所需的上下文环境,也是处理用户级代码逻辑的处理器。它负责衔接M和G的调度上下文,将等待执行的G与M对接。 G指的是Goroutine,其实本质上也是一种轻量级的线程。包括了调用栈,重要的调度信息,例如channel等。
    P的数量由环境变量中的GOMAXPROCS决定,通常来说它是和核心数对应,例如在4Core的服务器上回启动4个线程。G会有很多个,每个P会将Goroutine从一个就绪的队列中做Pop操作,为了减小锁的竞争,通常情况下每个P会负责一个队列。
    三者关系如下图所示: 

    以上这个图讲的是两个线程(内核线程)的情况。一个M会对应一个内核线程,一个M也会连接一个上下文P,一个上下文P相当于一个“处理器”,一个上下文连接一个或者多个Goroutine。为了运行goroutine,线程必须保存上下文。
    上下文P(Processor)的数量在启动时设置为GOMAXPROCS环境变量的值或通过运行时函数GOMAXPROCS()。通常情况下,在程序执行期间不会更改。上下文数量固定意味着只有固定数量的线程在任何时候运行Go代码。可以使用它来调整Go进程到个人计算机的调用,例如4核PC在4个线程上运行Go代码。
    图中P正在执行的Goroutine为蓝色的;处于待执行状态的Goroutine为灰色的,灰色的Goroutine形成了一个队列runqueues。
    Go语言里,启动一个goroutine很容易:go function就行,所以每有一个go语句被执行,runqueue队列就在其末尾加入一个goroutine,一旦上下文运行goroutine直到调度点,它会从其runqueue中弹出goroutine,设置堆栈和指令指针并开始运行goroutine。

    能否抛弃P(Processor),让Goroutine的runqueues挂到M上呢?答案是不行,需要上下文的目的是:当遇到内核线程阻塞的时候可以直接放开其他线程。

    一个很简单的例子就是系统调用sysall,一个线程肯定不能同时执行代码和系统调用被阻塞,这个时候,此线程M需要放弃当前的上下文环境P,以便可以让其他的Goroutine被调度执行。

    如上图左图所示,M0中的G0执行了syscall,然后就创建了一个M1(也有可能来自线程缓存),(转向右图)然后M0丢弃了P,等待syscall的返回值,M1接受了P,将继续执行Goroutine队列中的其他Goroutine。
    当系统调用syscall结束后,M0会“偷”一个上下文,如果不成功,M0就把它的Gouroutine G0放到一个全局的runqueue中,将自己置于线程缓存中并进入休眠状态。全局runqueue是各个P在运行完自己的本地的Goroutine runqueue后用来拉取新goroutine的地方。P也会周期性的检查这个全局runqueue上的goroutine,否则,全局runqueue上的goroutines可能得不到执行而饿死。
    均衡的分配工作:按照以上的说法,上下文P会定期的检查全局的goroutine队列中的goroutine,以便自己在消费掉自身Goroutine队列的时候有事可做。假如全局goroutine队列中的goroutine也没了呢?就从其他运行的中的P的runqueue里偷。
    每个P中的Goroutine不同导致他们运行的效率和时间也不同,在一个有很多P和M的环境中,不能让一个P跑完自身的Goroutine就没事可做了,因为或许其他的P有很长的goroutine队列要跑,得需要均衡。 该如何解决呢?Go的做法倒也直接,从其他P中偷一半!

    4、设置golang运行的cpu数

    为了充分利用多cpu的优势,在golang程序中可以设置运行cpu数目。 go1.8后,默认让程序运行在多个核上,可以不用设置。go1.8之前,需要设置一下,可以更高效的利用cpu。

    package main
    
    import (
    	"fmt"
    	"runtime"
    )
    
    func main() {
    	//获取当前系统cpu的数量
    	num := runtime.NumCPU()
    
    	//设置运行go程序的cpu数量
    	runtime.GOMAXPROCS(num)
    	fmt.Println("cpu number = ", num)
    }
    

    二、channel

    计算1-200的各个数的阶乘,并且把各个数的阶乘放入到map中。最后显示出来。要求使用goroutine完成。 

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    var (
    	myMap = make(map[int]int, 10)
    )
    
    func fac(n int) {
    	res := 1
    	for i := 1; i <= n; i++ {
    		res *= i
    	}
    
    	//将阶乘的计算结果放到map中
    	myMap[n] = res
    }
    
    func main() {
    	for i := 1; i <= 200; i++ {
    		go fac(i)
    	}
    
    	time.Sleep(time.Second * 10)
    
    	for i, v := range myMap {
    		fmt.Printf("map[%d]=%d
    ", i, v)
    	}
    }
    

    上述代码因为没有对全局变量myMap加锁,因此会出现资源争夺问题,代码会出现错误,提示 concurrent map writes

    不同goroutine之间如何通讯:(1)、全局变量加入互斥锁;(2)、使用管道channel来解决。

    为了解决上述代码中存在的资源竞争问题,全局变量myMap加入互斥锁。

    package main
    
    import (
    	"fmt"
    	"sync"
    	"time"
    )
    
    var (
    	myMap = make(map[int]int, 10)
    
    	//声明一个全局的互斥锁,
    	lock sync.Mutex
    )
    
    func fac(n int) {
    	res := 1
    	for i := 1; i <= n; i++ {
    		res *= i
    	}
    
    	//将阶乘的计算结果放到map中
    	//加锁
    	lock.Lock()
    	myMap[n] = res
    	//释放锁
    	lock.Unlock()
    }
    
    func main() {
    	for i := 1; i <= 20; i++ {
    		go fac(i)
    	}
    
    	time.Sleep(time.Second * 10)
    
    	lock.Lock()
    	for i, v := range myMap {
    		fmt.Printf("map[%d]=%d
    ", i, v)
    	}
    	lock.Unlock()
    }

    1、channel的基本介绍

    channle本质就是一个数据结构-队列。
    数据是先进先出【FIFO : first in first out】。
    线程安全,多 goroutine 访问时,不需要加锁,就是说channel本身就是线程安全的。
    channel有类型的,一个string的channel只能存放string类型数据。

    2、声明channel

    var 变量名 chan 数据类型

    var intChan chan int (intChan 用于存放 int 数据)
    var mapChan chan map[int]string (mapChan 用于存放 map[int]string 类型)
    var perChan chan Person
    var perChanPtr chan *Person

    channel是引用类型
    channel必须初始化才能写入数据, 即make后才能使用
    管道是有类型的,intChan只能写入整数int

    3、管道的初始化及读写数据

    package main
    
    import "fmt"
    
    func main() {
    	//创建一个可以存放3个int类型的管道
    	var intChan chan int
    	intChan = make(chan int, 3)
    
    	fmt.Printf("intChan的值=%v intChan本身的地址=%p
    ", intChan, &intChan)
    
    	//向管道写入数据
    	intChan <- 10
    	num := 211
    	intChan <- num
    	intChan <- 50
    
    	//向管道写入数据时不能超过其容量
    	//intChan <- 80
    
    	//查看管道的长度和容量
    	fmt.Printf("channel len=%v cap=%v
    ", len(intChan), cap(intChan))
    
    	//从管道中读取数据
    	var n int
    	n = <-intChan
    	fmt.Println("n=", n)
    	fmt.Printf("channel len=%v cap=%v
    ", len(intChan), cap(intChan))
    
    	//在没有使用协程的情况下,如果管道的数据已经全部取出,再取就会报告deadlock
    	num1 := <-intChan
    	num2 := <-intChan
    	//num3 := <-intChan
    	fmt.Println("num1=", num1, "num2=", num2)
    }
    

    channel中只能存放指定的数据类型

    channle的数据放满后,就不能再放入了
    如果从channel取出数据后,可以继续放入
    在没有使用协程的情况下,如果channel数据取完了,再取就会报dead lock

    4、练习题

    (1)、创建一个intChan,最多可以存放3个int,存3个数据到intChan中,然后再取出这三个int。

    package main
    
    import "fmt"
    
    func main() {
    	var intChan chan int
    	intChan = make(chan int, 10)
    
    	//intChan容量是3,再存放会报告deadlock
    	intChan <- 10
    	intChan <- 20
    	intChan <- 30
    
    	num1 := <-intChan
    	num2 := <-intChan
    	num3 := <-intChan
    	//intChany已经没有数据了,再取数据会报告deadlock
    
    	fmt.Printf("num1=%v num2=%v num3=%v", num1, num2, num3)
    }
    

    (2)、创建一个mapChan,最多可以存放10个map[string]string的key-value,对这个chan进行写入和读取。

    package main
    
    import "fmt"
    
    func main() {
    	var mapChan chan map[string]string
    	mapChan = make(chan map[string]string, 10)
    
    	m1 := make(map[string]string, 20)
    	m1["city1"] = "北京"
    	m1["city2"] = "天津"
    
    	m2 := make(map[string]string, 20)
    	m2["hero1"] = "宋江"
    	m2["hero2"] = "武松"
    
    	mapChan <- m1
    	mapChan <- m2
    
    	mo1 := <-mapChan
    	mo2 := <-mapChan
    
    	fmt.Printf("mo1=%v
    mo2=%v", mo1, mo2)
    }
    

    (3)、创建一个catChan,最多可以存放10个Cat结构体变量,对这个chan进行写入和读取。

    package main
    
    import "fmt"
    
    type Cat struct {
    	Name string
    	Age  int
    }
    
    func main() {
    	var catChan chan Cat
    	catChan = make(chan Cat, 10)
    
    	cat1 := Cat{Name: "tom", Age: 10,}
    	cat2 := Cat{Name: "nancy", Age: 78,}
    
    	catChan <- cat1
    	catChan <- cat2
    
    	c1 := <-catChan
    	c2 := <-catChan
    
    	fmt.Printf("c1=%v
    c2=%v", c1, c2)
    }
    

    (4)、创建一个catChanPtr,最多可以存放10个*Cat变量,对这个chan进行写入和读取。

    package main
    
    import "fmt"
    
    type Cat struct {
    	Name string
    	Age  int
    }
    
    func main() {
    	var catChan chan *Cat
    	catChan = make(chan *Cat, 10)
    
    	cat1 := Cat{Name: "tom", Age: 10,}
    	cat2 := Cat{Name: "nancy", Age: 78,}
    
    	catChan <- &cat1
    	catChan <- &cat2
    
    	c1 := <-catChan
    	c2 := <-catChan
    
    	fmt.Printf("c1=%p
    c2=%p", c1, c2)
    }
    

    (5)、创建一个allChan,最多可以存放10个任意数据类型变量,对这个chan写入和读取。

    package main
    
    import "fmt"
    
    type Cat struct {
    	Name string
    	Age  int
    }
    
    func main() {
    	var allChan chan interface{}
    	allChan = make(chan interface{}, 10)
    
    	cat1 := Cat{Name: "tom", Age: 10,}
    	cat2 := Cat{Name: "nancy", Age: 78,}
    
    	allChan <- &cat1
    	allChan <- &cat2
    	allChan <- 10
    	allChan <- "jack"
    
    	c1 := <-allChan
    	c2 := <-allChan
    	v1 := <-allChan
    	v2 := <-allChan
    
    	fmt.Println(v1, v2, c1, c2)
    }
    

    5、channel的遍历和关闭

    使用内置函数close可以关闭channel, 当channel关闭后,就不能再向channel写数据了,但是仍然可以从该channel读取数据。

    package main
    
    import "fmt"
    
    func main() {
    	intChan := make(chan int, 3)
    	intChan <- 100
    	intChan <- 200
    
    	//关闭后不能再写数据
    	close(intChan)
    
    	//管道关闭之后,读取数据时可以的
    	n1 := <-intChan
    	fmt.Println("n1=", n1)
    }
    

    channel的遍历

    channel支持for--range的方式进行遍历,请注意两个细节
    (1)、在遍历时,如果channel没有关闭,则回出现deadlock的错误
    (2)、在遍历时,如果channel已经关闭,则会正常遍历数据,遍历完后,就会退出遍历。

    package main
    
    import "fmt"
    
    func main() {
    	intChan := make(chan int, 100)
    	for i := 0; i < 100; i++ {
    		intChan <- i * 2
    	}
    
    	close(intChan)
    	for v := range intChan {
    		fmt.Println("v=", v)
    	}
    }
    

     使用goroutine和channel协调完成如下需求:

    开启一个writeData协程,向管道intChan中写入50个整数;开启一个readData协程,从管道intChan中读取writeData写入的数据。writeData和readData操作的是同一个管道,主线程需要等待writeData和readData协程都完成才能退出。

    package main
    
    import "fmt"
    
    func writeData(intChan chan int) {
    	for i := 1; i <= 50; i++ {
    		intChan <- i
    		fmt.Println("writeData ", i)
    	}
    
    	close(intChan)
    }
    
    func readData(intChan chan int, exitChan chan bool) {
    	for {
    		v, ok := <-intChan
    		if !ok {
    			break
    		}
    
    		fmt.Printf("readData 读到数据=%v
    ", v)
    	}
    
    	exitChan <- true
    	close(exitChan)
    }
    
    func main() {
    	intChan := make(chan int, 50)
    	exitChan := make(chan bool, 1)
    
    	go writeData(intChan)
    	go readData(intChan, exitChan)
    
    	for {
    		_, ok := <-exitChan
    		if !ok {
    			break
    		}
    	}
    }
    

    统计1-8000的数字中,哪些是素数?

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func putNum(intChan chan int) {
    	for i := 1; i <= 8000; i++ {
    		intChan <- i
    	}
    
    	close(intChan)
    }
    
    func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
    	var flag bool
    	for {
    		time.Sleep(time.Millisecond * 10)
    		num, ok := <-intChan
    
    		if !ok {
    			break
    		}
    
    		flag = true
    
    		for i := 2; i < num; i++ {
    			if num%i == 0 {
    				flag = false
    				break
    			}
    		}
    
    		if flag {
    			primeChan <- num
    		}
    	}
    
    	fmt.Println("有一个primeNum协程因为取不到数据退出")
    
    	exitChan <- true
    }
    
    func main() {
    	intChan := make(chan int, 1000)
    	primeChan := make(chan int, 2000)
    	exitChan := make(chan bool, 4)
    
    	go putNum(intChan)
    
    	for i := 0; i < 4; i++ {
    		go primeNum(intChan, primeChan, exitChan)
    	}
    
    	go func() {
    		for i := 0; i < 4; i++ {
    			<-exitChan
    		}
    
    		close(primeChan)
    	}()
    
    	for {
    		res, ok := <-primeChan
    		if !ok {
    			break
    		}
    
    		fmt.Printf("素数=%d
    ", res)
    	}
    
    	fmt.Println("main线程退出")
    }
    

    6、channel使用细节

    (1)、默认情况下,管道是双向的,可读写的。channel可以声明为只读,或者只写性质。

    package main
    
    import "fmt"
    
    func main() {
    	//声明为只写
    	var intChan chan<- int
    	intChan = make(chan int, 3)
    	intChan <- 20
    
    	fmt.Println("intChan=", intChan)
    
    	//声明为只读
    	var stringChan <-chan string
    	str := <-stringChan
    
    	fmt.Println("str=", str)
    }
    

    (2)、channel只读和只写的最佳实践

    package main
    
    import "fmt"
    
    func send(ch chan<- int, exitChan chan struct{}) {
    	for i := 0; i < 10; i++ {
    		ch <- i
    	}
    	close(ch)
    
    	var a struct{}
    	exitChan <- a
    }
    
    func recv(ch <-chan int, exitChan chan struct{}) {
    	for {
    		v, ok := <-ch
    		if !ok {
    			break
    		}
    		fmt.Println(v)
    	}
    	var a struct{}
    	exitChan <- a
    }
    
    func main() {
    	var ch chan int
    	ch = make(chan int, 10)
    	exitChan := make(chan struct{}, 2)
    
    	go send(ch, exitChan)
    	go recv(ch, exitChan)
    
    	var total = 0
    
    	for _ = range exitChan {
    		total++
    		if total == 2 {
    			break
    		}
    	}
    	fmt.Println("结束")
    }
    

    (3)、使用select解决从管道中取数据的阻塞问题

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	intChan := make(chan int, 10)
    	for i := 0; i < 10; i++ {
    		intChan <- i
    	}
    
    	stringChan := make(chan string, 5)
    	for i := 0; i < 5; i++ {
    		stringChan <- "hello" + fmt.Sprintf("%d", i)
    	}
    
    	//传统的方法在遍历管道时,如果不关闭管道会阻塞而导致deadlock
    	//在实际开发中,不好确定什么时候关闭管道。可以使用select方式解决
    	for {
    		select {
    			//如果intChan一直没有关闭,不会一直阻塞而deadlock,会自动到下一个case匹配
    			case v := <-intChan:
    				fmt.Printf("从intChan读取的数据%d
    ", v)
    				time.Sleep(time.Second)
    			case v := <-stringChan:
    				fmt.Printf("从stringChan读取的数据%s
    ", v)
    				time.Sleep(time.Second)
    			default:
    				fmt.Printf("都取不到数据")
    				time.Sleep(time.Second)
    				return
    		}
    
    	}
    }
    

    (4)、goroutine中使用recover,解决协程中出现panic导致程序崩溃问题

    如果开启一个协程,但是这个协程出现了panic,如果没有捕获这个panic,就会造成整个程序崩溃,这时可以在goroutine中使用recover来捕获panic进行处理。这样即使这个协程发生问题,主线程仍然不受影响,可以继续执行。

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func sayHello() {
    	for i := 0; i < 10; i++ {
    		time.Sleep(time.Second)
    		fmt.Println("hello,world")
    	}
    }
    
    func test() {
    	defer func() {
    		if err := recover(); err != nil {
    			fmt.Println("test() 发生错误", err)
    		}
    	}()
    
    	var myMap map[int]string
    	//error,没有为map申请内存
    	myMap[0] = "golang"
    }
    func main() {
    	go sayHello()
    	go test()
    
    	for i := 0; i < 10; i++ {
    		fmt.Println("main() ok=", i)
    		time.Sleep(time.Second)
    	}
    }
    
  • 相关阅读:
    iaas,paas,saas理解
    July 06th. 2018, Week 27th. Friday
    July 05th. 2018, Week 27th. Thursday
    July 04th. 2018, Week 27th. Wednesday
    July 03rd. 2018, Week 27th. Tuesday
    July 02nd. 2018, Week 27th. Monday
    July 01st. 2018, Week 27th. Sunday
    June 30th. 2018, Week 26th. Saturday
    June 29th. 2018, Week 26th. Friday
    June 28th. 2018, Week 26th. Thursday
  • 原文地址:https://www.cnblogs.com/xidian2014/p/10679452.html
Copyright © 2011-2022 走看看