zoukankan      html  css  js  c++  java
  • goroutine 和 channel

    goroutine 

    我们现在有一个需求需求:要求统计 1-9000000000 的数字中,哪些是素数?
    分析思路:
      1、传统的方法,就是使用一个循环,循环的判断各个数是不是素数。[很慢]
      2、使用并发或者并行的方式,将统计素数的任务分配给多个 goroutine 去完成,这时就会使用到goroutine.【速度提高 4 倍】
    goroutine - 基本介绍
     
    进程和线程介绍:

      1、进程就是程序在操作系统中的一次执行过程,是系统进行资源分配和调度的基本单位

      2、线程是进程中的一个执行实例,是程序执行的最小单元,它是比进程更小的能独立运行的基本单位

      3、一个进程能够创建和销毁多个线程,同一个进程中的多个线程可以并发执行

      4、一个程序至少有一个进程,一个进程至少有一个线程

    程序、进程和线程的关系示意图:

    并发和并行

      1、多线程程序在单核上运行,就是并发

      2、多线程程序在多核上运行,就是并行

      3、示意图:

    小结:

      并发:因为是在一个cpu上,比如有10个线程,每个线程执行10ms(进行轮询操作),从人的角度来看,好像这10个线程都在运行,但是从微观角度来看,在某一个时间点,其实只有一个线程在执行,这就是并发

      并行:因为是在多个cpu上(比如有10个cpu),比如有10个线程,每个线程执行10ms(各自在不同的cpu上执行),从人的角度来看,这10个线程都在运行,但是从微观来看,在某一个时间点,也同时有10个线程在执行,这就是并行

    Go协程和 Go 主线程

      Go 主线程(有程序员直接称为线程/也可以理解成进程):
         一个 Go 线程上,可以起多个协程,你可以这样理解,协程是轻量级的线程[编译器做优化]。
    Go 协程的特点:

      1、有独立的栈空间

      2、共享程序堆空间

      3、调度由用户控制

      4、协程是轻量级的线程

    一个示意图:

    goroutine 快速入门 

    案例说明:
    请编写一个程序,完成如下功能:
      1、在主线程(可以理解成进程)中,开启一个 goroutine, 该协程每隔 1 秒输出 "hello,world,bingle"
      2、在主线程中也每隔一秒输出"hello,golang and bingle", 输出 10 次后,退出程序
      3、要求主线程和 goroutine 同时执行.

    代码如下:

    package main
    import (
        "fmt"
        "strconv"
        "time"
    )
    
    func test() {
        for i := 1; i <= 10; i++ {
            fmt.Println("test () hello,world,bingle " + strconv.Itoa(i))
            time.Sleep(time.Second)
        }
    }
    
    func main() {
    
        go test() // 开启了一个协程
    
        for i := 1; i <= 10; i++ {
            fmt.Println(" main() hello,golang and bingle" + strconv.Itoa(i))
            time.Sleep(time.Second)
        }
    }

    执行结果如下:输出的效果说明, main 这个主线程和 test 协程同时执行.

    主线程和协程执行流程图

    快速入门小结:

      1、主线程是一个物理线程,直接作用在 cpu 上的。是重量级的,非常耗费 cpu 资源。

      2、协程从主线程开启的,是轻量级的线程,是逻辑态。对资源消耗相对小。

      3、Golang 的协程机制是重要的特点,可以轻松的开启上万个协程。其它编程语言的并发机制是一般基于线程的,开启过多的线程,资源耗费大,这里就突显 Golang 在并发上的优势了

    goroutine 的调度模型

    MPG 模式基本介绍

    1、M:操作系统的主线程(是物理线程)

    2、P:协程执行需要的上下文

    3、G:协程

    MPG 模式运行的状态 1 

    1、当前程序有三个M,如果三个M都在一个cpu上运行,就是并发,如果在不同的cpu运行就是并行

    2、M1,M2,M3正在执行一个G,M1的协程队列有3个,M2的协程队列有3个,M3的协程队列有2个

    3、从上图可以看到,Go的协程是轻量级的线程,是逻辑态的,Go可以容易的开启上万个协程

    4、其他程序C/java/C#的多线程,往往是内核态的,比较重量级,几千个线程可能耗光CPU

    设置 Golang 运行的 cpu 数
      介绍:为了充分了利用多 cpu 的优势,在 Golang 程序中,设置运行的 cpu 数目

    1、go1.8后,默认让程序运行在多个核上,可以不用设置了

    2、go1.8前,还是要设置一下,可以更高效的利用cpu

    channel(管道)
      我们现在仍然有一个需求:现在要计算 1-200 的各个数的阶乘,并且把各个数的阶乘放入到 map 中。最后显示出来。要求使用 goroutine 完成 
    分析:
      1、使用 goroutine 来完成,效率高,但是会出现并发/并行安全问题.
      2、这里就提出了不同 goroutine 如何通信的问题
    代码实现:
    1、使用 goroutine 来完成(看看使用 gorotine 并发完成会出现什么问题? 然后我们会去解决)
    2、在运行某个程序时,如何知道是否存在资源竞争问题。 方法很简单,在编译该程序时,增加一个参数
    3、代码如下:
    package main 
    import ( 
        "fmt" 
        "time" 
    )
    
    
    // 思路 
    // 1. 编写一个函数,来计算各个数的阶乘,并放入到 map 中. 
    // 2. 我们启动的协程多个,统计的将结果放入到 map 中
    // 3. map 应该做出一个全局的.
    var (
        myMap = make(map[int]int, 10) 
    )
    // test 函数就是计算 n!, 让将这个结果放入到 myMap
    
    func test(n int)  {
        res := 1
        for i := 1; i <= n; i++ {
            res *= i
        }
        //这里我们将 res 放入到 myMap
        myMap[n] = res //concurrent map writes?
    }
    
    func main()  {
        // 我们这里开启多个协程完成这个任务[200 个]
        for i := 1; i <= 200; i++ {
            go test(i)
        }
    
        //休眠 10 秒钟【第二个问题 】
        time.Sleep(time.Second * 10)
        //这里我们输出结果,变量这个结果
        for i, v := range myMap { 
            fmt.Printf("map[%d]=%d
    ", i, v) 
        }
    }

    4、示意图:

    不同 goroutine 之间如何通讯 

    1、全局变量的互斥锁

    2、使用管道 channel 来解决

    使用全局变量加锁同步改进程序
      1、因为没有对全局变量 m 加锁,因此会出现资源争夺问题,代码会出现错误,提示 concurrent map writes
      2、解决方案:加入互斥锁
      3、我们的数的阶乘很大,结果会越界,可以将求阶乘改成 sum += uint64(i)
    代码改进:

    为什么需要 channel

    1、前面使用全局变量加锁同步来解决 goroutine 
    2、主线程在等待所有 goroutine 全部完成的时间很难确定,我们这里设置 10 秒,仅仅是估算。
    3、如果主线程休眠时间长了,会加长等待时间,如果等待时间短了,可能还有 goroutine 处于工作状态,这时也会随主线程的退出而销毁
    4、通过全局变量加锁同步来实现通讯,也并不利用多个协程对全局变量的读写操作。
    5、上面种种分析都在呼唤一个新的通讯机制-channel 
    channel 的基本介绍

    1、channle 本质就是一个数据结构-队列

    2、数据是先进先出【FIFO : first in first out】

    3、线程安全,多 goroutine 访问时,不需要加锁,就是说 channel 本身就是线程安全的

    4、channel 有类型的,一个 string 的 channel 只能存放 string 类型数据

    5、示意图:

    定义/声明 channel
    var 变量名 chan 数据类型
    举例:
      var intChan chan int (intChan 用于存放 int 数据)
      var mapChan chan map[int]string (mapChan 用于存放 map[int]string 类型)
      var perChan chan Person
      var perChan2 chan *Person
    说明:
      1、channel 是引用类型 
      2、channel 必须初始化才能写入数据, 即 make 后才能使用 
      3、管道是有类型的,intChan 只能写入 整数 int
    管道的初始化,写入数据到管道,从管道读取数据及基本的注意事项
    package main
    import (
        "fmt"
    )
    
    func main() {
    
        //演示一下管道的使用
        //1. 创建一个可以存放3个int类型的管道
        var intChan chan int
        intChan = make(chan int, 3)
    
        //2. 看看intChan是什么
        fmt.Printf("intChan 的值=%v intChan本身的地址=%p
    ", intChan, &intChan)
    
    
        //3. 向管道写入数据
        intChan<- 10
        num := 211
        intChan<- num
        intChan<- 50
        // //如果从channel取出数据后,可以继续放入
        <-intChan
        intChan<- 98//注意点, 当我们给管写入数据时,不能超过其容量
    
    
        //4. 看看管道的长度和cap(容量)
        fmt.Printf("channel len= %v cap=%v 
    ", len(intChan), cap(intChan)) // 3, 3
    
        //5. 从管道中读取数据
    
        var num2 int
        num2 = <-intChan 
        fmt.Println("num2=", num2)
        fmt.Printf("channel len= %v cap=%v 
    ", len(intChan), cap(intChan))  // 2, 3
    
        //6. 在没有使用协程的情况下,如果我们的管道数据已经全部取出,再取就会报告 deadlock
    
        num3 := <-intChan
        num4 := <-intChan
    
        //num5 := <-intChan
    
        fmt.Println("num3=", num3, "num4=", num4/*, "num5=", num5*/)
    
    }
    channel 使用的注意事项 

    1、channel 中只能存放指定的数据类型 

    2、channle 的数据放满后,就不能再放入了

    3、如果从 channel 取出数据后,可以继续放入

    4、在没有使用协程的情况下,如果 channel 数据取完了,再取,就会报 dead lock

    读写 channel 案例演示
    1、创建一个intChan,最多可以放3个int,演示存3数据到intChan,然后再取出三个int
    package main
    
    import "fmt"
    
    func main()  {
        // 创建一个intChan,最多可以放3个int,演示存3数据到intChan,然后再取出三个int
        var intChan chan int
        intChan = make(chan int,3)
        intChan <- 10
        intChan <- 20
        intChan <- 10
        // 因为 intChan 的容量为3,再存放数据就会报deadlock
        //intChan <- 100
        num1 := <- intChan
        num2 := <- intChan
        num3 := <- intChan
        // 因为 intChan 这时已经没有数据了,再取就会报deadlock
        //num4 := <- intChan
        fmt.Printf("num1 = %v,num2 = %v,num3 = %v",num1,num2,num3)
    }

    2、创建一个mapChan,最多可以放10个map[string]string的key-value,演示写入和读取

    3、创建一个personChan,最多可以存放10个Person结构体变量,演示写入和读取的用法
    package main
    
    import "fmt"
    
    type  Person struct {
        Name string
        Age int
    }
    
    func main()  {
        // 创建一个personChan,最多可以存放10个Person结构体变量,演示写入和读取的用法
        var personChan chan Person
        personChan = make(chan Person, 10)
        person1 := Person{Name:"bingle1",Age:18}
        person2 := Person{Name:"bingle2",Age:20}
        personChan <- person1
        personChan <- person2
        //取出
        person11 := <- personChan
        person22 := <- personChan
        fmt.Println(person11,person22)
    }

    4、创建一个personChan,最多可以存放10个*Person结构体变量,演示写入和读取的用法

    package main
    
    import "fmt"
    
    type  Person struct {
        Name string
        Age int
    }
    
    func main()  {
        // 创建一个personChan,最多可以存放10个*Person结构体变量,演示写入和读取的用法
        var personChan chan *Person
        personChan = make(chan *Person, 10)
        person1 := Person{Name:"bingle1",Age:18}
        person2 := Person{Name:"bingle2",Age:20}
        personChan <- &person1
        personChan <- &person2
        //取出
        person11 := <- personChan
        person22 := <- personChan
        fmt.Println(person11,person22)
    }
    channel 的关闭和遍历
    channel 的关闭 
      使用内置函数 close 可以关闭 channel, 当 channel 关闭后,就不能再向 channel 写数据了,但是仍然可以从该 channel 读取数据
    案例演示:
    package main
    
    import "fmt"
    
    func main()  {
        intChan := make(chan int, 3)
        intChan<- 100
        intChan<- 200
        close(intChan) // close
        //这是不能够再写入数到channel
        //intChan<- 300
        fmt.Println("okook~")
        //当管道关闭后,读取数据是可以的
        n1 := <-intChan
        fmt.Println("n1=", n1)
    }
    channel 的遍历
    channel 支持 for--range 的方式进行遍历,请注意两个细节
    1、在遍历时,如果 channel 没有关闭,则回出现 deadlock 的错误
    2、在遍历时,如果 channel 已经关闭,则会正常遍历数据,遍历完后,就会退出遍历。
    channel 遍历和关闭的案例演示 :
    package main
    
    import "fmt"
    
    func main()  {
        //遍历管道
        intChan2 := make(chan int, 100)
        for i := 0; i < 100; i++ {
            intChan2<- i * 2  //放入100个数据到管道
        }
    
        //遍历管道不能使用普通的 for 循环
        // for i := 0; i < len(intChan2); i++ {
    
        // }
        //在遍历时,如果channel没有关闭,则会出现deadlock的错误
        //在遍历时,如果channel已经关闭,则会正常遍历数据,遍历完后,就会退出遍历
        close(intChan2)
        for v := range intChan2 {
            fmt.Println("v=", v)
        }
    }

    应用实例1

    完成goroutine 和channel 协同工作的案例,要求如下:
    1、开启一个writeData协程,向管道intChan中写入50个整数
    2、开启一个readData协程,从管道intChan中读取writeData写入的数据
    3、注意:writeData 和 readData操作的是同一个管道
    4、主线程,需要等待writeData 和 readData 协程都完成才能退出【管道】
    思路分析:

     代码实现:

    package main
    import (
        "fmt"
        "time"
    )
    
    
    //write Data
    func writeData(intChan chan int) {
        for i := 1; i <= 50; i++ {
            //放入数据
            intChan<- i //
            fmt.Println("writeData ", i)
            //time.Sleep(time.Second)
        }
        close(intChan) //关闭
    }
    
    //read data
    func readData(intChan chan int, exitChan chan bool) {
    
        for {
            v, ok := <-intChan
            if !ok {
                break
            }
            time.Sleep(time.Second)
            fmt.Printf("readData 读到数据=%v
    ", v) 
        }
        //readData 读取完数据后,即任务完成
        exitChan<- true
        close(exitChan)
    
    }
    
    func main() {
    
        //创建两个管道
        intChan := make(chan int, 10)
        exitChan := make(chan bool, 1)
        
        go writeData(intChan)
        go readData(intChan, exitChan)
    
        //time.Sleep(time.Second * 10)
        for {
            _, ok := <-exitChan
            if !ok {
                break
            }
        }
    
    }

    应用实例2-阻塞

    应用实例 3

    现在有一个需求:
      要求统计 1-200000 的数字中,哪些是素数?这个问题在开篇就提出了,现在我们有 goroutine和 channel 的知识后,就可以完成了 [测试数据: 80000]
    分析思路:
      1、传统的方法,就是使用一个循环,循环的判断各个数是不是素数【ok】。
      2、使用并发/并行的方式,将统计素数的任务分配给多个(4 个)goroutine 去完成,完成任务时间短。
    代码实现:
    package main
    import (
        "fmt"
        "time"
    )
    
    //向 intChan放入 1-80000个数
    func putNum(intChan chan int) {
    
        for i := 1; i <= 80000; i++ {    
            intChan<- i
        }
    
        //关闭intChan
        close(intChan)
    }
    
    // 从 intChan取出数据,并判断是否为素数,如果是,就
    //     //放入到primeChan
    func primeNum(intChan chan int, primeChan chan int, exitChan chan bool) {
    
        //使用for 循环
        // var num int
        var flag bool // 
        for {
            //time.Sleep(time.Millisecond * 10)
            num, ok := <-intChan //intChan 取不到..
            
            if !ok { 
                break
            }
            flag = true //假设是素数
            //判断num是不是素数
            for i := 2; i < num; i++ {
                if num % i == 0 {//说明该num不是素数
                    flag = false
                    break
                }
            }
    
            if flag {
                //将这个数就放入到primeChan
                primeChan<- num
            }
        }
    
        fmt.Println("有一个primeNum 协程因为取不到数据,退出")
        //这里我们还不能关闭 primeChan
        //向 exitChan 写入true
        exitChan<- true    
    
    }
    
    func main() {
    
        intChan := make(chan int , 1000)
        primeChan := make(chan int, 20000)//放入结果
        //标识退出的管道
        exitChan := make(chan bool, 8) // 4个
    
        start := time.Now().Unix()
        
        //开启一个协程,向 intChan放入 1-8000个数
        go putNum(intChan)
        //开启4个协程,从 intChan取出数据,并判断是否为素数,如果是,就
        //放入到primeChan
        for i := 0; i < 8; i++ {
            go primeNum(intChan, primeChan, exitChan)
        }
    
        //这里我们主线程,进行处理
        //直接
        go func(){
            for i := 0; i < 8; i++ {
                <-exitChan
            }
    
            end := time.Now().Unix()
            fmt.Println("使用协程耗时=", end - start)
    
            //当我们从exitChan 取出了4个结果,就可以放心的关闭 prprimeChan
            close(primeChan)
        }()
    
    
        //遍历我们的 primeChan ,把结果取出
        for {
            _, ok := <-primeChan
            if !ok{
                break
            }
            //将结果输出
            //fmt.Printf("素数=%d
    ", res)
        }
    
        fmt.Println("main线程退出")
    }
    结论:使用 go 协程后,执行的速度,比普通方法提高至少 4 倍 
    channel 使用细节和注意事项 
    1、channel 可以声明为只读,或者只写性质

    2、使用 select 可以解决从管道取数据的阻塞问题

    package main
    import (
        "fmt"
        "time"
    )
    
    func main() {
    
        //使用select可以解决从管道取数据的阻塞问题
    
        //1.定义一个管道 10个数据int
        intChan := make(chan int, 10)
        for i := 0; i < 10; i++ {
            intChan<- i
        }
        //2.定义一个管道 5个数据string
        stringChan := make(chan string, 5)
        for i := 0; i < 5; i++ {
            stringChan <- "hello" + fmt.Sprintf("%d", i)
        }
    
        //传统的方法在遍历管道时,如果不关闭会阻塞而导致 deadlock
    
        //问题,在实际开发中,可能我们不好确定什么关闭该管道.
        //可以使用select 方式可以解决
        //label:
        for {
            select {
                //注意: 这里,如果intChan一直没有关闭,不会一直阻塞而deadlock
                //,会自动到下一个case匹配
                case v := <-intChan : 
                    fmt.Printf("从intChan读取的数据%d
    ", v)
                    time.Sleep(time.Second)
                case v := <-stringChan :
                    fmt.Printf("从stringChan读取的数据%s
    ", v)
                    time.Sleep(time.Second)
                default :
                    fmt.Printf("都取不到了,不玩了, 程序员可以加入逻辑
    ")
                    time.Sleep(time.Second)
                    return 
                    //break label
            }
        }
    }

    3、goroutine 中使用 recover,解决协程中出现 panic,导致程序崩溃问题

      如果我们起了一个协程,但是这个协程出现了panic,如果我们没有捕获这个panic,就会造成整个程序崩溃,这时我们可以在goroutine中使用recover来捕获panic,进行处理,这时即使这个协程发生了问题,但是主线程仍然不受影响,可以继续执行

    代码实现:

    package main
    import (
        "fmt"
        "time"
    )
    
    //函数
    func sayHello() {
        for i := 0; i < 10; i++ {
            time.Sleep(time.Second)
            fmt.Println("hello,world")
        }
    }
    //函数
    func test() {
        //这里我们可以使用defer + recover
        defer func() {
            //捕获test抛出的panic
            if err := recover(); err != nil {
                fmt.Println("test() 发生错误", err)
            }
        }()
        //定义了一个map
        var myMap map[int]string
        myMap[0] = "golang" //error
    }
    
    func main() {
    
        go sayHello()
        go test()
    
    
        for i := 0; i < 10; i++ {
            fmt.Println("main() ok=", i)
            time.Sleep(time.Second)
        }
    
    }
  • 相关阅读:
    不同品牌交换机设置telnet方法
    Oracle 11G RAC For Windows 2008 R2部署手册(亲测,成功实施多次)
    oracle 11g ADG实施手册(亲测,已成功部署多次)
    如何正确的使用uwsgi
    debian小巧好看的桌面
    zsh中home键失灵问题
    C#_Markov_心得感想
    NLP—WordNet——词与词之间的最小距离
    这不算爬虫吧?!
    Table-Driven Design 表驱动设计
  • 原文地址:https://www.cnblogs.com/taotaozhuanyong/p/14632303.html
Copyright © 2011-2022 走看看