zoukankan      html  css  js  c++  java
  • Golang协程和管道

    协程(goroutine)

    • 基本介绍

    并发和并行

    1. 多线程程序在单核上运行,就是并发
    2. 多线程程序在多核上运行,就是并行
      并发:因为是在一一个cpu上,比如有10个线程,每个线程执行10毫秒(进行轮询操作),从人的角度看,好像这10个线程都在运行,但是从微观上看,在某一个时间点看,其实只有一一个线程在执行,这就是并发。
      并行:因为是在多个cpu上(比如有10个cpu),比如有10个线程,每个线程执行10毫秒(各自在不同cpu.上执行),从人的角度看,这10个线程都在运行,但是从微观上看,在某一个时间点看,也同时有10个线程在执行,这就是并行
    • Go 协程和 Go 主线程
    1. Go 主线程(有程序员直接称为线程/也可以理解成进程): 一个 Go 线程上,可以起多个协程,你可以这样理解,协程是轻量级的线程[编译器做优化]
    2. Go 协程的特点
    1. 有独立的栈空间
    2. 共享程序堆空间
    3. 调度由用户控制
    4. 协程是轻量级的线程
        //编写一个函数,每隔1秒输出 "hello,world"
        func test() {
            for i := 1; i <= 10; i++ {
                fmt.Println("tesst () hello,world " + strconv.Itoa(i))
                time.Sleep(time.Second)
            }
        }
    
        func main() {
    
            go test() // go关键字开启了一个协程
    
            for i := 1; i <= 10; i++ {
                fmt.Println(" main() hello,golang" + strconv.Itoa(i))
                time.Sleep(time.Second)
            }
        }
    
    • 小结
    1. 主线程是一个物理线程,直接作用在 cpu 上的。是重量级的,非常耗费 cpu 资源。
    2. 协程从主线程开启的,是轻量级的线程,是逻辑态。对资源消耗相对小。
    3. Golang 的协程机制是重要的特点,可以轻松的开启上万个协程。其它编程语言的并发机制是一般基于线程的,开启过多的线程,资源耗费大,这里就突显 Golang 在并发上的优势了

    goroutine 的调度模型

    • MPG 模式基本介绍
    1. M: 操作系统的主线程(是物理线程)
    2. P: 协程执行需要的上下文
    3. G: 协程
    • 设置 Golang 运行的 cpu 数
    1. go1.8后, 默认让程序运行在多个核上,,可以不用设置了
    2. go1.8前, 还是要设置一下,可以更高效的利益cpu
        func main() {
            cpuNum := runtime.NumCPU()
            fmt.Println("cpuNum=", cpuNum)
    
            //可以自己设置使用多个cpu
            runtime.GOMAXPROCS(cpuNum - 1)
            fmt.Println("ok")
        }
    
    • goroutine 之间通讯
    1. 全局变量的互斥锁 (Go 语言提供两类锁: 互斥锁(Mutex)和读写锁(RWMutex)。其中读写锁(RWMutex)是基于互斥锁(Mutex)实现的)
        import (
            "sync"
        )
        var ( 
            //声明一个全局的互斥锁
            //lock 是一个全局的互斥锁, 
            //sync 是包: synchornized 同步
            //Mutex : 是互斥
            lock sync.Mutex
        )
        func funcName ( * args[]) {
            //加锁
            lock.Lock()
    
            ........    //协程
    
            //解锁
            lock.Unlock()
        }
    
    1. 使用管道 channel 来解决
    1. 前面使用全局变量加锁同步来解决 goroutine 的通讯,但不完美
    2. 主线程在等待所有 goroutine 全部完成的时间很难确定,我们这里设置 10 秒,仅仅是估算。
    3. 如果主线程休眠时间长了,会加长等待时间,如果等待时间短了,可能还有 goroutine 处于工作状态,这时也会随主线程的退出而销毁
    4. 通过全局变量加锁同步来实现通讯,也并不利用多个协程对全局变量的读写操作。

    管道(channel)

    1. channle 本质就是一个数据结构-队列
    2. 数据是先进先出【FIFO : first in first out】
    3. 线程安全,多 goroutine 访问时,不需要加锁,就是说 channel 本身就是线程安全的
    4. channel 有类型的,一个 string 的 channel 只能存放 string 类型数据。
    • 定义/声明 channel
        var 变量名 chan 数据类型
        举例:
        var intChan chan int (intChan 用于存放 int 数据)
        var mapChan chan map[int]string (mapChan 用于存放 map[int]string 类型)
        var perChan chan Person
        var perChan2 chan *Person
        ...
    
        说明
        1. channel 是引用类型
        2. channel 必须初始化才能写入数据, 即 make 后才能使用
        3. 管道是有类型的,intChan 只能写入 整数 int
    
    • 基本操作
        func main() {
            //1. 创建一个可以存放3个int类型的管道
            var intChan chan int
            intChan = make(chan int, 3)
    
            //2. intChan是什么
            fmt.Printf("intChan 的值=%v intChan本身的地址=%p
    ", intChan, &intChan)
            //intChan 的值=0xc00001c100 intChan本身的地址=0xc00000e028
    
            //3. 向管道写入数据
            intChan<- 10
            num := 211
            intChan<- num
            intChan<- 50
            /*
                //如果从channel取出数据后,可以继续放入
                <-intChan
                intChan<- 98//注意点, 当我们给管写入数据时,不能超过其容量
            */
    
            //4. 管道的长度和cap(容量)
            fmt.Printf("channel len= %v cap=%v 
    ", len(intChan), cap(intChan)) // 3, 3
    
            //5. 从管道中读取数据
            var num2 int
            num2 = <-intChan 
            fmt.Println("num2=", num2)
            fmt.Printf("channel len= %v cap=%v 
    ", len(intChan), cap(intChan))  // 2, 3
    
            //6. 在没有使用协程的情况下,如果我们的管道数据已经全部取出,再取就会报告 deadlock
    
            num3 := <-intChan
            num4 := <-intChan
    
            //num5 := <-intChan
    
            fmt.Println("num3=", num3, "num4=", num4/*, "num5=", num5*/)
        }
    
    • 注意事项
    1. channel 中只能存放指定的数据类型
    2. channle 的数据放满后,就不能再放入了
    3. 如果从 channel 取出数据后,可以继续放入
    4. 在没有使用协程的情况下,如果 channel 数据取完了,再取,就会报 dead lock
    • channel 的关闭
      使用内置函数 close 可以关闭 channel, 当 channel 关闭后,就不能再向 channel 写数据了,但是仍然可以从该 channel 读取数据
    func main() {
        intChan := make(chan int, 3)
        intChan <- 10
        intChan <- 20
        // 关闭 channel
        close(intChan)
        // 关闭 channel 后,无法将数据写入到 channel 中,读取数据是可以的
        num := <- intChan
        fmt.Println(num) // 10
    }
    
    • channel 的遍历
      channel 支持 for--range 的方式进行遍历,请注意两个细节
    1. 在遍历时,如果 channel 没有关闭,则回出现 deadlock 的错误
    2. 在遍历时,如果 channel 已经关闭,则会正常遍历数据,遍历完后,就会退出遍历。
        func main() {
            ch := make(chan int, 3)
            ch <- 10
            ch <- 20
            ch <- 30
            // 关闭 channel
            close(ch)
            // 遍历 channel
            for v := range ch {
                fmt.Println(v)
            }
        }
    
    • 应用实例:协程与管道协同工作
    1. 开启一个 writeData 协程,向管道中写入30个整数;
    2. 开启一个 readData 协程,从管道中读取writeData写入的数据;
    3. writeData 和 readData 操作的是同一个管道;
    4. 主线程需要等待这两个协程都完成工作才能退出。
    //write Data
    func writeData(intChan chan int) {
    	for i := 1; i <= 50; i++ {
    		//放入数据
    		intChan<- i //
    		fmt.Println("writeData ", i)
    	}
    	close(intChan) //关闭
    }
    
    //read data
    func readData(intChan chan int, exitChan chan bool) {
    
    	for {
    		v, ok := <-intChan
    		if !ok {
    			break
    		}
    		fmt.Printf("readData 读到数据=%v
    ", v) 
    	}
    	//readData 读取完数据后,即任务完成
    	exitChan<- true
    	close(exitChan)
    
    }
    
    func main() {
    	//创建两个管道
    	intChan := make(chan int, 50)
    	exitChan := make(chan bool, 1)
    	
    	go writeData(intChan)
    	go readData(intChan, exitChan)
    
    	for {
    		_, ok := <-exitChan //会轮询两次,第二次为false
    		if !ok {
    			break
    		}
    	}
    }
    
    • 应用实例:阻塞
    1. 如果编译器(运行),发现一个管道只有写,没有读,则该管道会阻塞。
    2. 如果写管道和读管道的频率不一致,无所谓。
    3. 如果channel没有关闭, 也会死锁阻塞。
    func writeData(intChan chan int) {
    	for i := 1; i <= 3; i++ {
    		//放入数据
    		intChan<- i //
    		fmt.Println("writeData ", i)
    	}
    	close(intChan) //关闭
    }
    
    func readData(intChan chan int, exitChan chan bool) {
    	for {
    		v, ok := <-intChan
    		if !ok {
    			break
    		}
            2. 读取比写入频率慢,不会死锁
    		time.Sleep(time.Second*10)
    		fmt.Printf("readData 读到数据=%v
    ", v) 
    	}
    	//readData 读取完数据后,即任务完成
    	exitChan<- true
        
    	//close(exitChan)
    
    }
    
    func main() {
    
    	//创建两个管道
    	intChan := make(chan int, 3)
    	exitChan := make(chan bool, 1)
    	
        1. 只有写没有读,会死锁
    	go writeData(intChan)
    	//go readData(intChan, exitChan)
    
    	for {
            3. channel没有关闭,这里会死锁
    		_, ok := <-exitChan
    		if !ok {
    			break
    		}
    	}
    
    }
    
    • 应用实例:多管道同时运行
    1. 不要在读取端关闭 channel ,因为写入端无法知道 channel 是否已经关闭,往已关闭的 channel 写数据会 panic ;
    2. 有多个写入端时,不要在写入端关闭 channle ,因为其他写入端无法知道 channel 是否已经关闭,关闭已经关闭的 channel 会发生 panic ;
    3. 如果只有一个写入端,可以在这个写入端放心关闭 channel 。
    func putNum(intChan chan int) {
    	for i := 1; i <= 80000; i++ {    
    		intChan<- i
    	}
    	close(intChan)
    }
    
    func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
    	var num int //从intChan取出的数字
    	var flag bool //素数标识
    	for {
    		num, ok := <-intChan
    		if !ok {  //intChan 取不到
    			break
    		}
    		flag = true //假设是素数
    		//判断num是不是素数
    		for i := 2; i < num; i++ {
    			if num % i == 0 {//说明该num不是素数
    				flag = false
    				break
    			}
    		}
    		if flag {
    			//将这个数就放入到primeChan
    			primeChan<- num
    		}
    	}
    
    	fmt.Println("一个primeNum 协程退出")
    	//这里我们还不能关闭 primeChan
    	//向 exitChan 写入true
    	exitChan<- true	
    }
    
    func main() {
    	intChan := make(chan int , 1000)
    	primeChan := make(chan int, 20000)//过滤后放入结果
    	exitChan := make(chan bool, 8) ////标识退出的管道
    
    	start := time.Now().Unix()
    	
    	//开启一个协程,向 intChan放入 1-8000个数
    	go putNum(intChan)
    
    	//开启4个协程,从 intChan取出数据,并判断是否为素数,如果是,就放入到primeChan
    	for i := 0; i < 6; i++ {
    		go primeNum(intChan, primeChan, exitChan)
    	}
    
    	//这里我们主线程,进行处理
    	//直接
    	go func(){
    		for i := 0; i < 6; i++ {
    			<-exitChan
    		}
    
    		end := time.Now().Unix()
    		fmt.Println("使用协程耗时=", end - start)
    
    		//当我们从exitChan 取出了4个结果,就可以放心的关闭 prprimeChan
    		close(primeChan)
    	}()
    
    
    	//遍历我们的 primeChan ,把结果取出
    	for {
    		_, ok := <-primeChan
    		if !ok{
    			break
    		}
    		//将结果输出
    		//fmt.Printf("素数=%d
    ", res)
    	}
    
    	fmt.Println("main线程退出")
    }
    
    • 注意事项
    1. channel 可以声明为只读,或者只写性质
    	func main() {
    		//1. 在默认情况下下,管道是双向
    		//var chan1 chan int //可读可写
    		
    		//2 声明为只写
    		var chan2 chan<- int
    		chan2 = make(chan int, 3)
    		chan2<- 20
    		//num := <-chan2 //error
    		fmt.Println("chan2=", chan2)
    
    		//3. 声明为只读
    		var chan3 <-chan int
    		num2 := <-chan3
    		//chan3<- 30 //err
    		fmt.Println("num2", num2)
    
    	}
    
    1. 使用 select 可以解决从管道取数据的阻塞问题
    	func main() {
    		//1.定义一个管道 10个数据int
    		intChan := make(chan int, 10)
    		for i := 0; i < 10; i++ {
    			intChan<- i
    		}
    		//2.定义一个管道 5个数据string
    		stringChan := make(chan string, 5)
    		for i := 0; i < 5; i++ {
    			stringChan <- "hello" + fmt.Sprintf("%d", i)
    		}
    
    		//传统的方法在遍历管道时,如果不关闭会阻塞而导致 deadlock
    		//在实际开发中,不确定要关闭该管道,且不希望阻塞,可以使用select 方式可以解决
    		//label:
    		for {
    			select {
    				//注意: 这里,如果intChan一直没有关闭,不会一直阻塞而deadlock,会自动到下一个case匹配
    				case v := <-intChan : 
    					fmt.Printf("从intChan读取的数据%d
    ", v)
    					time.Sleep(time.Second)
    				case v := <-stringChan :
    					fmt.Printf("从stringChan读取的数据%s
    ", v)
    					time.Sleep(time.Second)
    				default :
    					fmt.Printf("都取不到了,不玩了, 程序员可以加入逻辑
    ")
    					time.Sleep(time.Second)
    					return 
    					//break label 一般不建议使用;return后后续代码不会执行,而break标签只是跳出当前循环,继续执行下面的代码
    			}
    		}
    	}
    
    1. goroutine 中使用 recover,解决协程中出现 panic,导致程序崩溃问题

    如果我们起了一个协程,但是这个协程出现了panic,如果我们没有捕获这个panic,就会造成整个程序崩溃,这时我们可以在goroutine中使用recover来捕获panic,进行处理,这样即使这个协程发生的问题,但是主线程仍然不受影响,可以继续执行。

    	//函数
    	func sayHello() {
    		for i := 0; i < 10; i++ {
    			time.Sleep(time.Second)
    			fmt.Println("hello,world")
    		}
    	}
    	//函数
    	func test() {
    		//这里我们可以使用defer + recover
    		defer func() {
    			//捕获test抛出的panic
    			if err := recover(); err != nil {
    				fmt.Println("test() 发生错误", err)
    			}
    		}()
    		//定义了一个map
    		var myMap map[int]string
    		myMap[0] = "golang" //error 切片没有mark
    	}
    
    	func main() {
    		go sayHello()
    		go test()
    
    		for i := 0; i < 10; i++ { //防止协程没有执行结束,程序就已经退出,随便找点事做
    			fmt.Println("main() ok=", i)
    			time.Sleep(time.Second)
    		}
    
    	}
    
  • 相关阅读:
    saltstack远程执行
    centos7防火墙的关闭和禁用
    saltstack 安装使用
    flask基础-第一个flask-jinja2-response三剑客-request-session
    linux服务器排查病毒纪实
    读完这篇文章,就基本搞定了Redis主从复制
    Django学习【第26篇】:中介模型以及优化查询以及CBV模式
    Django学习【第26篇】:后端CORS解决跨域问题
    Django学习【第25篇】:前端Jsonp解决跨域问题
    Django学习【第24篇】:JS实现的ajax和同源策略
  • 原文地址:https://www.cnblogs.com/KylinBlog/p/13607296.html
Copyright © 2011-2022 走看看