zoukankan      html  css  js  c++  java
  • Golang(九)简单 Goroutine 池实现

    0. 前言

    • 最近使用 Golang 写一个并发执行的测试脚本
    • 之前习惯使用 Java,习惯性想先建一个线程池。然后意识到 Golang 没有封装好的线程池
    • 结合之前学习的 Goroutine 原理和 Golang 大道至简的设计思想,可能 Goroutine 的开销和切换代价比较低,不需要对并发数有过多限制
    • 但是 Goroutine 启动数量过多的话总感觉不太好,于是利用锁和通道实现了简单的线程池做并发控制,欢迎大家点评
    • 源码地址:https://github.com/wangao1236/GoPool

    1. 相关接口

    • 接口仿照 Java 的 ExecutorService 和 Runnable 接口定义:
    // Task represents an in-process Goroutine task.
    type Task interface {
        // Run method corresponds to Run method of Java's Runnable interface.
        Run()
    }
    
    // Executor defines the actions associated with the Goroutine pool.
    type Executor interface {
        // Execute method corresponds to Execute method of Java's ExecutorService interface.
        Execute(task Task)
        // Wait waits for all the tasks to complete.
        Wait()
        // Done returns a channel which is closed if all the tasks completed.
        Done() chan struct{}
    }
    View Code
    • 接口调用:
    func TestNewExecutor(t *testing.T) {
        t.Log(t.Name())
        ex := NewExecutor(4)
    
        for _, domain := range domains {
            ex.Execute(&TestTask{
                fmt.Sprintf("ping %s -c 10", domain),
            })
        }
        ex.Wait()
    }
    View Code
    • 首先定义一个 Executor,然后通过 Execute 传入 Task 对象,调用 Wait 方法等待所有任务结束

    2. 具体实现

    • 主要利用 sync.Mutex 和 []channel struct{} 维护一个等待执行的任务队列
    • 任务传入时,等待一个 startCh 通道信号
    • 对于符合执行条件(未设定并发数或者当前执行任务小于并发数时)关闭 startCh 信号,解除阻塞
    • 任务执行完毕后,关闭 stopCh 信号,允许等待队列里的任务继续执行
    • 对于所有任务执行完毕(正在执行数和等待执行数均为 0),关闭 done 信号,解除整个 Executor 的阻塞,表示所有任务执行完毕
    • 部分代码如下:
    package pkg
    
    import (
        "sync"
    )
    
    // Task represents an in-process Goroutine task.
    type Task interface {
        // Run method corresponds to Run method of Java's Runnable interface.
        Run()
    }
    
    // Executor defines the actions associated with the Goroutine pool.
    type Executor interface {
        // Execute method corresponds to Execute method of Java's ExecutorService interface.
        Execute(task Task)
        // Wait waits for all the tasks to complete.
        Wait()
        // Done returns a channel which is closed if all the tasks completed.
        Done() chan struct{}
    }
    
    type executor struct {
        lock             sync.Mutex
        waitingTasks     []chan struct{}
        activeTasks      int64
        concurrencyLimit int64
        done           chan struct{}
    }
    
    func (ex *executor) Execute(task Task) {
        ex.start(task)
    }
    
    func (ex *executor) Wait() {
        <-ex.done
    }
    
    func (ex *executor) Done() chan struct{} {
        return ex.done
    }
    
    func (ex *executor) start(task Task) {
        startCh := make(chan struct{})
        stopCh := make(chan struct{})
    
        go startTask(startCh, stopCh, task)
        ex.enqueue(startCh)
        go ex.waitDone(stopCh)
    
    }
    
    // NewExecutor returns a new Executor.
    func NewExecutor(concurrencyLimit int64) Executor {
        ex := &executor{
            waitingTasks:     make([]chan struct{}, 0),
            activeTasks:      0,
            concurrencyLimit: concurrencyLimit,
            done:           make(chan struct{}),
        }
        return ex
    }
    
    func startTask(startCh, stopCh chan struct{}, task Task) {
        defer close(stopCh)
    
        <-startCh
        task.Run()
    }
    
    func (ex *executor) enqueue(startCh chan struct{}) {
        ex.lock.Lock()
        defer ex.lock.Unlock()
    
        if ex.concurrencyLimit == 0 || ex.activeTasks < ex.concurrencyLimit {
            close(startCh)
            ex.activeTasks++
        } else {
            ex.waitingTasks = append(ex.waitingTasks, startCh)
        }
    }
    
    func (ex *executor) waitDone(stopCh chan struct{}) {
        <-stopCh
    
        ex.lock.Lock()
        defer ex.lock.Unlock()
    
        if len(ex.waitingTasks) == 0 {
            ex.activeTasks--
            if ex.activeTasks == 0 {
                close(ex.done)
            }
        } else {
            close(ex.waitingTasks[0])
            ex.waitingTasks = ex.waitingTasks[1:]
        }
    }
    View Code
    • 通过 Executor 传入的任务首先开始执行 start 方法
    • start 方法里定义了该任务的 startCh 和 stopCh 信号
    • 各启动一个 Goroutine 等待任务开始和任务结束
    • 同时把表示任务的 startCh 加入等待队列中表示,队列需要靠 sync.Mutex 保护
    • 当一个任务结束时,解除 waitDone 方法的阻塞,启动队首的任务,解除 startTask 里的阻塞
    • 所有任务结束后,解除 Wait 方法里的阻塞
    • 完整代码参见上述链接
  • 相关阅读:
    2018 ACM 网络选拔赛 徐州赛区
    2018 ACM 网络选拔赛 焦作赛区
    2018 ACM 网络选拔赛 沈阳赛区
    poj 2289 网络流 and 二分查找
    poj 2446 二分图最大匹配
    poj 1469 二分图最大匹配
    poj 3249 拓扑排序 and 动态规划
    poj 3687 拓扑排序
    poj 2585 拓扑排序
    poj 1094 拓扑排序
  • 原文地址:https://www.cnblogs.com/wangao1236/p/Golang.html
Copyright © 2011-2022 走看看