zoukankan      html  css  js  c++  java
  • Go语言同步和异步执行多个任务封装

    同步适合多个连续执行的,每一步的执行依赖于上一步操作,异步执行则和任务执行顺序无关(如从10个站点抓取数据)

    同步执行类RunnerAsync

    支持返回超时检测,系统中断检测

    错误常量定义,task/err.go

    package task
    
    import "errors"
    
    
    //超时错误
    var ErrTimeout = errors.New("received timeout")
    
    //操作系统系统中断错误
    var ErrInterrupt = errors.New("received interrupt")

    实现代码如下,task/runner_async.go

    package task
    
    import (
        "os"
        "os/signal"
        "time"
    )
    
    //同步执行任务
    type RunnerAsync struct {
        //操作系统的信号检测
        interrupt chan os.Signal
    
        //记录执行完成的状态
        complete chan error
    
        //超时检测
        timeout <-chan time.Time
    
        //保存所有要执行的任务,顺序执行
        tasks []func(id int)
    }
    
    //new一个RunnerAsync对象
    func NewRunnerAsync(d time.Duration) *RunnerAsync {
        return &RunnerAsync{
            interrupt: make(chan os.Signal, 1),
            complete:  make(chan error),
            timeout:   time.After(d),
        }
    }
    
    //添加一个任务
    func (this *RunnerAsync) Add(tasks ...func(id int)) {
        this.tasks = append(this.tasks, tasks...)
    }
    
    //启动RunnerAsync,监听错误信息
    func (this *RunnerAsync) Start() error {
    
        //接收操作系统信号
        signal.Notify(this.interrupt, os.Interrupt)
    
        //执行任务
        go func() {
            this.complete <- this.Run()
        }()
    
        select {
        //返回执行结果
        case err := <-this.complete:
            return err
    
            //超时返回
        case <-this.timeout:
            return ErrTimeout
        }
    }
    
    //顺序执行所有的任务
    func (this *RunnerAsync) Run() error {
        for id, task := range this.tasks {
            if this.gotInterrupt() {
                return ErrInterrupt
            }
            //执行任务
            task(id)
        }
        return nil
    }
    
    //判断是否接收到操作系统中断信号
    func (this *RunnerAsync) gotInterrupt() bool {
        select {
        case <-this.interrupt:
            //停止接收别的信号
            signal.Stop(this.interrupt)
            return true
            //正常执行
        default:
            return false
        }
    }
    

    使用方法    

    Add添加一个任务,任务为接收int类型的一个闭包

    Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

    测试代码

    task/runner_async_test.go

    package task
    
    import (
    	"fmt"
    	"os"
    	"runtime"
    	"testing"
    	"time"
    )
    
    func TestRunnerAsync_Start(t *testing.T) {
    
    	//开启多核
    	runtime.GOMAXPROCS(runtime.NumCPU())
    
    	//创建runner对象,设置超时时间
    	runner := NewRunnerAsync(8 * time.Second)
    	//添加运行的任务
    	runner.Add(
    		createTaskAsync(),
    		createTaskAsync(),
    		createTaskAsync(),
    		createTaskAsync(),
    		createTaskAsync(),
    		createTaskAsync(),
    		createTaskAsync(),
    		createTaskAsync(),
    		createTaskAsync(),
    		createTaskAsync(),
    		createTaskAsync(),
    		createTaskAsync(),
    		createTaskAsync(),
    	)
    
    	fmt.Println("同步执行任务")
    
    	//开始执行任务
    	if err := runner.Start(); err != nil {
    		switch err {
    		case ErrTimeout:
    			fmt.Println("执行超时")
    			os.Exit(1)
    		case ErrInterrupt:
    			fmt.Println("任务被中断")
    			os.Exit(2)
    		}
    	}
    
    	t.Log("执行结束")
    
    }
    
    //创建要执行的任务
    func createTaskAsync() func(id int) {
    	return func(id int) {
    		fmt.Printf("正在执行%v个任务
    ", id)
    		//模拟任务执行,sleep两秒
    		//time.Sleep(1 * time.Second)
    	}
    }

    执行结果

    同步执行任务
    正在执行0个任务
    正在执行1个任务
    正在执行2个任务
    正在执行3个任务
    正在执行4个任务
    正在执行5个任务
    正在执行6个任务
    正在执行7个任务
    正在执行8个任务
    正在执行9个任务
    正在执行10个任务
    正在执行11个任务
    正在执行12个任务
    

      

    异步执行类Runner

    支持返回超时检测,系统中断检测

    实现代码如下,task/runner.go

    package task
    
    import (
        "os"
        "time"
        "os/signal"
        "sync"
    )
    
    //异步执行任务
    type Runner struct {
        //操作系统的信号检测
        interrupt chan os.Signal
    
        //记录执行完成的状态
        complete chan error
    
        //超时检测
        timeout <-chan time.Time
    
        //保存所有要执行的任务,顺序执行
        tasks []func(id int) error
    
        waitGroup sync.WaitGroup
    
        lock sync.Mutex
    
        errs []error
    }
    
    //new一个Runner对象
    func NewRunner(d time.Duration) *Runner {
        return &Runner{
            interrupt: make(chan os.Signal, 1),
            complete:  make(chan error),
            timeout:   time.After(d),
            waitGroup: sync.WaitGroup{},
            lock:      sync.Mutex{},
        }
    }
    
    //添加一个任务
    func (this *Runner) Add(tasks ...func(id int) error) {
        this.tasks = append(this.tasks, tasks...)
    }
    
    //启动Runner,监听错误信息
    func (this *Runner) Start() error {
    
        //接收操作系统信号
        signal.Notify(this.interrupt, os.Interrupt)
    
        //并发执行任务
        go func() {
            this.complete <- this.Run()
        }()
    
        select {
        //返回执行结果
        case err := <-this.complete:
            return err
            //超时返回
        case <-this.timeout:
            return ErrTimeout
        }
    }
    
    //异步执行所有的任务
    func (this *Runner) Run() error {
        for id, task := range this.tasks {
            if this.gotInterrupt() {
                return ErrInterrupt
            }
    
            this.waitGroup.Add(1)
            go func(id int) {
                this.lock.Lock()
    
                //执行任务
                err := task(id)
                //加锁保存到结果集中
                this.errs = append(this.errs, err)
    
                this.lock.Unlock()
                this.waitGroup.Done()
            }(id)
        }
        this.waitGroup.Wait()
    
        return nil
    }
    
    //判断是否接收到操作系统中断信号
    func (this *Runner) gotInterrupt() bool {
        select {
        case <-this.interrupt:
            //停止接收别的信号
            signal.Stop(this.interrupt)
            return true
            //正常执行
        default:
            return false
        }
    }
    
    //获取执行完的error
    func (this *Runner) GetErrs() []error {
        return this.errs
    }

    使用方法    

    Add添加一个任务,任务为接收int类型,返回类型error的一个闭包

    Start开始执行伤,返回一个error类型,nil为执行完毕, ErrTimeout代表执行超时,ErrInterrupt代表执行被中断(类似Ctrl + C操作)

    getErrs获取所有的任务执行结果

    测试示例代码

    task/runner_test.go

    package task
    
    import (
        "testing"
        "time"
        "fmt"
        "os"
        "runtime"
    )
    
    func TestRunner_Start(t *testing.T) {
        //开启多核心
        runtime.GOMAXPROCS(runtime.NumCPU())
    
        //创建runner对象,设置超时时间
        runner := NewRunner(18 * time.Second)
        //添加运行的任务
        runner.Add(
            createTask(),
            createTask(),
            createTask(),
            createTask(),
            createTask(),
            createTask(),
            createTask(),
            createTask(),
            createTask(),
            createTask(),
            createTask(),
            createTask(),
            createTask(),
            createTask(),
        )
    
        fmt.Println("异步执行任务")
    
        //开始执行任务
        if err := runner.Start(); err != nil {
            switch err {
            case ErrTimeout:
                fmt.Println("执行超时")
                os.Exit(1)
            case ErrInterrupt:
                fmt.Println("任务被中断")
                os.Exit(2)
            }
        }
    
        t.Log("执行结束")
    
        t.Log(runner.GetErrs())
    
    }
    
    //创建要执行的任务
    func createTask() func(id int) error {
        return func(id int) error {
            fmt.Printf("正在执行%v个任务
    ", id)
            //模拟任务执行,sleep
            //time.Sleep(1 * time.Second)
            return nil
        }
    }

    执行结果

    异步执行任务
    正在执行2个任务
    正在执行1个任务
    正在执行4个任务
    正在执行3个任务
    正在执行6个任务
    正在执行5个任务
    正在执行9个任务
    正在执行7个任务
    正在执行10个任务
    正在执行13个任务
    正在执行8个任务
    正在执行11个任务
    正在执行12个任务
    正在执行0个任务 
    

      

     

  • 相关阅读:
    Linux 如何查看当前目录
    Docker快速入手实战笔记
    【ssh】ssh登录出现‘The authenticity of host ‘IP’ can't be established.’的问题
    【AFL(七)】afl-fuzz.c小改——输出文件夹暂存
    【steam】Steam背景美化——长展柜终极指南
    【AFL(六)】AFL源码中的那些头文件
    【AFL(五)】文件变异策略
    【Latex】详细的简易教程——写在论文开始之前
    【Latex】论文写作工具:VScode 2019 + latex workshop
    【AFL(四)】afl-cmin修改:文件夹相关操作鲁棒性
  • 原文地址:https://www.cnblogs.com/chenqionghe/p/8269556.html
Copyright © 2011-2022 走看看