zoukankan      html  css  js  c++  java
  • go 一步步实现Goroutine Pool

     Goroutine Pool架构

    超大规模并发的场景下,不加限制的大规模的goroutine可能造成内存暴涨,给机器带来极大的压力,吞吐量下降和处理速度变慢。

    而实现一个Goroutine Pool,复用goroutine,减轻runtime的调度压力以及缓解内存压力,依托这些优化,在大规模goroutine并发的场景下可以极大地提高并发性能。

    Pool类型

    // Pool hands out workers for submitted tasks and bounds how many of them
    // may be running at the same time.
    type Pool struct {
    	// capacity is the upper bound on concurrently running workers; every
    	// running worker executes its task in a goroutine of its own.
    	capacity int32
    	// running is the number of workers currently handed out to run a task
    	// (incremented in getWorker, decremented in putWorker).
    	running int32
    	// expiryDuration is how long a worker may stay idle: the periodic cleanup
    	// removes idle workers whose recycleTime is older than this duration.
    	expiryDuration time.Duration
    	// workers is the list of idle workers that are available for reuse.
    	workers []*Worker
    	// release is meant to signal that the pool has been closed; Submit
    	// rejects tasks once it holds a value.
    	// NOTE(review): nothing in the visible code ever sends on it.
    	release chan sig
    	// lock guards workers and running.
    	lock sync.Mutex
    	// once is intended to make closing the pool happen only once.
    	// NOTE(review): unused in the visible code.
    	once sync.Once
    }

    初始化Pool

    // NewPool creates a pool that runs at most size workers at the same time and
    // removes workers that have been idle for more than expiry seconds.
    //
    // An error is returned when size or expiry is not positive. A non-positive
    // expiry is rejected because the cleanup goroutine sleeps for that duration
    // between passes and would otherwise spin without pause.
    func NewPool(size, expiry int) (*Pool, error) {
    	if size <= 0 {
    		return nil, errors.New("Pool Size <=0,not Create")
    	}
    	if expiry <= 0 {
    		return nil, errors.New("Pool Expiry <=0,not Create")
    	}
    	p := &Pool{
    		capacity:       int32(size),
    		release:        make(chan sig, 1),
    		expiryDuration: time.Duration(expiry) * time.Second,
    	}
    	// Start the periodic cleanup of expired idle workers; it runs in a
    	// goroutine of its own.
    	p.monitorAndClear()
    	return p, nil
    }

    获取Worker

    // getWorker returns a worker the caller may use to run one task.
    //
    // An idle worker is reused when one exists; otherwise a new worker is created
    // as long as fewer than capacity workers are running. When the pool is
    // saturated the call blocks until a worker has been handed back through
    // putWorker. running is incremented for the returned worker in every case.
    func (p *Pool) getWorker() *Worker {
    	var w *Worker
    	// waiting records that the pool was saturated and a worker must be awaited.
    	waiting := false

    	// The idle list and the running counter are shared state.
    	p.lock.Lock()
    	n := len(p.workers) - 1
    	fmt.Println("空闲worker数量:", n+1)
    	fmt.Println("协程池现在运行的worker数量:", p.running)
    	switch {
    	case n >= 0:
    		// Reuse the most recently recycled worker (tail of the idle list).
    		w = p.workers[n]
    		p.workers[n] = nil
    		p.workers = p.workers[:n]
    		p.running++
    	case p.running >= atomic.LoadInt32(&p.capacity):
    		// No idle worker and the capacity is exhausted.
    		waiting = true
    	default:
    		// No idle worker, but the pool is not full: create a new one.
    		p.running++
    		w = &Worker{
    			pool: p,
    			task: make(chan functinType),
    			str:  make(chan string),
    		}
    	}
    	p.lock.Unlock()

    	// The idle list must only be inspected while holding the lock, and the
    	// emptiness check and the removal have to happen in the same critical
    	// section; otherwise two waiters can both see one idle worker and the
    	// loser indexes an empty slice.
    	for waiting {
    		p.lock.Lock()
    		if l := len(p.workers) - 1; l >= 0 {
    			w = p.workers[l]
    			p.workers[l] = nil
    			p.workers = p.workers[:l]
    			p.running++
    			waiting = false
    		}
    		p.lock.Unlock()
    		if waiting {
    			// Short back-off so the waiter does not monopolise a CPU.
    			time.Sleep(100 * time.Microsecond)
    		}
    	}
    	return w
    }

    定期清理过期Worker

    // monitorAndClear starts a goroutine that wakes up every expiryDuration and
    // removes the idle workers whose recycleTime is older than expiryDuration.
    //
    // putWorker appends workers as they are recycled, so the expired workers form
    // a prefix of the idle list and the scan stops at the first fresh one.
    // NOTE(review): the goroutine has no exit path; it lives as long as the process.
    func (p *Pool) monitorAndClear() {
    	go func() {
    		for {
    			time.Sleep(p.expiryDuration)
    			now := time.Now()

    			p.lock.Lock()
    			idle := p.workers
    			// Count the expired prefix. A count (rather than the index of the
    			// last expired worker) keeps "nothing expired" distinct from
    			// "only the first worker expired".
    			expired := 0
    			for expired < len(idle) && now.Sub(idle[expired].recycleTime) > p.expiryDuration {
    				expired++
    			}
    			stale := make([]*Worker, expired)
    			copy(stale, idle[:expired])
    			for i := range stale {
    				// Drop the reference so the worker can be collected.
    				idle[i] = nil
    			}
    			p.workers = idle[expired:]
    			p.lock.Unlock()

    			// Stop the workers only after releasing the lock, so a stop call
    			// that blocks cannot stall getWorker and putWorker.
    			for _, w := range stale {
    				w.stop()
    			}
    		}
    	}()
    }

    复用Worker

    // putWorker returns a worker to the idle list once its task has finished,
    // so that a later getWorker call can reuse it.
    func (p *Pool) putWorker(worker *Worker) {
    	// The recycle time doubles as the worker's last-run time; the periodic
    	// cleanup compares it against the expiry duration.
    	worker.recycleTime = time.Now()

    	p.lock.Lock()
    	defer p.lock.Unlock()

    	// The worker is no longer running a task: release its slot and park it.
    	p.running -= 1
    	p.workers = append(p.workers, worker)
    }

    动态扩容或者缩小容量

    // ReSize changes the capacity of the pool to size.
    //
    // Non-positive sizes are ignored, matching the rule NewPool applies. When the
    // pool shrinks, the oldest idle workers are dropped until running plus idle
    // workers fit into the new capacity (or no idle worker is left). Workers
    // that are currently running a task are not interrupted.
    //
    // Idle workers are discarded by dropping the reference rather than through
    // stop: a parked worker has no goroutine receiving on its channels (the
    // goroutine started by run returns right after putWorker), so a send to it
    // would block forever.
    func (p *Pool) ReSize(size int) {
    	if size <= 0 {
    		return
    	}

    	p.lock.Lock()
    	defer p.lock.Unlock()

    	if size == int(atomic.LoadInt32(&p.capacity)) {
    		return
    	}
    	if excess := len(p.workers) + int(p.running) - size; excess > 0 {
    		if excess > len(p.workers) {
    			excess = len(p.workers)
    		}
    		for i := 0; i < excess; i++ {
    			// Clear the slot so the worker can be collected.
    			p.workers[i] = nil
    		}
    		p.workers = p.workers[excess:]
    	}
    	atomic.StoreInt32(&p.capacity, int32(size))
    }

    提交Worker

    // Submit hands task, together with its argument str, to a worker of the pool.
    //
    // It blocks while the pool is saturated and returns once the worker has
    // received both values; the task itself runs asynchronously and its result
    // is not reported back. An error is returned when task is nil or when the
    // pool has been closed.
    func (p *Pool) Submit(task functinType, str string) error {
    	// A nil task would be invoked by the worker goroutine and panic there,
    	// out of the caller's reach.
    	if task == nil {
    		return errors.New("Task is nil")
    	}
    	if len(p.release) > 0 {
    		return errors.New("Pool is Close")
    	}
    	// Create a worker or take an idle one.
    	w := p.getWorker()
    	w.run()
    	// Pass the argument, then the task, over the worker's channels.
    	w.sendarg(str)
    	w.sendTask(task)
    	return nil
    }
    

      

    Worker类

    package Poolpkg
    
    import (
    	"sync/atomic"
    	"time"
    )
    
    // functinType is the signature of a task: it receives one string argument
    // and reports failure through the returned error.
    type functinType func(string) error
    
    
    // Worker is the carrier of a task: run starts a goroutine that receives a
    // task and its argument over the channels below and calls the task.
    type Worker struct {
    	// pool is the pool that owns this worker.
    	pool *Pool
    	// task delivers the function to execute.
    	task chan functinType
    	// recycleTime is set when the worker is put back into the idle list.
    	recycleTime time.Time
    
    	// str delivers the string argument for the task.
    	str chan string
    }
    
    // run starts a goroutine that waits for one argument (on w.str) and one task
    // (on w.task), in either order, calls task(argument), and then hands the
    // worker back to its pool through putWorker.
    //
    // A nil task, or either channel being closed, is a stop request: the
    // goroutine gives back the running slot held by this worker and exits. It
    // never closes a channel itself; closing is left to the sending side.
    func (w *Worker) run() {
    	go func() {
    		// count is the number of items (argument, task) received so far;
    		// the task can only start once both have arrived.
    		count := 0
    		var str string
    		var f functinType
    		for count < 2 {
    			select {
    			case s, ok := <-w.str:
    				if !ok {
    					atomic.AddInt32(&w.pool.running, -1)
    					return
    				}
    				str = s
    				count++
    			case t, ok := <-w.task:
    				if !ok || t == nil {
    					atomic.AddInt32(&w.pool.running, -1)
    					return
    				}
    				f = t
    				count++
    			}
    		}
    		// Submit has already returned to its caller, so there is nobody to
    		// report the task's error to; it is discarded on purpose.
    		_ = f(str)
    		// Recycle the worker for reuse.
    		w.pool.putWorker(w)
    	}()
    }
    
    // stop asks the worker to shut down: it offers the nil stop signal on the
    // task channel and closes the argument channel.
    //
    // The signal is sent without blocking. The task channel is unbuffered and a
    // parked worker has no goroutine receiving on it, so a plain send would hang
    // forever; closing w.str still reaches a goroutine that starts listening
    // later. stop must be called at most once per worker, since closing w.str
    // twice panics.
    func (w *Worker) stop() {
    	select {
    	case w.task <- nil:
    	default:
    		// Nobody is receiving right now; the close below is the signal.
    	}
    	close(w.str)
    }
    
    // sendTask sends task to the worker over its task channel. The send blocks
    // until a receiver takes the value when the channel is unbuffered, which is
    // how getWorker creates it.
    func (w *Worker) sendTask(task functinType) {
    	w.task <- task
    }
    
    // sendarg sends the task argument str to the worker over its str channel.
    // The send blocks until a receiver takes the value when the channel is
    // unbuffered, which is how getWorker creates it.
    func (w *Worker) sendarg(str string) {
    	w.str <- str
    }
    

      

    总结和实践

    怎么理解Worker、task、Pool的关系

    Worker类型其实就是task的载体,Worker类型有两个很重要的字段:

    task chan functinType:用来传递task。
    str chan string:用来传递task所需的参数。

    task是任务本身,它一般为一个函数,在程序中被定义为函数类型:

    type functinType func(string) error

    Pool存储Worker,当用户要执行一个task时,首先要得到一个Worker,必须从池中获取,获取到一个Worker后,就开启一个协程去处理,在这个协程中接收任务task和参数。

    // Create a new worker or take an idle one from the pool.
    w := p.getWorker()
    // Start the goroutine that will handle the task.
    w.run()
    // Pass the task argument to the worker over its channel.
    w.sendarg(str)
    // Pass the task itself to the worker over its channel.
    w.sendTask(task)

    Worker怎么接收task和参数

    count记录已接收数据的个数,一个Worker必须同时接收到task和参数才能开始工作。
    工作完后这个Worker被返回到Pool中,下次还可以复用这个Worker,也就是复用Worker这个实例。
    go func() {
    	// count is the number of items (argument, task) received so far;
    	// the task can only start once both have arrived.
    	count := 0
    	var str string
    	var f functinType
    	for count < 2 {
    		select {
    		case s, ok := <-w.str:
    			if !ok {
    				// Stop request: give back the running slot and exit.
    				atomic.AddInt32(&w.pool.running, -1)
    				return
    			}
    			str = s
    			count++
    		case t, ok := <-w.task:
    			if !ok || t == nil {
    				// A nil task or a closed channel is a stop request. The
    				// channel is not closed here: closing an already closed
    				// channel panics.
    				atomic.AddInt32(&w.pool.running, -1)
    				return
    			}
    			f = t
    			count++
    		}
    	}
    	// Submit has already returned to its caller, so the task's error has
    	// nowhere to go; it is discarded on purpose.
    	_ = f(str)
    	// Recycle the worker for reuse.
    	w.pool.putWorker(w)
    }()

    Pool怎么处理用户提交task获取Worker的请求

    1.先得到Pool池中空闲Worker的数量,然后判断

    2.如果空闲Worker的数量为零(代码中 n = len(workers) - 1 小于零),则表示池中没有空闲的Worker,这里有两种原因:

    • 1.运行的Worker数量超过了Pool容量,当用户获取Worker的请求数量激增,池中大多数Worker都是执行完任务的Worker重新添加到池中的,返回的Worker跟不上激增的需求。
    • 2.当前是空pool,从未往pool添加任务或者一段时间内没有Worker任务运行,被定期清除。

    3.如果 n 大于或者等于零,说明有空闲的Worker,直接从池中获取最后一个Worker。

    4.如果是第2步中的第一种原因(运行的Worker数量已达到Pool容量),则阻塞等待,直到池中有空闲的Worker。

    if waiting {
    	// Block until a finished task hands its worker back to the pool.
    	// The idle list is only inspected while holding the lock, and the
    	// emptiness check and the removal share one critical section, so two
    	// waiters can never both claim the same worker.
    	for {
    		p.lock.Lock()
    		if l := len(p.workers) - 1; l >= 0 {
    			w = p.workers[l]
    			p.workers[l] = nil
    			p.workers = p.workers[:l]
    			p.running++
    			p.lock.Unlock()
    			break
    		}
    		p.lock.Unlock()
    		// Short back-off so the waiter does not monopolise a CPU.
    		time.Sleep(100 * time.Microsecond)
    	}
    }
    

    5.如果是第2步中的第二种原因(Pool未满但没有空闲的Worker),直接创建一个Worker实例。

    // No idle worker is available but the pool is not full yet,
    // so a new worker can be created right away to run the task.
    p.running++
    w = &Worker{
    	pool: p,
    	task: make(chan functinType),
    	str:make(chan string),
    }

    测试

    package main
    
    import (
    	"Pool/Poolpkg"
    	"fmt"
    )
    
    func main(){
         //开20个大小的Pool池,过期清除时间5分钟 Pool,err := Poolpkg.NewPool(20,5) i :=0 for i < 50 { err = Pool.Submit(Print_Test1,"并发测试!") if err != nil{ fmt.Println(err) } i++ } }

    源码

    Pool

    package Poolpkg
    
    import (
    	"errors"
    	"fmt"
    	"sync"
    	"sync/atomic"
    	"time"
    )
    
    // sig is the empty element type of the pool's release channel.
    type sig struct{}
    
    
    
    // Pool accepts tasks from clients; it limits the total number of
    // goroutines to a given number by recycling workers.
    type Pool struct {
    	// capacity is the upper bound on concurrently running workers; every
    	// running worker executes its task in a goroutine of its own.
    	capacity int32
    	// running is the number of workers currently handed out to run a task
    	// (incremented in getWorker, decremented in putWorker).
    	running int32
    	// expiryDuration is how long a worker may stay idle: the periodic cleanup
    	// removes idle workers whose recycleTime is older than this duration.
    	expiryDuration time.Duration
    	// workers is the list of idle workers that are available for reuse.
    	workers []*Worker
    	// release is meant to signal that the pool has been closed; Submit
    	// rejects tasks once it holds a value.
    	// NOTE(review): nothing in the visible code ever sends on it.
    	release chan sig
    	// lock guards workers and running.
    	lock sync.Mutex
    	// once is intended to make closing the pool happen only once.
    	// NOTE(review): unused in the visible code.
    	once sync.Once
    }
    
    // NewPool creates a pool that runs at most size workers at the same time and
    // removes workers that have been idle for more than expiry seconds.
    //
    // An error is returned when size or expiry is not positive. A non-positive
    // expiry is rejected because the cleanup goroutine sleeps for that duration
    // between passes and would otherwise spin without pause.
    func NewPool(size, expiry int) (*Pool, error) {
    	if size <= 0 {
    		return nil, errors.New("Pool Size <=0,not Create")
    	}
    	if expiry <= 0 {
    		return nil, errors.New("Pool Expiry <=0,not Create")
    	}
    	p := &Pool{
    		capacity:       int32(size),
    		release:        make(chan sig, 1),
    		expiryDuration: time.Duration(expiry) * time.Second,
    	}
    	// Start the periodic cleanup of expired idle workers; it runs in a
    	// goroutine of its own.
    	p.monitorAndClear()
    	return p, nil
    }
    
    // Submit hands task, together with its argument str, to a worker of the pool.
    //
    // It blocks while the pool is saturated and returns once the worker has
    // received both values; the task itself runs asynchronously and its result
    // is not reported back. An error is returned when task is nil or when the
    // pool has been closed.
    func (p *Pool) Submit(task functinType, str string) error {
    	// A nil task would be invoked by the worker goroutine and panic there,
    	// out of the caller's reach.
    	if task == nil {
    		return errors.New("Task is nil")
    	}
    	if len(p.release) > 0 {
    		return errors.New("Pool is Close")
    	}
    	// Create a worker or take an idle one.
    	w := p.getWorker()
    	w.run()
    	// Pass the argument, then the task, over the worker's channels.
    	w.sendarg(str)
    	w.sendTask(task)
    	return nil
    }
    
    // getWorker returns a worker the caller may use to run one task.
    //
    // An idle worker is reused when one exists; otherwise a new worker is created
    // as long as fewer than capacity workers are running. When the pool is
    // saturated the call blocks until a worker has been handed back through
    // putWorker. running is incremented for the returned worker in every case.
    func (p *Pool) getWorker() *Worker {
    	var w *Worker
    	// waiting records that the pool was saturated and a worker must be awaited.
    	waiting := false

    	// The idle list and the running counter are shared state.
    	p.lock.Lock()
    	n := len(p.workers) - 1
    	fmt.Println("空闲worker数量:", n+1)
    	fmt.Println("协程池现在运行的worker数量:", p.running)
    	switch {
    	case n >= 0:
    		// Reuse the most recently recycled worker (tail of the idle list).
    		w = p.workers[n]
    		p.workers[n] = nil
    		p.workers = p.workers[:n]
    		p.running++
    	case p.running >= atomic.LoadInt32(&p.capacity):
    		// No idle worker and the capacity is exhausted.
    		waiting = true
    	default:
    		// No idle worker, but the pool is not full: create a new one.
    		p.running++
    		w = &Worker{
    			pool: p,
    			task: make(chan functinType),
    			str:  make(chan string),
    		}
    	}
    	p.lock.Unlock()

    	// The idle list must only be inspected while holding the lock, and the
    	// emptiness check and the removal have to happen in the same critical
    	// section; otherwise two waiters can both see one idle worker and the
    	// loser indexes an empty slice.
    	for waiting {
    		p.lock.Lock()
    		if l := len(p.workers) - 1; l >= 0 {
    			w = p.workers[l]
    			p.workers[l] = nil
    			p.workers = p.workers[:l]
    			p.running++
    			waiting = false
    		}
    		p.lock.Unlock()
    		if waiting {
    			// Short back-off so the waiter does not monopolise a CPU.
    			time.Sleep(100 * time.Microsecond)
    		}
    	}
    	return w
    }
    
    // monitorAndClear starts a goroutine that wakes up every expiryDuration and
    // removes the idle workers whose recycleTime is older than expiryDuration.
    //
    // putWorker appends workers as they are recycled, so the expired workers form
    // a prefix of the idle list and the scan stops at the first fresh one.
    // running is left untouched: idle workers were already subtracted from it
    // when putWorker parked them.
    // NOTE(review): the goroutine has no exit path; it lives as long as the process.
    func (p *Pool) monitorAndClear() {
    	go func() {
    		for {
    			time.Sleep(p.expiryDuration)
    			now := time.Now()

    			p.lock.Lock()
    			idle := p.workers
    			// Count the expired prefix. A count (rather than the index of the
    			// last expired worker) keeps "nothing expired" distinct from
    			// "only the first worker expired".
    			expired := 0
    			for expired < len(idle) && now.Sub(idle[expired].recycleTime) > p.expiryDuration {
    				expired++
    			}
    			stale := make([]*Worker, expired)
    			copy(stale, idle[:expired])
    			for i := range stale {
    				// Drop the reference so the worker can be collected.
    				idle[i] = nil
    			}
    			p.workers = idle[expired:]
    			p.lock.Unlock()

    			// Stop the workers only after releasing the lock, so a stop call
    			// that blocks cannot stall getWorker and putWorker.
    			for _, w := range stale {
    				w.stop()
    			}
    		}
    	}()
    }
    
    // putWorker returns a worker to the idle list once its task has finished,
    // so that a later getWorker call can reuse it.
    func (p *Pool) putWorker(worker *Worker) {
    	// The recycle time doubles as the worker's last-run time; the periodic
    	// cleanup compares it against the expiry duration.
    	worker.recycleTime = time.Now()

    	p.lock.Lock()
    	defer p.lock.Unlock()

    	// The worker is no longer running a task: release its slot and park it.
    	p.running -= 1
    	p.workers = append(p.workers, worker)
    }
    
    // ReSize changes the capacity of the pool to size.
    //
    // Non-positive sizes are ignored, matching the rule NewPool applies. When the
    // pool shrinks, the oldest idle workers are dropped until running plus idle
    // workers fit into the new capacity (or no idle worker is left). Workers
    // that are currently running a task are not interrupted.
    //
    // Idle workers are discarded by dropping the reference rather than through
    // stop: a parked worker has no goroutine receiving on its channels (the
    // goroutine started by run returns right after putWorker), so a send to it
    // would block forever.
    func (p *Pool) ReSize(size int) {
    	if size <= 0 {
    		return
    	}

    	p.lock.Lock()
    	defer p.lock.Unlock()

    	if size == int(atomic.LoadInt32(&p.capacity)) {
    		return
    	}
    	if excess := len(p.workers) + int(p.running) - size; excess > 0 {
    		if excess > len(p.workers) {
    			excess = len(p.workers)
    		}
    		for i := 0; i < excess; i++ {
    			// Clear the slot so the worker can be collected.
    			p.workers[i] = nil
    		}
    		p.workers = p.workers[excess:]
    	}
    	atomic.StoreInt32(&p.capacity, int32(size))
    }
    

    Worker

    package Poolpkg
    
    import (
    	"sync/atomic"
    	"time"
    )
    
    // functinType is the signature of a task: it receives one string argument
    // and reports failure through the returned error.
    type functinType func(string) error
    
    
    // Worker is the carrier of a task: run starts a goroutine that receives a
    // task and its argument over the channels below and calls the task.
    type Worker struct {
    	// pool is the pool that owns this worker.
    	pool *Pool
    	// task delivers the function to execute.
    	task chan functinType
    	// recycleTime is set when the worker is put back into the idle list.
    	recycleTime time.Time
    
    	// str delivers the string argument for the task.
    	str chan string
    }
    
    // run starts a goroutine that waits for one argument (on w.str) and one task
    // (on w.task), in either order, calls task(argument), and then hands the
    // worker back to its pool through putWorker.
    //
    // A nil task, or either channel being closed, is a stop request: the
    // goroutine gives back the running slot held by this worker and exits. It
    // never closes a channel itself; closing is left to the sending side.
    func (w *Worker) run() {
    	go func() {
    		// count is the number of items (argument, task) received so far;
    		// the task can only start once both have arrived.
    		count := 0
    		var str string
    		var f functinType
    		for count < 2 {
    			select {
    			case s, ok := <-w.str:
    				if !ok {
    					atomic.AddInt32(&w.pool.running, -1)
    					return
    				}
    				str = s
    				count++
    			case t, ok := <-w.task:
    				if !ok || t == nil {
    					atomic.AddInt32(&w.pool.running, -1)
    					return
    				}
    				f = t
    				count++
    			}
    		}
    		// Submit has already returned to its caller, so there is nobody to
    		// report the task's error to; it is discarded on purpose.
    		_ = f(str)
    		// Recycle the worker for reuse.
    		w.pool.putWorker(w)
    	}()
    }
    
    // stop asks the worker to shut down: it offers the nil stop signal on the
    // task channel and closes the argument channel.
    //
    // The signal is sent without blocking. The task channel is unbuffered and a
    // parked worker has no goroutine receiving on it, so a plain send would hang
    // forever; closing w.str still reaches a goroutine that starts listening
    // later. stop must be called at most once per worker, since closing w.str
    // twice panics.
    func (w *Worker) stop() {
    	select {
    	case w.task <- nil:
    	default:
    		// Nobody is receiving right now; the close below is the signal.
    	}
    	close(w.str)
    }
    
    // sendTask sends task to the worker over its task channel. The send blocks
    // until a receiver takes the value when the channel is unbuffered, which is
    // how getWorker creates it.
    func (w *Worker) sendTask(task functinType) {
    	w.task <- task
    }
    
    // sendarg sends the task argument str to the worker over its str channel.
    // The send blocks until a receiver takes the value when the channel is
    // unbuffered, which is how getWorker creates it.
    func (w *Worker) sendarg(str string) {
    	w.str <- str
    }
    

     

  • 相关阅读:
    快速搞懂.NET 5/.NET Core应用程序的发布部署
    .NET 5 程序高级调试-WinDbg
    mmap出现 Permission denied
    Java int和integer有什么区别 (mybatis踩坑)
    NodeJS mysql timestamp 数据插入失败的问题
    mysql case when 用法
    postcss 源码解析以及运用
    rust漫游
    关于接口设计的思考--我们真的需要这么多入参吗
    详解apollo的设计与使用
  • 原文地址:https://www.cnblogs.com/-wenli/p/12378699.html
Copyright © 2011-2022 走看看