zoukankan      html  css  js  c++  java
  • go channel 案例分析

    断断续续理了一下关于channel的一些概念,现在可以把下面的程序理清楚了。

    1. source code

    这个程序来自于《Go语言程序设计》 7.2.2 并发的Grep, 程序如下。

    package main
    
    import (
        "bufio"
        "bytes"
        "fmt"
        "io"
        "log"
        "os"
        "path/filepath"
        "regexp"
    )
    
    type Job struct {
        filename string
        results  chan<- Result
    }
    
    type Result struct {
        filename string
        fino     int
        line     string
    }
    
    var numberOfWorkers = 4
    
    func main() {
        //retrieve input arguments
        if len(os.Args) < 3 || os.Args[1] == "-1" || os.Args[1] == "--help" {
            fmt.Printf("usage:  %s <regrex> <files>
    ", filepath.Base(os.Args[0]))
            os.Exit(1)
        }
    
        if lineRx, err := regexp.Compile(os.Args[1]); err != nil {
            log.Fatalf("invalid regexp: %s
    ", err)
        } else {
            //do the real work
            grep(lineRx, os.Args[2:])
        }
    }
    
    func grep(lineRx *regexp.Regexp, filenames []string) {
        jobs := make(chan Job, numberOfWorkers)
        done := make(chan struct{}, numberOfWorkers)
        results := make(chan Result, minimun(1000, len(filenames)))
    
        go addJobs(jobs, filenames, results) //produce Jobs
        for i := 0; i < numberOfWorkers; i++ {
            go doJobs(done, lineRx, jobs) //consume Jobs concurrent
        }
        go awaitCompletion(done, results) //wait until all the Jobs done
        processResults(results)
    }
    
    func addJobs(jobs chan<- Job, filenames []string, result chan<- Result) {
        for _, filename := range filenames {
            jobs <- Job{filename, result}
        }
        close(jobs)
    }
    
    func doJobs(done chan<- struct{}, lineRx *regexp.Regexp, jobs <-chan Job) {
        for job := range jobs {
            job.Do(lineRx)
        }
        done <- struct{}{}
    }
    
    func awaitCompletion(done <-chan struct{}, results chan Result) {
        for i := 0; i < numberOfWorkers; i++ {
            <-done
        }
        //???what will happen to data in results???
        close(results)
    }
    
    func processResults(results <-chan Result) {
        for result := range results {
            fmt.Printf("%s:%d:%s
    ", result.filename, result.fino, result.line)
        }
    }
    
    func minimun(a int, b int) int {
        if a <= b {
            return a
        } else {
            return b
        }
    }
    
    func (job Job) Do(lineRx *regexp.Regexp) {
        file, err := os.Open(job.filename)
        if err != nil {
            log.Printf("error: %s
    ", err)
            return
        }
        defer file.Close()
    
        reader := bufio.NewReader(file)
        for lino := 1; ; lino++ {
            line, err := reader.ReadBytes('
    ')
            if err != nil {
                if err != io.EOF {
                    log.Printf("error:%d: %s
    ", lino, err)
                }
                break
            }
    
            line = bytes.TrimRight(line, "
    
    ")
            if lineRx.Match(line) {
                job.results <- Result{job.filename, lino, string(line)}
            }
        }
    }

    编译程序:

    go build program

    program  a12345b   a.txt  b.txt c.txt...

    2.编写并发go routinue 的一些模式说明

    同步通信时需要避开两个陷阱:

    陷阱一:主线程提前退出

    当其他线程工作没有完成,而主线程提前退出。主线程退出会导致其他线程强制退出,而得不到想要的结果。

    常见的解决方式是 让主 gorountine在done通道上等待,根据接收到的消息判断工作是否完成。另一种是使用sync.WaitGroup。

    陷阱二:死锁

    注意读写线程之间的关系,例如:不关闭 写chanel 会导致 使用range 读数据的 rountine堵塞。

    3.程序同步关系图

    这里我画出程序执行流程:

    0-1 : prepare channels

    2:     addJobs start

    3:     in addJobs,   close(jobs)  to notice the reader

    4:     in doJobs,     finish read jobs(not blocked here),  the consumer won't be controlled by the producer

    5:     in do Jobs,   close done to inform that the reader is free

    6, 7, 8, 9:   ....

    10:  whole program finished

    从上述流程上可以看出,都是生产者 通过一定的形式 通知消费者。告知消费者,产品已经生产完成,因此消费者不需要等待了,消费者只需要把剩下的任务完成就可以,消费者不需要受控于生产者了。

    而这种通知 通过两种形式完成。

    第一种: close channel.  例如:  close(jobs),  close(results)

    第二种:读写done 通道。例如:  done <- struct{}{},   <-done。  由于done占用资源比较小,程序中并没有把它关闭。

    continue to the github project  gocrawl.....

  • 相关阅读:
    'index.js' does not match the corresponding name on disk: '. ode_modules
    onload()方法只能在body标签中调用吗?怎么调用多个多个方法?
    HTML5新属性在Google浏览器中不能显示的问题
    HTML引入JS、CSS的各种方法
    框架、架构、设计模式的区别
    npm 命令 --save 和 --save-dev 的区别
    软件开发的权限控制和权限验证
    报错:Something is already running on port 8000.
    $stateProvider resovle 无法找到的原因
    git 统计代码量 shell脚本
  • 原文地址:https://www.cnblogs.com/harrysun/p/3980501.html
Copyright © 2011-2022 走看看