zoukankan      html  css  js  c++  java
  • Golang(九)简单 Goroutine 池实现

    0. 前言

    • 最近使用 Golang 写一个并发执行的测试脚本
    • 之前习惯使用 Java,习惯性想先建一个线程池。然后意识到 Golang 没有封装好的线程池
    • 结合之前学习的 Goroutine 原理和 Golang 大道至简的设计思想,可能 Goroutine 的开销和切换代价比较低,不需要对并发数有过多限制
    • 但是 Goroutine 启动数量过多的话总感觉不太好,于是利用锁和通道实现了简单的线程池做并发控制,欢迎大家点评
    • 源码地址:https://github.com/wangao1236/GoPool

    1. 相关接口

    • 接口仿照 Java 的 ExecutorService 和 Runnable 接口定义:
    // Task represents an in-process Goroutine task.
    type Task interface {
        // Run method corresponds to Run method of Java's Runnable interface.
        Run()
    }
    
    // Executor defines the actions associated with the Goroutine pool.
    type Executor interface {
        // Execute method corresponds to Execute method of Java's ExecutorService interface.
        Execute(task Task)
        // Wait waits for all the tasks to complete.
        Wait()
        // Done returns a channel which is closed if all the tasks completed.
        Done() chan struct{}
    }
    View Code
    • 接口调用:
    func TestNewExecutor(t *testing.T) {
        t.Log(t.Name())
        ex := NewExecutor(4)
    
        for _, domain := range domains {
            ex.Execute(&TestTask{
                fmt.Sprintf("ping %s -c 10", domain),
            })
        }
        ex.Wait()
    }
    View Code
    • 首先定义一个 Executor,然后通过 Execute 传入 Task 对象,调用 Wait 方法等待所有任务结束

    2. 具体实现

    • 主要利用 sync.Mutex 和 []channel struct{} 维护一个等待执行的任务队列
    • 任务传入时,等待一个 startCh 通道信号
    • 对于符合执行条件(未设定并发数或者当前执行任务小于并发数时)关闭 startCh 信号,解除阻塞
    • 任务执行完毕后,关闭 stopCh 信号,允许等待队列里的任务继续执行
    • 对于所有任务执行完毕(正在执行数和等待执行数均为 0),关闭 done 信号,解除整个 Executor 的阻塞,表示所有任务执行完毕
    • 部分代码如下:
    package pkg
    
    import (
        "sync"
    )
    
    // Task represents an in-process Goroutine task.
    type Task interface {
        // Run method corresponds to Run method of Java's Runnable interface.
        Run()
    }
    
    // Executor defines the actions associated with the Goroutine pool.
    type Executor interface {
        // Execute method corresponds to Execute method of Java's ExecutorService interface.
        Execute(task Task)
        // Wait waits for all the tasks to complete.
        Wait()
        // Done returns a channel which is closed if all the tasks completed.
        Done() chan struct{}
    }
    
    type executor struct {
        lock             sync.Mutex
        waitingTasks     []chan struct{}
        activeTasks      int64
        concurrencyLimit int64
        done           chan struct{}
    }
    
    func (ex *executor) Execute(task Task) {
        ex.start(task)
    }
    
    func (ex *executor) Wait() {
        <-ex.done
    }
    
    func (ex *executor) Done() chan struct{} {
        return ex.done
    }
    
    func (ex *executor) start(task Task) {
        startCh := make(chan struct{})
        stopCh := make(chan struct{})
    
        go startTask(startCh, stopCh, task)
        ex.enqueue(startCh)
        go ex.waitDone(stopCh)
    
    }
    
    // NewExecutor returns a new Executor.
    func NewExecutor(concurrencyLimit int64) Executor {
        ex := &executor{
            waitingTasks:     make([]chan struct{}, 0),
            activeTasks:      0,
            concurrencyLimit: concurrencyLimit,
            done:           make(chan struct{}),
        }
        return ex
    }
    
    func startTask(startCh, stopCh chan struct{}, task Task) {
        defer close(stopCh)
    
        <-startCh
        task.Run()
    }
    
    func (ex *executor) enqueue(startCh chan struct{}) {
        ex.lock.Lock()
        defer ex.lock.Unlock()
    
        if ex.concurrencyLimit == 0 || ex.activeTasks < ex.concurrencyLimit {
            close(startCh)
            ex.activeTasks++
        } else {
            ex.waitingTasks = append(ex.waitingTasks, startCh)
        }
    }
    
    func (ex *executor) waitDone(stopCh chan struct{}) {
        <-stopCh
    
        ex.lock.Lock()
        defer ex.lock.Unlock()
    
        if len(ex.waitingTasks) == 0 {
            ex.activeTasks--
            if ex.activeTasks == 0 {
                close(ex.done)
            }
        } else {
            close(ex.waitingTasks[0])
            ex.waitingTasks = ex.waitingTasks[1:]
        }
    }
    View Code
    • 通过 Executor 传入的任务首先开始执行 start 方法
    • start 方法里定义了该任务的 startCh 和 stopCh 信号
    • 各启动一个 Goroutine 等待任务开始和任务结束
    • 同时把表示任务的 startCh 加入等待队列中表示,队列需要靠 sync.Mutex 保护
    • 当一个任务结束时,解除 waitDone 方法的阻塞,启动队首的任务,解除 startTask 里的阻塞
    • 所有任务结束后,解除 Wait 方法里的阻塞
    • 完整代码参见上述链接
  • 相关阅读:
    H5游戏开发之抓住小恐龙
    jQuery.noConflict()
    H5游戏开发之多边形碰撞检测
    windows命令行快速启动软件
    Spring MVC系列[1]—— HelloWorld
    过马路与异常
    超简单!一步创建自己的wifi热点~
    朱熹、王阳明与面向对象
    jenkins SVN更改密码后出现的坑爹问题
    Jenkins权限控制
  • 原文地址:https://www.cnblogs.com/wangao1236/p/Golang.html
Copyright © 2011-2022 走看看