zoukankan      html  css  js  c++  java
  • Go的几种并发模式

    runner模式:通过监听系统中断信号(interrupt)和超时(timeout)来控制一组按顺序执行的任务,任务要么全部完成,要么因超时或中断而提前结束。

    package runner
    
    import (
        "errors"
        "os"
        "os/signal"
        "time"
    )
    
    // Runner executes a list of tasks in order and reports whether they
    // completed, timed out, or were interrupted by an OS signal.
    type Runner struct {
        // interrupt receives the OS signals registered by Start.
        interrupt chan os.Signal
        // complete carries the result of run back to Start.
        complete chan error
        // timeout delivers a value once the duration given to New has elapsed.
        timeout <-chan time.Time
        // tasks holds the functions to execute; each is called with its index.
        tasks []func(int)
    }
    
    // Sentinel errors reported by a Runner.
    var (
        // ErrTimeout is returned by Start when the timeout fires before the
        // tasks have finished.
        ErrTimeout = errors.New("received timeout")

        // ErrInterrupt is returned when an interrupt signal is noticed between
        // two tasks.
        ErrInterrupt = errors.New("received interrupt")
    )
    
    // New returns a Runner whose Start gives up with ErrTimeout once d has
    // elapsed. The clock starts here, because time.After is called immediately.
    func New(d time.Duration) *Runner {
        return &Runner{
            // Buffer of one so a signal arriving while nobody is receiving is
            // kept until gotInterrupt polls for it.
            interrupt: make(chan os.Signal, 1),
            // Buffer of one so the goroutine launched by Start can always deliver
            // its result and exit, even if Start has already returned on timeout.
            // An unbuffered channel would leave that goroutine blocked forever.
            complete: make(chan error, 1),
            timeout:  time.After(d),
        }
    }
    
    // Add appends the given tasks to the Runner's task list. Each task is later
    // called by run with its index in that list.
    func (r *Runner) Add(tasks ...func(int)) {
        r.tasks = append(r.tasks, tasks...)
    }
    
    // Start registers for interrupt signals, runs the tasks on a separate
    // goroutine, and waits for whichever comes first: the tasks' result or the
    // timeout. It returns the result of run, or ErrTimeout.
    func (r *Runner) Start() error {
        // Have os.Interrupt delivered on r.interrupt so run can poll for it.
        signal.Notify(r.interrupt, os.Interrupt)

        // Execute the tasks in the background and publish the outcome.
        go func() {
            outcome := r.run()
            r.complete <- outcome
        }()

        select {
        case <-r.timeout:
            return ErrTimeout
        case outcome := <-r.complete:
            return outcome
        }
    }
    
    // run calls every registered task in order, passing each its index. Before
    // each task it polls for an interrupt and stops with ErrInterrupt if one was
    // received. It returns nil when all tasks have been called.
    func (r *Runner) run() error {
        pending := r.tasks
        for id := range pending {
            if interrupted := r.gotInterrupt(); interrupted {
                return ErrInterrupt
            }
            pending[id](id)
        }
        return nil
    }
    
    // gotInterrupt reports, without blocking, whether a signal is waiting on the
    // interrupt channel. When one is found it also stops further signal delivery
    // to that channel.
    func (r *Runner) gotInterrupt() bool {
        select {
        case <-r.interrupt:
            // A signal was pending; fall through to the handling below.
        default:
            return false
        }

        signal.Stop(r.interrupt)
        return true
    }

    main

    package main
    
    import (
        "awesomeProject2/runner"
        "log"
        "os"
        "time"
    )
    
    // timeout is the time budget passed to runner.New for the whole task list.
    const timeout = 3 * time.Second
    
    
    // main builds a runner with three tasks, starts it, and maps the runner's
    // sentinel errors to distinct process exit codes.
    func main() {
        log.Println("Starting work.")

        r := runner.New(timeout)
        r.Add(createTask(), createTask(), createTask())

        err := r.Start()
        if err == runner.ErrTimeout {
            log.Println("Terminating due to timeout.")
            os.Exit(1)
        }
        if err == runner.ErrInterrupt {
            log.Println("Terminating due to interrupt.")
            os.Exit(2)
        }

        log.Println("Process ended.")
    }
    
    // createTask returns a task that logs its id and then sleeps for id seconds
    // to simulate work.
    func createTask() func(int) {
        return func(id int) {
            // The format string needs a verb for id; without one the id is
            // rendered as "%!(EXTRA int=...)" and go vet rejects the call.
            log.Printf("Processor - Task #%d.", id)
            time.Sleep(time.Duration(id) * time.Second)
        }
    }

    pool模式:字面意义上的资源池,用一个带缓冲的channel保存可共享的资源,主要操作是获取(Acquire)、释放(Release)和关闭(Close);获取时如果池中没有空闲资源不会阻塞,而是通过工厂函数新建一个。

    package pool
    
    import (
        "errors"
        "io"
        "log"
        "sync"
    )
    
    // Pool manages a set of io.Closer resources that can be shared between
    // goroutines.
    type Pool struct {
        // m guards closed and serializes Release and Close.
        m             sync.Mutex
        // resources is the buffered channel holding idle resources.
        resources    chan io.Closer
        // factory creates a new resource when none is idle.
        factory        func() (io.Closer, error)
        // closed is set once Close has run.
        closed        bool
    }
    
    // ErrPoolClosed is returned by Acquire when the pool has been closed.
    var ErrPoolClosed = errors.New("Pool has been closed")
    
    // New creates a Pool that keeps at most size idle resources and uses fn to
    // create new ones on demand. It returns an error when size is not positive.
    func New(fn func() (io.Closer, error), size int) (*Pool, error) {
        if size < 1 {
            return nil, errors.New("Size value too small")
        }

        p := new(Pool)
        p.factory = fn
        p.resources = make(chan io.Closer, size)
        return p, nil
    }
    
    // Acquire returns an idle resource from the pool if one is available, and
    // otherwise creates a new one with the factory. It never blocks. When the
    // pool has been closed it returns ErrPoolClosed.
    func (p *Pool) Acquire() (io.Closer, error) {
        select {
        case r, ok := <-p.resources:
            // A closed channel yields ok == false. Check that before logging so
            // the log never claims a shared resource was handed out when the
            // pool is actually closed.
            if !ok {
                return nil, ErrPoolClosed
            }
            log.Println("Acquire: ", "Shared Resource")
            return r, nil
        default:
            // Nothing idle: hand out a freshly created resource.
            log.Println("Acquire: ", "New Resource")
            return p.factory()
        }
    }
    
    // Release hands r back to the pool. The resource is queued for reuse when
    // the pool is open and has room; otherwise it is closed.
    func (p *Pool) Release(r io.Closer) {
        // Hold the lock so Release cannot race with Close.
        p.m.Lock()
        defer p.m.Unlock()

        if !p.closed {
            select {
            case p.resources <- r:
                log.Println("Release: ", "In Queue")
                return
            default:
                // Queue is full; the resource is discarded below.
                log.Println("Release: ", "Closing")
            }
        }

        r.Close()
    }
    
    // Close shuts the pool down: it marks the pool closed, closes the resource
    // channel, and closes every resource still queued in it. Calling Close again
    // has no further effect.
    func (p *Pool) Close() {
        // Hold the lock so Close cannot race with Release.
        p.m.Lock()
        defer p.m.Unlock()

        if !p.closed {
            p.closed = true

            // Close the channel first so the drain loop below terminates.
            close(p.resources)
            for {
                r, open := <-p.resources
                if !open {
                    break
                }
                r.Close()
            }
        }
    }

    main

    package main
    
    import (
        "awesomeProject2/pool"
        "io"
        "log"
        "math/rand"
        "sync"
        "sync/atomic"
        "time"
    )
    
    const (
        // maxGoroutines is the number of goroutines main starts.
        maxGoroutines     = 25
        // pooledResources is the size passed to pool.New.
        pooledResources = 2
    )
    
    // dbConnection stands in for a shared resource; it only carries an id.
    type dbConnection struct {
        Id int32
    }
    
    // Close implements io.Closer so a dbConnection can be stored in the pool.
    // It does nothing and always returns nil.
    func (db *dbConnection) Close() error {
        return nil
    }
    
    // idCounter is incremented atomically by createConnection to number each
    // new connection.
    var idCounter int32
    
    func createConnection() (io.Closer, error) {
        id := atomic.AddInt32(&idCounter, 1)
        return &dbConnection{id}, nil
    }
    
    
    
    func main() {
        var wg sync.WaitGroup
        wg.Add(maxGoroutines)
    
        p, err := pool.New(createConnection, pooledResources)
        if err != nil {
            log.Println(err)
        }
    
        for query := 0; query < maxGoroutines; query++ {
            go func(q int) {
                performQueries(q, p)
                wg.Done()
            }(query)
        }
    
        wg.Wait()
        log.Println("Shutdown Program")
        p.Close()
    }
    
    func performQueries(query int, p *pool.Pool) {
        conn, err := p.Acquire()
        if err != nil {
            log.Println(err)
            return
        }
        defer p.Release(conn)
    
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
        log.Printf("Query: QID[%d] CID[%d]
    ", query, conn.(*dbConnection).Id)
    
    }

    work模式:关键点在于对一个channel做range时,只要这个channel没有被close,goroutine就会一直阻塞等待新的值;利用这一点,用固定数量的goroutine就能简单地实现一个并发的工作池。

    package work
    
    import "sync"
    
    // Worker is implemented by anything that wants to be run by a Pool.
    type Worker interface {
        // Task performs the unit of work.
        Task()
    }
    
    // Pool runs submitted Workers on a fixed set of goroutines.
    type Pool struct {
        // work is the unbuffered channel through which Workers are handed over.
        work chan Worker
        // wg tracks the pool's goroutines so ShutDown can wait for them.
        wg sync.WaitGroup
    }
    
    // New creates a Pool and starts maxGroutines goroutines, each of which runs
    // Workers received from the work channel until that channel is closed.
    func New(maxGroutines int) *Pool {
        p := &Pool{work: make(chan Worker)}
        p.wg.Add(maxGroutines)

        // serve drains the work channel; the range loop ends once the channel
        // is closed, after which the goroutine reports itself as done.
        serve := func() {
            defer p.wg.Done()
            for w := range p.work {
                w.Task()
            }
        }

        for started := 0; started < maxGroutines; started++ {
            go serve()
        }
        return p
    }
    
    // Run submits w to the pool. The work channel is unbuffered, so the call
    // blocks until one of the pool's goroutines receives w.
    func (p *Pool) Run(w Worker) {
        p.work <- w
    }
    
    // ShutDown closes the work channel, which ends the range loop in every pool
    // goroutine, and then waits for all of them to finish.
    func (p *Pool) ShutDown() {
        close(p.work)
        p.wg.Wait()
    }

    main

    package main
    
    import (
        "awesomeProject2/work"
        "log"
        "sync"
        "time"
    )
    
    // names lists the names that main wraps in namePrinter values and submits
    // to the pool.
    var names = []string{
        "steve",
        "bob",
        "mary",
        "therese",
        "jason",
    }
    
    // namePrinter is a work.Worker that logs a single name.
    type namePrinter struct {
        name string
    }
    
    // Task logs the name and then sleeps for one second.
    func (m *namePrinter) Task() {
        log.Println(m.name)
        time.Sleep(time.Second)
    }
    
    
    // main starts a pool with two goroutines, submits every name 100 times from
    // separate goroutines, waits until all submissions have been accepted, and
    // then shuts the pool down.
    func main() {
        const rounds = 100

        p := work.New(2)

        var wg sync.WaitGroup
        wg.Add(rounds * len(names))

        for round := 0; round < rounds; round++ {
            for idx := range names {
                // Each submission gets its own namePrinter value.
                printer := &namePrinter{name: names[idx]}
                go func() {
                    // Run blocks until a pool goroutine takes the work.
                    p.Run(printer)
                    wg.Done()
                }()
            }
        }

        wg.Wait()
        p.ShutDown()
    }

    还有个值得关注的点就是select的应用。

    一个没有高级趣味的人。 email:hushui502@gmail.com
  • 相关阅读:
    阶段3 3.SpringMVC·_03.SpringMVC常用注解_1 RequestParam注解
    阶段3 3.SpringMVC·_02.参数绑定及自定义类型转换_7 获取Servlet原生的API
    函数传参
    利用 操作符特性 代替if判断语句
    for(;;)和 while(1) 有什么区别吗?for()和while()的使用情景。
    一个简单的mfc单页界面文件读写程序(MFC 程序入口和执行流程)
    ARM异常---一个DataAbort的触发过程:
    C语言,单链表操作(增删改查)(version 0.1)
    Cpu实验
    H-JATG:NAND_FLASH的参数设置
  • 原文地址:https://www.cnblogs.com/CherryTab/p/12390835.html
Copyright © 2011-2022 走看看