runner 模式：通过监听系统中断信号（interrupt）和超时（timeout）来控制一组按顺序执行的任务，使程序可以在任务全部完成、超时或收到中断时结束。
// Package runner executes a list of tasks sequentially in a background
// goroutine and reports whether they completed, timed out, or were
// interrupted by an OS interrupt signal.
package runner

import (
	"errors"
	"os"
	"os/signal"
	"time"
)

// Runner runs a set of tasks one after another, bounded by a timeout and
// abortable between tasks by os.Interrupt.
type Runner struct {
	// interrupt receives os.Interrupt once Start has registered it.
	interrupt chan os.Signal

	// complete carries the result of run back to Start. It is buffered so
	// the run goroutine can always deliver its result and exit, even when
	// Start has already returned because of a timeout.
	complete chan error

	// timeout fires once the duration passed to New has elapsed. The timer
	// is created in New, so the clock starts at construction, not at Start.
	timeout <-chan time.Time

	// tasks are executed in the order they were added; each task receives
	// its index in this slice.
	tasks []func(int)
}

var (
	// ErrTimeout is returned by Start when the timeout fires before all
	// tasks have finished.
	ErrTimeout = errors.New("received timeout")

	// ErrInterrupt is returned by Start when an interrupt signal is seen
	// between two tasks.
	ErrInterrupt = errors.New("received interrupt")
)

// New returns a Runner whose timeout fires d after this call.
func New(d time.Duration) *Runner {
	return &Runner{
		interrupt: make(chan os.Signal, 1),
		// Capacity 1: without it the run goroutine would block forever on
		// its send after Start returned ErrTimeout.
		complete: make(chan error, 1),
		timeout:  time.After(d),
	}
}

// Add appends tasks to the Runner. A task is a function that receives its
// position in the task list.
func (r *Runner) Add(tasks ...func(int)) {
	r.tasks = append(r.tasks, tasks...)
}

// Start runs all added tasks in a separate goroutine and waits for them.
// It returns nil when every task ran, ErrInterrupt when an interrupt was
// observed between tasks, or ErrTimeout when the timeout fired first.
//
// On timeout the task that is currently running is not cancelled; the
// background goroutine keeps going until the task list is exhausted.
func (r *Runner) Start() error {
	signal.Notify(r.interrupt, os.Interrupt)
	// Undo the registration on every exit path so the process regains the
	// default interrupt behaviour once Start has returned.
	defer signal.Stop(r.interrupt)

	go func() {
		r.complete <- r.run()
	}()

	select {
	case err := <-r.complete:
		return err
	case <-r.timeout:
		return ErrTimeout
	}
}

// run executes the tasks in order, checking for a pending interrupt before
// each one. It returns ErrInterrupt as soon as an interrupt is seen and nil
// when all tasks have run.
func (r *Runner) run() error {
	for id, task := range r.tasks {
		if r.gotInterrupt() {
			return ErrInterrupt
		}
		task(id)
	}
	return nil
}

// gotInterrupt reports, without blocking, whether an interrupt signal has
// been received. On the first interrupt it stops further signal delivery
// to the Runner.
func (r *Runner) gotInterrupt() bool {
	select {
	case <-r.interrupt:
		signal.Stop(r.interrupt)
		return true
	default:
		return false
	}
}
main
// Command main demonstrates the runner package: it schedules three tasks
// under a three second timeout and exits with a distinct status code when
// the run times out or is interrupted.
package main

import (
	"log"
	"os"
	"time"

	"awesomeProject2/runner"
)

// timeout is the total time the runner is allowed before giving up.
const timeout = 3 * time.Second

func main() {
	log.Println("Starting work.")

	r := runner.New(timeout)
	r.Add(createTask(), createTask(), createTask())

	if err := r.Start(); err != nil {
		switch err {
		case runner.ErrTimeout:
			log.Println("Terminating due to timeout.")
			os.Exit(1)
		case runner.ErrInterrupt:
			log.Println("Terminating due to interrupt.")
			os.Exit(2)
		}
	}

	log.Println("Process ended.")
}

// createTask returns a task that logs its id and then sleeps for id
// seconds, so later tasks take longer than earlier ones.
func createTask() func(int) {
	return func(id int) {
		// The format string needs a verb for id; without one Printf
		// appends "%!(EXTRA int=...)" to the message.
		log.Printf("Processor - Task #%d.", id)
		time.Sleep(time.Duration(id) * time.Second)
	}
}
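pool 模式：即字面意义上的资源池，用带缓冲的 channel 管理一组可以共享的资源，核心操作是获取（Acquire）、释放（Release）和关闭（Close）。需要注意这里的 Acquire 和 Release 都不会阻塞：池中没有空闲资源时，Acquire 会通过工厂函数新建一个资源；池已满时，Release 会直接关闭多余的资源。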
// Package pool manages a set of io.Closer resources that can be shared
// between goroutines through a buffered channel.
package pool

import (
	"errors"
	"io"
	"log"
	"sync"
)

// Pool holds idle resources in a buffered channel. Acquire never blocks:
// when no idle resource is available a new one is created with the factory.
type Pool struct {
	// m guards closed and serialises Release against Close so that a
	// resource is never sent on a closed channel.
	m sync.Mutex

	// resources buffers the idle resources; its capacity is the pool size.
	resources chan io.Closer

	// factory creates a new resource when none is idle.
	factory func() (io.Closer, error)

	// closed is set once Close has run.
	closed bool
}

// ErrPoolClosed is returned by Acquire after the pool has been closed.
var ErrPoolClosed = errors.New("Pool has been closed")

// New creates a pool that keeps at most size idle resources and uses fn to
// create resources on demand. It returns an error when size is not positive
// or fn is nil.
func New(fn func() (io.Closer, error), size int) (*Pool, error) {
	if size <= 0 {
		return nil, errors.New("Size value too small")
	}
	// A nil factory would only surface later as a panic inside Acquire.
	if fn == nil {
		return nil, errors.New("Factory function is nil")
	}

	return &Pool{
		resources: make(chan io.Closer, size),
		factory:   fn,
	}, nil
}

// Acquire returns an idle resource when one is available and otherwise
// creates a new one with the factory. After Close it returns ErrPoolClosed.
func (p *Pool) Acquire() (io.Closer, error) {
	select {
	case r, ok := <-p.resources:
		// A closed channel yields ok == false; check that before logging
		// so a closed pool is not reported as handing out a resource.
		if !ok {
			return nil, ErrPoolClosed
		}
		log.Println("Acquire: ", "Shared Resource")
		return r, nil
	default:
		log.Println("Acquire: ", "New Resource")
		return p.factory()
	}
}

// Release hands r back to the pool. The resource is closed instead of being
// queued when the pool is already closed or has no free slot.
func (p *Pool) Release(r io.Closer) {
	p.m.Lock()
	defer p.m.Unlock()

	if p.closed {
		r.Close()
		return
	}

	select {
	case p.resources <- r:
		log.Println("Release: ", "In Queue")
	default:
		log.Println("Release: ", "Closing")
		r.Close()
	}
}

// Close shuts the pool down and closes every idle resource. Calling Close
// more than once is a no-op.
func (p *Pool) Close() {
	p.m.Lock()
	defer p.m.Unlock()

	if p.closed {
		return
	}
	p.closed = true

	// Close the channel first so the range below terminates once the
	// buffered resources have been drained.
	close(p.resources)
	for r := range p.resources {
		r.Close()
	}
}
main
// Command main demonstrates the pool package: a number of goroutines share
// a small pool of simulated database connections.
package main

import (
	"io"
	"log"
	"math/rand"
	"sync"
	"sync/atomic"
	"time"

	"awesomeProject2/pool"
)

const (
	// maxGoroutines is the number of concurrent queries to run.
	maxGoroutines = 25
	// pooledResources is the number of idle connections the pool keeps.
	pooledResources = 2
)

// dbConnection simulates a connection that can be shared through the pool.
type dbConnection struct {
	Id int32
}

// Close implements io.Closer so dbConnection can be managed by the pool.
// There is nothing to release, so it always returns nil.
func (db *dbConnection) Close() error {
	return nil
}

// idCounter hands out a unique id to each connection; it is updated
// atomically because connections are created from several goroutines.
var idCounter int32

// createConnection is the pool's factory function: it returns a new
// dbConnection with the next id.
func createConnection() (io.Closer, error) {
	id := atomic.AddInt32(&idCounter, 1)
	return &dbConnection{id}, nil
}

func main() {
	p, err := pool.New(createConnection, pooledResources)
	if err != nil {
		// Without a pool the workers below would dereference a nil
		// pointer, so stop here instead of just logging.
		log.Fatalln(err)
	}

	var wg sync.WaitGroup
	wg.Add(maxGoroutines)
	for query := 0; query < maxGoroutines; query++ {
		go func(q int) {
			defer wg.Done()
			performQueries(q, p)
		}(query)
	}

	wg.Wait()
	log.Println("Shutdown Program")
	p.Close()
}

// performQueries acquires a connection from the pool, simulates a query of
// random duration (under one second), logs the query and connection ids,
// and releases the connection back to the pool.
func performQueries(query int, p *pool.Pool) {
	conn, err := p.Acquire()
	if err != nil {
		log.Println(err)
		return
	}
	defer p.Release(conn)

	time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)
	log.Printf("Query: QID[%d] CID[%d] ", query, conn.(*dbConnection).Id)
}
work 模式：这里的关键点是，对一个 channel 使用 for range 时，只要 channel 没有被 close，goroutine 就会一直阻塞等待新的值。利用这一点，让固定数量的 goroutine 同时 range 同一个无缓冲 channel，就简单地实现了一个并发的工作池。
// Package work provides a fixed-size pool of goroutines that execute
// submitted Workers.
package work

import "sync"

// Worker is implemented by any value that can be run by the pool.
type Worker interface {
	Task()
}

// Pool owns a fixed number of goroutines that all receive Workers from the
// same unbuffered channel.
type Pool struct {
	// work is the hand-off channel between Run and the pool goroutines.
	work chan Worker
	// wg counts the pool goroutines that are still running.
	wg sync.WaitGroup
}

// New starts maxGroutines goroutines and returns the pool they serve.
func New(maxGroutines int) *Pool {
	pool := &Pool{work: make(chan Worker)}

	pool.wg.Add(maxGroutines)
	for started := 0; started < maxGroutines; started++ {
		go pool.consume()
	}

	return pool
}

// consume is the body of each pool goroutine: it runs Workers until the
// work channel is closed and then reports that it has finished.
func (p *Pool) consume() {
	for job := range p.work {
		job.Task()
	}
	p.wg.Done()
}

// Run hands w to the pool. The channel is unbuffered, so Run blocks until
// one of the pool goroutines has received w.
func (p *Pool) Run(w Worker) {
	p.work <- w
}

// ShutDown closes the work channel and waits for every pool goroutine to
// finish.
func (p *Pool) ShutDown() {
	close(p.work)
	p.wg.Wait()
}
main
// Command main demonstrates the work package: it pushes each name through a
// two-goroutine pool one hundred times.
package main

import (
	"log"
	"sync"
	"time"

	"awesomeProject2/work"
)

// names are the values printed by the submitted workers.
var names = []string{
	"steve",
	"bob",
	"mary",
	"therese",
	"jason",
}

// namePrinter is a work.Worker that logs a single name.
type namePrinter struct {
	name string
}

// Task logs the name and then sleeps for one second to simulate work.
func (np *namePrinter) Task() {
	log.Println(np.name)
	time.Sleep(time.Second)
}

func main() {
	// rounds is how many times the whole name list is submitted.
	const rounds = 100

	workers := work.New(2)

	var pending sync.WaitGroup
	pending.Add(rounds * len(names))

	for round := 0; round < rounds; round++ {
		for idx := range names {
			printer := &namePrinter{name: names[idx]}
			// Run blocks until a pool goroutine is free, so each
			// submission is made from its own goroutine.
			go func() {
				defer pending.Done()
				workers.Run(printer)
			}()
		}
	}

	pending.Wait()
	workers.ShutDown()
}
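另外一个值得关注的点是 select 的用法：配合 default 分支可以实现非阻塞的 channel 收发（例如 pool 的 Acquire/Release 和 runner 的 gotInterrupt）；同时监听多个 channel 则可以实现超时控制（例如 runner 的 Start）。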