zoukankan      html  css  js  c++  java
  • 进一步认识golang中的并发

    如果你成天与编程为伍,那么并发这个名词对你而言一定特别耳熟。需要并发的场景太多了,例如一个聊天程序,如果你想让这个聊天程序能够同时接收信息和发送信息,就一定会用到并发,无论那是什么样的并发。

    并发的意义就是:让一个程序同时做多件事情!

    理解这一点非常重要,是的,并发的目的只是为了能让程序同时做另一件事情而已,并发的目的并不是让程序运行的更快(如果是多核处理器,而且任务可以分成相互独立的部分,那么并发确实可以让事情解决的更快)。记得我学C++那时候开始接触并发,还以为每开一个线程程序就会加速一倍呢。。。。

    golang从语言级别上对并发提供了支持,而且在启动并发的方式上直接添加了语言级的关键字。我并不会很多语言,而且也没有很多的项目经验,可能从我嘴里说出的比较不会非常客观,但是起码和C/C++(不考虑C++11)利用系统API来操作线程的方式相比,golang的并发机制运用起来就非常舒适了,不必非要按照固定的格式来定义线程函数,也不必因为启动线程的时候只能给线程函数传递一个参数而烦恼。和Java相比的话,Go的优点就是并发的部分不必非得实现成一个class,而且更加轻量(其实我也不知道到底为什么更轻量^_^)。

    因为最近自己想写一个小开源项目,而且其中的关键部分会用到很多并发机制,于是开始重温习Go的并发相关的知识。从我学习Go到现在已经将近1年了,觉得现在再重新看Go的并发时收获颇多,因为毕竟写了不少Go的小程序,遇到过许多解释不通的现象和困惑,借着这次温故知新的机会,把学习来的新经验赶紧记录下来,分享给各位网友尤其是喜欢Go的朋友们。

    并发的启动

    这篇文章关于并发的启动我就一概而过了,如果要让一个函数并发运行,只需一个关键字"go":

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. func Afuntion(para1, para2, para3, ...) {  
    2.     // Do some process  
    3.     // ...  
    4. }  
    5.   
    6. func main() {  
    7.     go Afuntion(para1, para2, para3, ...) //只需加一个go前缀,Afunction()就会并发运行  
    8. }  

    go的并发启动非常简单,几乎没有什么额外的准备工作,要并发的函数和一般的函数没有什么区别,参数随意,启动的时候只需要加一个go关键之即可。

    当然,并发的启动没什么好讲的,并发最精髓的部分在于这些协程(协程类似于线程,但是是更轻量的线程)的调度

    我没法以一个资深的老专家向你全方位的讲解调度的各个方面,但是我可以把我遇到过的一些场景和我所用过的调度方法(所以绝对是能用的)分享给你。

    go提供了sync包channel机制来解决协程间的同步与通信。channel的用法非常灵活,使用的方式多种多样,而且官网的Effective Go中给出了channel的一种并发以外的方式。我们先来介绍sync包提供的调度支持吧。

    sync.WaitGroup

    sync包中的WaitGroup实现了一个类似任务队列的结构,你可以向队列中加入任务,任务完成后就把任务从队列中移除,如果队列中的任务没有全部完成,队列就会触发阻塞以阻止程序继续运行,具体用法参考如下代码:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. // 代码粘上就可以跑通  
    2. package main  
    3.   
    4. import (  
    5.     "fmt"  
    6.     "sync"  
    7. )  
    8.   
    9. var waitgroup sync.WaitGroup  
    10.   
    11. func Afunction(shownum int) {  
    12.     fmt.Println(shownum)  
    13.     waitgroup.Done() //任务完成,将任务队列中的任务数量-1,其实.Done就是.Add(-1)  
    14. }  
    15.   
    16. func main() {  
    17.     for i := 0; i < 10; i++ {  
    18.         waitgroup.Add(1) //每创建一个goroutine,就把任务队列中任务的数量+1  
    19.         go Afunction(i)  
    20.     }  
    21.     waitgroup.Wait() //.Wait()这里会发生阻塞,直到队列中所有的任务结束就会解除阻塞  
    22. }  

    我们可以利用sync.WaitGroup来满足这样的情况:

            ▲某个地方需要创建多个goroutine,并且一定要等它们都执行完毕后再继续执行接下来的操作。

    是的,WaitGroup最大的优点就是.Wait()可以阻塞到队列中的任务都完毕后才解除阻塞。

    channel
    channel是一种golang内置的类型,英语的直译为"通道",其实,它真的就是一根管道,而且是一个先进先出的数据结构

    我们能对channel进行的操作只有4种:

    (1) 创建chennel (通过make()函数)

    (2) 放入数据 (通过 channel <- data 操作) 

    (3) 取出数据 (通过 <-channel 操作)

    (4)  关闭channel (通过close()函数)

     

    但是channel有一些非常给力的性质需要你牢记,请一定要记住并理解好它们:

    (1) channel是一种阻塞管道,是自动阻塞的。意思就是,如果管道满了,一个对channel放入数据的操作就会阻塞,直到有某个routine从channel中取出数据,这个放入数据的操作才会执行。相反同理,如果管道是空的,一个从channel取出数据的操作就会阻塞,直到某个routine向这个channel中放入数据,这个取出数据的操作才会执行。这事channel最重要的一个性质,没有之一。

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package main  
    2.   
    3. func main() {  
    4.     ch := make(chan int, 3)  
    5.     ch <- 1  
    6.     ch <- 1  
    7.     ch <- 1  
    8.     ch <- //这一行操作就会发生阻塞,因为前三行的放入数据的操作已经把channel填满了  
    9. }  
    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package main  
    2.   
    3. func main() {  
    4.     ch := make(chan int, 3)  
    5.     <-ch //这一行会发生阻塞,因为channel才刚创建,是空的,没有东西可以取出  
    6. }  

    (2)channel分为有缓冲的channel和无缓冲的channel。两种channel的创建方法如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. ch := make(chan int) //无缓冲的channel,同等于make(chan int, 0)  
    2. ch := make(chan int, 5) //一个缓冲区大小为5的channel  

    操作一个channel时一定要注意其是否带有缓冲,因为有些操作会触发channel的阻塞导致死锁。下面就来解释这些需要注意的情景。

    首先来看一个一个例子,这个例子是两段只有主函数不同的代码:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package main  
    2.   
    3. import "fmt"  
    4.   
    5. func Afuntion(ch chan int) {  
    6.     fmt.Println("finish")  
    7.     <-ch  
    8. }  
    9.   
    10. func main() {  
    11.     ch := make(chan int) //无缓冲的channel  
    12.     go Afuntion(ch)  
    13.     ch <- 1  
    14.       
    15.     // 输出结果:  
    16.     // finish  
    17. }  
    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package main  
    2.   
    3. import "fmt"  
    4.   
    5. func Afuntion(ch chan int) {  
    6.     fmt.Println("finish")  
    7.     <-ch  
    8. }  
    9.   
    10. func main() {  
    11.     ch := make(chan int) //无缓冲的channel  
    12.     //只是把这两行的代码顺序对调一下  
    13.     ch <- 1  
    14.     go Afuntion(ch)  
    15.   
    16.     // 输出结果:  
    17.     // 死锁,无结果  
    18. }  

    前一段代码最终会输出"finish"并正常结束,但是后一段代码会发生死锁。为什么会出现这种现象呢,咱们把上面两段代码的逻辑跑一下。

    第一段代码:

            1. 创建了一个无缓冲channel

            2. 启动了一个goroutine,这个routine中对channel执行取出操作,但是因为这时候channel为空,所以这个取出操作发生阻塞,但是主routine可没有发生阻塞,它还在继续运行呢

            3. 主goroutine这时候继续执行下一行,往channel中放入了一个数据

            4. 这时阻塞的那个routine检测到了channel中存在数据了,所以接触阻塞,从channel中取出数据,程序就此完毕

    第二段代码:

            1.  创建了一个无缓冲的channel

            2.  主routine要向channel中放入一个数据,但是因为channel没有缓冲,相当于channel一直都是满的,所以这里会发生阻塞。可是下面的那个goroutine还没有创建呢,主routine在这里一阻塞,整个程序就只能这么一直阻塞下去了,然后。。。然后就没有然后了。。死锁!

    ※从这里可以看出,对于无缓冲的channel,放入操作和取出操作不能再同一个routine中,而且应该是先确保有某个routine对它执行取出操作,然后才能在另一个routine中执行放入操作。

    对于带缓冲的channel,就没那么多讲究了,因为有缓冲空间,所以只要缓冲区不满,放入操作就不会阻塞,同样,只要缓冲区不空,取出操作就不会阻塞。而且,带有缓冲的channel的放入和取出可以用在同一个routine中。

    但是,并不是说有了缓冲就可以随意使用channel的放入和取出了,我们一定要注意放入和取出的速率问题。下面我们就举个例子来说明这种问题:

    我们经常会用利用channel自动阻塞的性质来控制当前运行的goroutine的总数量,如下:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package main  
    2.   
    3. import (  
    4.     "fmt"  
    5. )  
    6.   
    7. func Afunction(ch chan int) {  
    8.     fmt.Println("finish")  
    9.     <-ch //goroutine执行完了就从channel取出一个数据  
    10. }  
    11.   
    12. func main() {  
    13.     ch := make(chan int, 10)  
    14.     for i := 0; i < 1000; i++ {  
    15.         //每当创建goroutine的时候就向channel中放入一个数据,如果里面已经有10个数据了,就会  
    16.         //阻塞,由此我们将同时运行的goroutine的总数控制在<=10个的范围内  
    17.         ch <- 1  
    18.         go Afunction(ch)  
    19.     }  
    20.     // 这里只是示范个例子,当然,接下来应该有些更加周密的同步操作  
    21. }  

    上面这种channel的使用方式几乎经常会用到,但是再看一下接下来这段代码,它和上面这种使用channel的方式几乎一样,但是它会造成问题:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package main  
    2.   
    3. func Afunction(ch chan int) {  
    4.     ch <- 1  
    5.     ch <- 1  
    6.     ch <- 1  
    7.     ch <- 1  
    8.     ch <- 1  
    9.   
    10.     <-ch  
    11. }  
    12.   
    13. func main() {  
    14.     //主routine的操作同上面那段代码  
    15.     ch := make(chan int, 10)  
    16.     for i := 0; i < 100; i++ {  
    17.         ch <- 1  
    18.         go Afunction(ch)  
    19.     }  
    20.   
    21.     // 这段代码运行的结果为死锁  
    22. }  

    上面这段运行和之前那一段基本上原理是一样的,但是运行后却会发生死锁。为什么呢?其实总结起来就一句话,"放得太快,取得太慢了"。

    按理说,我们应该在我们主routine中创建子goroutine并每次向channel中放入数据,而子goroutine负责从channel中取出数据。但是我们的这段代码在创建了子goroutine后,每个routine会向channel中放入5个数据。这样,每向channel中放入6个数据才会执行一次取出操作,这样一来就可能会有某一时刻,channel已经满了,但是所有的routine都在执行放入操作(因为它们当前执行放入操作的概率是执行取出操作的6倍),这样一来,所有的routine都阻塞了,从而导致死锁。

    在使用带缓冲的channel时一定要注意放入与取出的速率问题。

    (3)关闭后的channel可以取数据,但是不能放数据。而且,channel在执行了close()后并没有真的关闭,channel中的数据全部取走之后才会真正关闭。

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package main  
    2.   
    3. func main() {  
    4.     ch := make(chan int, 5)  
    5.     ch <- 1  
    6.     ch <- 1  
    7.     close(ch)  
    8.     ch <- //不能对关闭的channel执行放入操作  
    9.           
    10.         // 会触发panic  
    11. }  
    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package main  
    2.   
    3. func main() {  
    4.     ch := make(chan int, 5)  
    5.     ch <- 1  
    6.     ch <- 1  
    7.     close(ch)  
    8.     <-ch //只要channel还有数据,就可能执行取出操作  
    9.   
    10.         //正常结束  
    11. }  
    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package main  
    2.   
    3. import "fmt"  
    4.   
    5. func main() {  
    6.     ch := make(chan int, 5)  
    7.     ch <- 1  
    8.     ch <- 1  
    9.     ch <- 1  
    10.     ch <- 1  
    11.     close(ch)  //如果执行了close()就立即关闭channel的话,下面的循环就不会有任何输出了  
    12.     for {  
    13.         data, ok := <-ch  
    14.         if !ok {  
    15.             break  
    16.         }  
    17.         fmt.Println(data)  
    18.     }  
    19.       
    20.     // 输出:  
    21.     // 1  
    22.     // 1  
    23.     // 1  
    24.     // 1  
    25.     //   
    26.     // 调用了close()后,只有channel为空时,channel才会真的关闭  
    27. }  



    使用channel控制goroutine数量

    channel的性质到这里就介绍完了,但是看上去,channel的使用似乎比WaitGroup要注意更多的细节,那么有什么理由一定要用channel来实现同步呢?channel相比WaitGroup有一个很大的优点,就是channel不仅可以实现协程的同步,而且可以控制当前正在运行的goroutine的总数。

    下面就介绍几种利用channel控制goroutine数量的方法:

    一.如果任务数量是固定的:

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package main  
    2.   
    3. func Afunction(ch chan int) {  
    4.     ch <- 1  
    5. }  
    6.   
    7. func main() {  
    8.     var (  
    9.         ch        chan int = make(chan int, 20) //可以同时运行的routine数量为20  
    10.         dutycount int      = 500  
    11.     )  
    12.     for i := 0; i < dutycount; i++ {  
    13.         go Afunction(ch)  
    14.     }  
    15.   
    16.     //知道了任务总量,可以像这样利用固定循环次数的循环检测所有的routine是否工作完毕  
    17.     for i := 0; i < dutycount; i++ {  
    18.         <-ch  
    19.     }  
    20. }  


    二.如果任务的数量不固定

    [java] view plain copy
     
     在CODE上查看代码片派生到我的代码片
    1. package main  
    2.   
    3. import (  
    4.     "fmt"  
    5. )  
    6.   
    7. func Afunction(routineControl chan int, feedback chan string) {  
    8.     defer func() {  
    9.         <-routineControl  
    10.         feedback <- "finish"  
    11.     }()  
    12.   
    13.     // do some process  
    14.     // ...  
    15. }  
    16.   
    17. func main() {  
    18.     var (  
    19.         routineCtl chan int    = make(chan int, 20)  
    20.         feedback   chan string = make(chan string, 10000)  
    21.   
    22.         msg      string  
    23.         allwork  int  
    24.         finished int  
    25.     )  
    26.     for i := 0; i < 1000; i++ {  
    27.         routineCtl <- 1  
    28.         allwork++  
    29.         go Afunction(routineCtl, feedback)  
    30.     }  
    31.   
    32.     for {  
    33.         msg = <-feedback  
    34.         if msg == "finish" {  
    35.             finished++  
    36.         }  
    37.         if finished == allwork {  
    38.             break  
    39.         }  
    40.     }  
    41. }  



    如果转载请注明出处:http://blog.csdn.NET/gophers/article/details/24665419

  • 相关阅读:
    118/119. Pascal's Triangle/II
    160. Intersection of Two Linked Lists
    168. Excel Sheet Column Title
    167. Two Sum II
    172. Factorial Trailing Zeroes
    169. Majority Element
    189. Rotate Array
    202. Happy Number
    204. Count Primes
    MVC之Model元数据
  • 原文地址:https://www.cnblogs.com/php-rearch/p/5962873.html
Copyright © 2011-2022 走看看