zoukankan      html  css  js  c++  java
  • 关于Goroutine与Channel

    关于Goroutine的原理

    原理上的内容比较多,比如goroutine启动的时候要执行哪些相关的操作,一点一点的补充一下。

    channel的基本原理

    channel是go语言中的特殊的机制,既可以同步两个被并发执行的函数,又可以让这两个函数通过相互传递特定类型的值来进行通信。事实上这也是channel的两个主要功能。

    按照通道在初始化时是否有缓冲值,又可以分为缓冲通道非缓冲通道。通道初始化的时候也还是需要使用make进行,比如make(chan int,10)声明一个缓冲空间为10个int的通道,直接make(chan int)就是声明一个非缓冲通道

    直接采用内建函数close(strChan)就可以关闭通道。应该保证在安全的情况下进行关闭通道的操作。基本的原则:内建函数 len(strChan)可以查看通道中当前有的元素的数量 cap(strChan)可以查看通道的总的容量,总容量一旦初始化之后就不会再发生改变了。

    关于select语句的使用,在go语言中,执行select语句的时候,会自动地自上而下地判断每个case中的发送或者接受的操作可否被立即执行,即是说当前的Goroutine不会因此操作而被阻塞。select语句在执行的时候,会先对各个case中的表达式进行判断求值,而且直到所有的求值操作都完成之后才会考虑选其中的某个case去执行。这要依据当时通道的特性来判断,当发现第一个满足选择条件的case的时候,这个case中的语句就会被执行,其他的语句就会被忽略,当有多个case都满足情况的话,系统会根据一个伪随机算法决定哪个case会被执行。default是一个特殊的case,如果没有合适的case的话,default中的语句就会被执行,如果select语句中没有加上default语句,那么如果此时没有case符合条件的话,当前goroutine就会一直阻塞在当前的这一条select语句上。因此default:对于select而言是必要的。

    通常select还会和for语句结合在一起来使用,因为单独的select操作只会被选择一次,要想持续不断地使用select从通道中读出信息,还是要和for结合在一起使用。于是跳出多层循环的时候,特别是添加了超时控制的案例,可以参考使用场景(2)中介绍的两种方法.

    • 无论怎样都不应该在接收端关闭通道,因为无法判断发送端是否还有数据要发送,通道有一个很好的特性,就是发送端关闭通道后,接收端仍然可以正常接受已经存在通道中的数据。谁启的通道,谁最后负责关,是这个道理。
    • 注意element , ok := <-chann 的这种语法, 如果通道被关闭则ok的值会变为false,element的值会变为该通道类型的零值,通常用ok这种语法来判断是否退出某个循环。比如下面这段代码,同时也可以看下goroutine的相关使用模式:
    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	ch := make(chan int, 1)
    	sign := make(chan byte, 2)
    
    	go func() {
    		for i := 0; i < 5; i++ {
    			ch <- i
    
    			time.Sleep(1 * time.Second)
    		}
    
    		close(ch)
    		fmt.Println("The channel is closed.")
    		sign <- 0
    	}()
    
    	go func() {
    		//这个循环会一直尝试从ch中读取信息出来 即使ch已经被发送端关闭
    		//但还是可以读信息出来 最后当ok 为false的时候 说明已经没有数据从ch中读出
    		//跳出循环 注意这种判断方式
    		for {
    			fmt.Printf("before extract channel len: %v ,", len(ch))
    			e, ok := <-ch
    			fmt.Printf("channel value: %d if extract ok :(%v) after extraction channel len : %v channel cap : %v 
    ", e, ok, len(ch), cap(ch))
    			if !ok {
    				break
    			}
    
    			time.Sleep(2 * time.Second)
    		}
    		fmt.Println("Done.")
    		sign <- 1
    	}()
    	//要是不添加两次取值的操作的话 主进程就会马上结束 这里相当于是实现了一个
    	//同步的操作 等待两个go func都结束之后 再结束主进程 注意这种技巧
    	<-sign
    	<-sign
    }
    
    /*output:
    before extract channel len: 1 ,channel value: 0 if extract ok :(true) after extraction channel len : 0 channel cap : 1
    before extract channel len: 1 ,channel value: 1 if extract ok :(true) after extraction channel len : 0 channel cap : 1
    before extract channel len: 1 ,channel value: 2 if extract ok :(true) after extraction channel len : 0 channel cap : 1
    before extract channel len: 1 ,channel value: 3 if extract ok :(true) after extraction channel len : 0 channel cap : 1
    The channel is closed.
    before extract channel len: 1 ,channel value: 4 if extract ok :(true) after extraction channel len : 0 channel cap : 1
    before extract channel len: 0 ,channel value: 0 if extract ok :(false) after extraction channel len : 0 channel cap : 1
    Done.
    */
    
    
    • 同样的从通道中迭代取出元素的操作还可以使用 for range 来进行操作,当通道已经被关闭或者没有值可以再接收的话,for循环会立即被结束,比如使用场景(3)中的Batch函数,可以修改成如下的方式,更加简洁:
      func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
    	dests := make(chan Person, 100)
    	go func() {
    		for p :=range origs{
    			handler.Handle(&p)
    			log.Printf("old value : %v
    ", p)
    			//time.Sleep(time.Second)
    			dests <- p
    		}
    		fmt.Println("alll the info has been handled")
    		close(dests)
    	}()
    	return dests
    }
     
      
    

    关于通道的基本原则

    • 通道缓冲已经满了的时候,再向通道中发送数据,会造成Goroutine的阻塞,通道没有初始化,即值为nil的时候,向其中发送数据会造成通道永久阻塞。
    • 关闭通道的操作应该由发送端来进行,通道关闭后,如果还有数据,接收端仍可以正常接受数据。
    • 向通道中发送值,进行的是值传递

    channel使用场景分析

    使用场景(1)

    注意 app.go文件夹中的 346 行左右开始地方的一个坑 注意time.After的返回值 由于放在了for循环中 因此 每次会新new 一个 channel出来 还有注意跳出多层循环的方式
    主要参考的是《Go并发编程实战的相关内容》

    代码如下:

    package main
    
    import (
    	"fmt"
    	"runtime"
    )
    
    func main() {
    	names := []string{"E", "H", "R", "J", "M"}
    	for _, name := range names {
    		go func() {
    			fmt.Printf("Hello , %s 
    ", name)
    		}()
    	}
    	//要是不添加runtime的话 就不会有内容输出
    	//因为for循环执行速度太快了 直接循环结束跳出了最后的循环
    	//之后 for循环中生成的5个go func 会被分别进行调度
    	runtime.Gosched()
    }
    
    /* output
    Hello , M 
    Hello , M 
    Hello , M 
    Hello , M 
    Hello , M
    */
    

    根据代码可以看出,具体循环的时候for循环中的go func 的调度并不是按照想象的那样,一次循环一个go func ,不要对go func的执行时机做任何假设。

    优化方案

    一种思路是把runtime.Gosched()函数放在每次for循环结束的时候,这样每次for循环之后,都会被重新调度一次,可能会出现正确的结果,并不是每次都准确,比如go func的程序需要运行一段时间,在这段运行的时间之内,可能就已经循环了几个元素过去了

    package main
    
    import (
    	"fmt"
    	"runtime"
    	"time"
    )
    
    func main() {
    	names := []string{"E", "H", "R", "J", "M", "N", "O", "P"}
    	for _, name := range names {
    		go func() {
    			time.Sleep(1000 * time.Nanosecond)
    			fmt.Printf("Hello , %s 
    ", name)
    		}()
    
    		runtime.Gosched()
    	}
    
    }
    
    /* output:
    Hello , E
    Hello , J
    Hello , J
    Hello , P
    Hello , P
    Hello , P
    */
    

    还有一种思路是采用传递参数的方式,就是给goroutine带上了参数,虽然goroutine已经脱离了main函数的控制,但是它已经带上了main函数给的烙印,相当于是某种解耦的感觉,for循环多次就不会影响打印的结果了,比如下面代码:

    package main
    
    import (
    	"fmt"
    	"runtime"
    	"time"
    )
    
    func main() {
    	names := []string{"E", "H", "R", "J", "M", "N", "O", "P"}
    	for _, name := range names {
    		go func(who string) {
    			time.Sleep(1000 * time.Nanosecond)
    			fmt.Printf("Hello , %s 
    ", who)
    		}(name)
    
    	}
    	runtime.Gosched()
    
    }
    
    /* output:
    Hello , E
    Hello , H
    Hello , R
    Hello , J
    Hello , M
    */
    
    

    但是这个方法仍然很有问题,只能保证在函数执行时间很短的时候结果正常,而且不输出重复的内容,如果程序执行时间比较长的话,很有可能main函数会被提前结束,按顺序生成的多个goroutine在cpu那里会不会仍然按照顺序被调度执行?这个仍然不确定?有几个goroutine会不能被正常调度到并且执行,比如像上面的代码的输出样子,而且每次输出的结果也都是不确定的。

    使用场景(2)

    编码的时候遇到这样一个场景,服务创建成功之后,需要等待ip被分配,ip被分配好之后,服务才正式部署成功,最后将所有的信息返回给前台,于是打算这样实现,在服务创建成功之后就不断循环,查询ip如果分配成功了就返回,如果超过了时间也返回失败,最后这部分的代码像下面这样,
    第一个例子中退出的方式采用的是标记的思路形式,每次循环结束的时候会检查一下标记看看是否退出,第二个采用的是特殊的语法,直接跳出最外层的循环,注意这种时间控制的实现,还是弄成一个defalt一个case比较好,由于case的调度可能有随机性,因此正常执行的内容放在default的部分,时间控制的那个channel放在某一个case当中。

    package main
    
    import (
    	"fmt"
    	"time"
    )
    
    func main() {
    	sign := make(chan int)
    	chtemp := make(chan int, 5)
    	go func() {
    		for i := 0; i < 5; i++ {
    			time.Sleep(time.Millisecond * 300)
    			chtemp <- i
    		}
    		close(chtemp)
    
    	}()
    	var e int
    	ok := true
    
    	//new 一个新的channel返回 注意这里要提前声明好
    	t := time.After(time.Second)
    	go func() {
    		for {
    			select {
    			case <-t:
    				fmt.Println("time out")
    				ok = false
    				break
    				//注意这里是使用 = 而不是 :=
    
    			default:
    				e, ok = <-chtemp
    				fmt.Printf("value : %v 
    ", e)
    				if !ok {
    					break
    				}
    			}
    			if !ok {
    				sign <- 1
    				break
    			}
    		}
    	}()
    	<-sign
    }
    
    
    
    
    //一个时间控制的channel
    //注意这个要在循环之外单独声明 否则每次都会分配一个新的 time.After的channel返回过来
    t := time.After(time.Second * 10)
    
    //注意这种跳出多层循环的操作方式 要是单层使用break的话 仅仅跳出的是 select 那一层的循环
    
    A:
    	for {
    		select {
    		//如果时间到了 就返回错误信息
    		case <-t:
    			log.Println("time out to allocate ip")
    			//delete the se which deploy failed
    			a.Ctx.ResponseWriter.Header().Set("Content-Type", "application/json")
    			http.Error(a.Ctx.ResponseWriter, `{"errorMessage":"`+"deploy error : time out"+`"}`, 406)
    			break A
    		//如果时间没到 就是 t 还没有发回信息 select语句就默认跳转到default块中
    		//执行查找ip是否分配的操作
    		default:
    			//log.Println("logout:", <-timeout)
    			sename := service.ObjectMeta.Labels["name"]
    			podslist, err := a.Podip(sename)
    			if err != nil {
    				log.Println(err.Error())
    				a.Ctx.ResponseWriter.Header().Set("Content-Type", "application/json")
    				http.Error(a.Ctx.ResponseWriter, `{"errorMessage":"`+err.Error()+`"}`, 406)
    				break A
    			}
    			if len(podslist) == 0 {
    				continue
    			} else {
    				log.Println("allocation ok ......")
    				a.Data["json"] = detail
    				a.ServeJson()
    				break A
    			}
    
    		}
    
    	}
    
    

    使用场景(3)

    常常有这样一种场景,把某些信息从旧的资源池中取出来,经过一些加工处理,再放入新的资源池中,这个过程如果按传统的方式就是采用完全串行的方式效率会很低,粒度太粗了,具体的粒度可以细化以每次所取的单位资源为粒度。
    比如以书上p339为例,有一个资源池存储这person的信息,将每个person从中取出来,之后进行一些处理,再存到新的资源池中,这里用oldarray以及newarray来模拟旧的和新的资源池:

    具体的代码如下:

    package main
    
    //参考go 并发编程实战 p337
    import (
    	"log"
    	"strconv"
    	"time"
    )
    
    type Person struct {
    	name string
    	age  int
    	addr string
    }
    
    var oldpersonarray = [5]Person{}
    var newpersonarray = [5]Person{}
    
    type PersonHandler interface {
    	Batch(origs <-chan Person) <-chan Person
    	Handle(orig *Person)
    }
    
    //struct 实现了personhandler 接口
    type PersonHandlerImpl struct{}
    
    //从origs接收信息 处理之后再返回给新的channel
    func (handler PersonHandlerImpl) Batch(origs <-chan Person) <-chan Person {
    	dests := make(chan Person, 100)
    	go func() {
    		for {
    			p, ok := <-origs
    			if !ok {
    				close(dests)
    				break
    			}
    			handler.Handle(&p)
    			log.Printf("old value : %v
    ", p)
    			//time.Sleep(time.Second)
    			dests <- p
    		}
    	}()
    	return dests
    }
    
    //这里要使用引用传递
    func (handler PersonHandlerImpl) Handle(orig *Person) {
    	orig.addr = "new address"
    }
    
    func getPersonHandler() PersonHandler {
    	return &PersonHandlerImpl{}
    
    }
    
    //print the oldpersonarray into the chan<-Person
    func fetchPerson(origs chan<- Person) {
    	for _, v := range oldpersonarray {
    		//fmt.Printf("get the value : %v %v 
    ", k, v)
    		time.Sleep(time.Second)
    		origs <- v
    	}
    	close(origs)
    
    }
    
    //fetch the value from the channel and store it into the newpersonarray
    func savePerson(dest <-chan Person) <-chan int {
    	intChann := make(chan int)
    	go func() {
    		index := 0
    		for {
    			p, ok := <-dest
    			if !ok {
    				break
    			}
    			//time.Sleep(time.Second)
    			log.Printf("new value transfer %v 
    ", p)
    
    			newpersonarray[index] = p
    			index++
    
    		}
    
    		intChann <- 1
    	}()
    	return intChann
    }
    
    func init() {
    	//使用range的话是值传递 这里要给oldpersonarray赋值进来
    	tmplen := len(oldpersonarray)
    	for i := 0; i < tmplen; i++ {
    		oldpersonarray[i].addr = "old address"
    		oldpersonarray[i].age = i
    		oldpersonarray[i].name = strconv.Itoa(i)
    
    	}
    
    	log.Printf("first print init value : %v
    ", oldpersonarray)
    
    }
    func main() {
    
    	handeler := getPersonHandler()
    	origs := make(chan Person, 100)
    	dests := handeler.Batch(origs)
    	//go func() { fetchPerson(origs) }()
    	// 不加go func的话 要等这句执行完 才能执行下一句
    	// 则orgis信息都输出 完全关闭掉 这个时候 从dest接收信息的语句才开始执行
    	// 所以不会动态输出 这句加上go func的话 就会没隔 1s 动态输出
    	// 如果将fetchPerson 再往前面放一句 则old value也不会动态输出
    	fetchPerson(origs)
    	sign := savePerson(dests)
    	<-sign
    	log.Printf("last print new value : %v 
    ", newpersonarray)
    
    }
    
    

    整体的结构图如下:

    代码结构图

    代码基本分析:

    • 首先声明一个 PersonHandler 的接口,之后声明一个struct PersonHandlerImpl 将接口中的两个方法都实现了,init函数用于进行oldarray的初始化工作。注意为了减少出错,内部的函数在方声明的时候都是单向的channel。
    • 1,2 fetchperson从oldarray中区数据,并把数据存到origs channel中,注意最后取完数据到通道之后,要由发送方将channel关闭,否则可能造成deadlock。注意在main函数中,如果fech操作没有放到一个goroutine中来执行,就仍然是串行的,相当于是把数据都放入到channel中,另一端才开始取,没发挥出并发的优势。
    • 3,4 Batch函数将person信息从origs中取出来,进行处理后,同时传到dests中,最后将dests返回,注意这里不是全部传入之后才将dests返回,而是新启动一个goroutine执行传入操作,同时将dests返回,注意要主动关闭channel。
    • 5 savePerson操作接收一个<-chann 之后从中接受person信息,将值写入到新的资源池中,最后全部写入结束之后,传一个sign channel给主进程,结束。
    • 总结,在需要动态输出信息的时候,goroutine往往是和channel结合在一起使用。最常见的用法是,一个goroutine负责向channel中写入数据,之后将channel返回,由其他进程取出信息。比如之前写过的一些websocket从前台接受信息,后台处理信息之后再动态返回给前台打出结果的模型,就和这个差不多,总之具体的异步执行流程要理清楚,都有哪些channel,负责传递的信息分别是什么。
  • 相关阅读:
    Java核心技术(初阶)知识点复习——[2]面向对象思想
    Java核心技术(初阶)知识点复习——[1]Java的类结构和main函数
    printStream与printWriter
    java反射的初步探索
    JDKJREJVM的关系
    树链剖分模板
    树状数组模板2
    树状数组模板1
    树状数组+欧拉降幂
    线段树模板二
  • 原文地址:https://www.cnblogs.com/Goden/p/4621555.html
Copyright © 2011-2022 走看看