zoukankan      html  css  js  c++  java
  • Go语言实战 并发模式

    章节目录学习内容有:runner、pool、Go读写锁、以及总结。总结我习惯将其放在前面。

    总结

    稍后添加

    runner

    common.go

    package common
    
    import (
        "time"
        "os"
        "errors"
        "os/signal"
    )
    var ErrTimeOut = errors.New("执行者执行超时")
    var ErrInterrupt = errors.New("执行者被中断")
    // 一个执行者可以执行任何任务 但是任务是限制完成的 该执行者可以通过发送终止信号终止它
    type Runner struct{
        tasks []func(int) //要执行的任务
        complete chan error //用于通知任务全部完成  定义出现错误的原因 无错误返回nil
        timeout <-chan time.Time //单向通道 这些任务再多久内完成
        interrupt chan  os.Signal //可以控制强制终止的信号
    }
    //用于返回我们需要的Runner
    //参数,用来设置这个执行者的超时时间 func New(tm time.Duration)
    *Runner{ return &Runner{ complete:make(chan error), timeout:time.After(tm), interrupt:make(chan os.Signal,1), } } //将需要执行的任务 添加到Runner里 func (r *Runner) Add(tasks ...func(int)){ r.tasks = append(r.tasks,tasks...) } func (r *Runner) run() error { for id,task := range r.tasks { if r.isInterrupt(){ return ErrInterrupt } task(id) } return nil } //检查是否接收到了中断信号 func (r *Runner) isInterrupt() bool{ select { case <-r.interrupt: signal.Stop(r.interrupt) return true default: return false } } func (r *Runner) Start() error{ signal.Notify(r.interrupt,os.Interrupt) go func() { r.complete <- r.run() }() select { case err := <-r.complete: return err case <-r.timeout: return ErrTimeOut } }

    main.go

    func main() {
        log.Println("...开始执行任务...")
    
        timeout := 3 * time.Second  //定义超时时间为3s
        r := common.New(timeout)
    
        r.Add(createTask(), createTask(), createTask()) //添加3个任务
    
        if err:=r.Start();err!=nil{
            switch err {
            case common.ErrTimeOut:
                log.Println(err)
                os.Exit(1)
            case common.ErrInterrupt:
                log.Println(err)
                os.Exit(2)
            }
        }
        log.Println("...任务执行结束...")
    }
    
    func createTask() func(int) {
        return func(id int) {
            log.Printf("正在执行任务%d", id)
            time.Sleep(time.Duration(id)* time.Second)
        }
    }

    输出:

    2018/07/31 16:40:42 ...开始执行任务...
    2018/07/31 16:40:42 正在执行任务0
    2018/07/31 16:40:42 正在执行任务1
    2018/07/31 16:40:43 正在执行任务2
    2018/07/31 16:40:45 执行者执行超时

    pool

    common.go

    //一个安全的资源池,被管理的资源必须都实现io.Close接口
    type Pool struct {
        m       sync.Mutex                       //m是一个互斥锁
        res     chan io.Closer                   //有缓冲通道 大小在初始化Pool指定
        factory func() (io.Closer, error)        //生成新资源的函数
        closed  bool                             //资源是否关闭
    }
    
    var ErrPoolClosed = errors.New("资源池已经被关闭。")
    
    //创建一个资源池  接受两个参数:fn创建新资源函数 size指定资源池的大小
    func New(fn func() (io.Closer, error), size uint) (*Pool, error) {
        if size <= 0 {
            return nil, errors.New("size的值太小了。")
        }
        return &Pool{
            factory: fn,
            res:     make(chan io.Closer, size),
        }, nil
    }
    //从资源池里获取一个资源
    func (p *Pool) Acquire() (io.Closer,error) {
        select {
        case r,ok := <-p.res:
            log.Println("Acquire:共享资源")
            if !ok {
                return nil,ErrPoolClosed
            }
            return r,nil
        default:
            log.Println("Acquire:新生成资源")
            return p.factory()
        }
    }
    
    //关闭资源池,释放资源
    func (p *Pool) Close() {
        p.m.Lock()
        defer p.m.Unlock()
    
        if p.closed {
            return
        }
    
        p.closed = true
    
        //关闭通道,不让写入了
        close(p.res)
    
        //关闭通道里的资源
        for r:=range p.res {
            r.Close()
        }
    }
    
    func (p *Pool) Release(r io.Closer){
        //保证该操作和Close方法的操作是安全的
        p.m.Lock()
        defer p.m.Unlock()
    
        //资源池都关闭了,就省这一个没有释放的资源了,释放即可
        if p.closed {
            r.Close()
            return
        }
    
        select {
        case p.res <- r:
            log.Println("资源释放到池子里了")
        default:
            log.Println("资源池满了,释放这个资源吧")
            r.Close()
        }
    }

    main.go

    const (
        //模拟的最大goroutine
        maxGoroutine = 5
        //资源池的大小
        poolRes      = 2
    )
    
    func main() {
        //等待任务完成
        var wg sync.WaitGroup
        wg.Add(maxGoroutine)
    
        p, err := common.New(createConnection, poolRes)
        if err != nil {
            log.Println(err)
            return
        }
        //模拟好几个goroutine同时使用资源池查询数据
        for query := 0; query < maxGoroutine; query++ {
            go func(q int) {
                dbQuery(q, p)
                wg.Done()
            }(query)
        }
    
        wg.Wait()
        log.Println("开始关闭资源池")
        p.Close()
    }
    
    //模拟数据库查询
    func dbQuery(query int, pool *common.Pool) {
        conn, err := pool.Acquire()
        if err != nil {
            log.Println(err)
            return
        }
    
        defer pool.Release(conn)
    
        //模拟查询
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
        log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.(*dbConnection).ID)
    }
    //数据库连接
    type dbConnection struct {
        ID int32//连接的标志
    }
    
    //实现io.Closer接口
    func (db *dbConnection) Close() error {
        log.Println("关闭连接", db.ID)
        return nil
    }
    
    var idCounter int32
    
    //生成数据库连接的方法,以供资源池使用
    func createConnection() (io.Closer, error) {
        //并发安全,给数据库连接生成唯一标志
        id := atomic.AddInt32(&idCounter, 1)
        return &dbConnection{id}, nil
    }

    输出:

    2018/07/31 16:51:58 Acquire:新生成资源
    2018/07/31 16:51:58 Acquire:新生成资源
    2018/07/31 16:51:58 Acquire:新生成资源
    2018/07/31 16:51:58 Acquire:新生成资源
    2018/07/31 16:51:58 Acquire:新生成资源
    2018/07/31 16:51:58 第1个查询,使用的是ID为4的数据库连接
    2018/07/31 16:51:58 资源释放到池子里了
    2018/07/31 16:51:58 第2个查询,使用的是ID为5的数据库连接
    2018/07/31 16:51:58 资源释放到池子里了
    2018/07/31 16:51:58 第4个查询,使用的是ID为1的数据库连接
    2018/07/31 16:51:58 资源池满了,释放这个资源吧
    2018/07/31 16:51:58 关闭连接 1
    2018/07/31 16:51:58 第3个查询,使用的是ID为3的数据库连接
    2018/07/31 16:51:58 资源池满了,释放这个资源吧
    2018/07/31 16:51:58 关闭连接 3
    2018/07/31 16:51:58 第0个查询,使用的是ID为2的数据库连接
    2018/07/31 16:51:58 资源池满了,释放这个资源吧
    2018/07/31 16:51:58 关闭连接 2
    2018/07/31 16:51:58 开始关闭资源池
    2018/07/31 16:51:58 关闭连接 4
    2018/07/31 16:51:58 关闭连接 5

    由于资源使用频繁,Go提供原生的资源池管理,利用sync.Pool实现

    package main
    
    import (
        "sync"
        "time"
        "math/rand"
        "log"
        "sync/atomic"
    )
    
    const (
        //模拟的最大goroutine
        maxGoroutine = 5
    )
    
    func main() {
        //等待任务完成
        var wg sync.WaitGroup
        wg.Add(maxGoroutine)
    
        p:=&sync.Pool{
            New:createConnection,
        }
    
        //模拟好几个goroutine同时使用资源池查询数据
        for query := 0; query < maxGoroutine; query++ {
            go func(q int) {
                dbQuery(q, p)
                wg.Done()
            }(query)
        }
    
        wg.Wait()
    }
    
    //模拟数据库查询
    func dbQuery(query int, pool *sync.Pool) {
        conn:=pool.Get().(*dbConnection)
    
        defer pool.Put(conn)
    
        //模拟查询
        time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
        log.Printf("第%d个查询,使用的是ID为%d的数据库连接", query, conn.ID)
    }
    //数据库连接
    type dbConnection struct {
        ID int32//连接的标志
    }
    
    //实现io.Closer接口
    func (db *dbConnection) Close() error {
        log.Println("关闭连接", db.ID)
        return nil
    }
    
    var idCounter int32
    
    //生成数据库连接的方法,以供资源池使用
    func createConnection() interface{} {
        //并发安全,给数据库连接生成唯一标志
        id := atomic.AddInt32(&idCounter, 1)
        return &dbConnection{ID:id}
    }

    输出

    2018/07/31 16:55:40 第2个查询,使用的是ID为4的数据库连接
    2018/07/31 16:55:40 第4个查询,使用的是ID为1的数据库连接
    2018/07/31 16:55:40 第3个查询,使用的是ID为5的数据库连接
    2018/07/31 16:55:41 第1个查询,使用的是ID为3的数据库连接
    2018/07/31 16:55:41 第0个查询,使用的是ID为2的数据库连接

    Go读写锁

    package main
    
    import (
        "sync"
        "fmt"
        "math/rand"
    )
    
    var count int
    var wg sync.WaitGroup
    
    func main() {
        wg.Add(10)
    
        for i:=0;i<5;i++ {
            go read(i)
        }
    
        for i:=0;i<5;i++ {
            go write(i);
        }
    
        wg.Wait()
    }
    
    func read(n int) {
        fmt.Printf("读goroutine %d 正在读取...
    ",n)
    
        v := count
    
        fmt.Printf("读goroutine %d 读取结束,值为:%d
    ", n,v)
        wg.Done()
    }
    
    func write(n int) {
        fmt.Printf("写goroutine %d 正在写入...
    ",n)
        v := rand.Intn(1000)
    
        count = v
    
        fmt.Printf("写goroutine %d 写入结束,新值为:%d
    ", n,v)
        wg.Done()
    }
    存在竞争态例子

    输出

    写goroutine 1 正在写入...
    写goroutine 1 写入结束,新值为:81
    写goroutine 0 正在写入...
    写goroutine 0 写入结束,新值为:887
    读goroutine 1 正在读取...
    读goroutine 1 读取结束,值为:887
    读goroutine 0 正在读取...
    读goroutine 0 读取结束,值为:887
    写goroutine 2 正在写入...
    写goroutine 2 写入结束,新值为:847
    写goroutine 3 正在写入...
    写goroutine 3 写入结束,新值为:59
    读goroutine 2 正在读取...
    读goroutine 2 读取结束,值为:59
    读goroutine 3 正在读取...
    读goroutine 3 读取结束,值为:59
    写goroutine 4 正在写入...
    读goroutine 4 正在读取...
    读goroutine 4 读取结束,值为:81
    写goroutine 4 写入结束,新值为:81

    方法一:加互斥锁

    方案二:读写锁

    var count int
    var wg sync.WaitGroup
    var rw sync.RWMutex
    
    func main() {
        wg.Add(10)
    
        for i:=0;i<5;i++ {
            go read(i)
        }
    
        for i:=0;i<5;i++ {
            go write(i);
        }
    
        wg.Wait()
    }
    
    func read(n int) {
        rw.RLock()
        fmt.Printf("读goroutine %d 正在读取...
    ",n)
    
        v := count
    
        fmt.Printf("读goroutine %d 读取结束,值为:%d
    ", n,v)
        wg.Done()
        rw.RUnlock()
    }
    
    func write(n int) {
        rw.Lock()
        fmt.Printf("写goroutine %d 正在写入...
    ",n)
        v := rand.Intn(1000)
    
        count = v
    
        fmt.Printf("写goroutine %d 写入结束,新值为:%d
    ", n,v)
        wg.Done()
        rw.Unlock()
    }

    输出 

    读goroutine 2 正在读取...
    读goroutine 2 读取结束,值为:0
    写goroutine 4 正在写入...
    写goroutine 4 写入结束,新值为:81
    读goroutine 3 正在读取...
    读goroutine 3 读取结束,值为:81
    读goroutine 4 正在读取...
    读goroutine 4 读取结束,值为:81
    读goroutine 0 正在读取...
    读goroutine 0 读取结束,值为:81
    读goroutine 1 正在读取...
    读goroutine 1 读取结束,值为:81
    写goroutine 0 正在写入...
    写goroutine 0 写入结束,新值为:887
    写goroutine 1 正在写入...
    写goroutine 1 写入结束,新值为:847
    写goroutine 2 正在写入...
    写goroutine 2 写入结束,新值为:59
    写goroutine 3 正在写入...
    写goroutine 3 写入结束,新值为:81

    实现一个安全的Map

    //安全的Map
    type SynchronizedMap struct {
        rw *sync.RWMutex
        data map[interface{}]interface{}
    }
    //存储操作
    func (sm *SynchronizedMap) Put(k,v interface{}){
        sm.rw.Lock()
        defer sm.rw.Unlock()
    
        sm.data[k]=v
    }
    //获取操作
    func (sm *SynchronizedMap) Get(k interface{}) interface{}{
        sm.rw.RLock()
        defer sm.rw.RUnlock()
    
        return sm.data[k]
    }
    
    //删除操作
    func (sm *SynchronizedMap) Delete(k interface{}) {
        sm.rw.Lock()
        defer sm.rw.Unlock()
    
        delete(sm.data,k)
    }
    
    //遍历Map,并且把遍历的值给回调函数,可以让调用者控制做任何事情
    func (sm *SynchronizedMap) Each(cb func (interface{},interface{})){
        sm.rw.RLock()
        defer sm.rw.RUnlock()
    
        for k, v := range sm.data {
            cb(k,v)
        }
    }
    
    //生成初始化一个SynchronizedMap
    func NewSynchronizedMap() *SynchronizedMap{
        return &SynchronizedMap{
            rw:new(sync.RWMutex),
            data:make(map[interface{}]interface{}),
        }
    }
  • 相关阅读:
    [LeetCode] 1190. Reverse Substrings Between Each Pair of Parentheses
    [LeetCode] 923. 3Sum With Multiplicity
    [LeetCode] 643. Maximum Average Subarray I
    [LeetCode] 669. Trim a Binary Search Tree
    [LeetCode] 1743. Restore the Array From Adjacent Pairs
    [LeetCode] 888. Fair Candy Swap
    [LeetCode] 1102. Path With Maximum Minimum Value
    [LeetCode] 1631. Path With Minimum Effort
    [LeetCode] 1522. Diameter of N-Ary Tree
    [LeetCode] 1376. Time Needed to Inform All Employees
  • 原文地址:https://www.cnblogs.com/ycx95/p/9397060.html
Copyright © 2011-2022 走看看