zoukankan      html  css  js  c++  java
  • golang 第三方定时任务库 github.com/robfig/cron/v3 核心源码解读

    定时任务是一个通用场景的功能,在golang中,现在github最为常见的一个第三方定时任务库就是 github.com/robfig/cron/v3 目前(2020年1月9日) 7.2k Star。
    我之前使用Python的时候用惯了apscheduler,切换这个是真的不习惯。

    个人博客原文地址

    https://www.charmcode.cn/article/2021-01-09_golang_cron_code

    感觉github.com/robfig/cron/v3功能太简陋了,

    • 不支持定时任务持久化,我重启一下服务,调度任务信息就没了,需要自己存储调度信息。
    • 再比如不支持一次定时见issue等,虽然有PR 但是 v3 分支还是依旧不支持,parse文件函数不支持,虽然可以按照作者的说法,调用一次之后再调用移除可以实现。
    • 不支持立即运行,见issue ,作者表示可以调用后手动调用解决,但是我感觉不够优雅,没有指定首次运行时间方便。(我突然有种想提PR的冲动,哈哈哈)

    综上,个人感觉这个库封装的不是很完善,作为一个golang新手,读解析一下这个定时任务库,还是很有收获的。如果能力允许,以解决以上问题为目标,自己提PR。

    注意

    文章内容皆为个人见解,并且只看了核心的实现方式,细节部分没有解析,不保证准确,如果和你的理解有歧义,以你的为准。

    前置知识

    你需要掌握golang的 goroutine知识,包括channel通信,select多路复用, time.NewTimer等知识,否则解读起来就会很困难。
    time.NewTimer的作用

    func main(){
    	// Calling NewTimer method
    	timer := time.NewTimer(5 * time.Second)
    
    	// Notifying the channel
    	<-timer.C
    
    	// Printed after 5 seconds 5秒之后输出
    	fmt.Println("Timer is inactivated")
    }
    

    简单的demo

    更多使用demo可以参考 个人Go学习笔记

    
    package _1_demo
    
    import (
    	"fmt"
    	"github.com/robfig/cron/v3"
    	"testing"
    	"time"
    )
    
    // 定时任务
    func jobTask() {
    	fmt.Printf( "任务启动: %s 
    ",time.Now().Format("2006-01-02 15:04:05"))
    }
    
    func TestCron(t *testing.T) {
    	// 创建一个cron对象
    	c := cron.New()
    
    	// 任务调度
    	enterId, err := c.AddFunc("@every 3s", jobTask)
    	if err!=nil{
    		panic(err)
    	}
    	fmt.Printf("任务id是 %d 
    ", enterId)
    
    	// 同步执行任务会阻塞当前执行顺序  一般使用Start()
    	//c.Run()
    	//fmt.Println("当前执行顺序.......")
    
    	// goroutine 协程启动定时任务(看到后面Start函数和run()函数,就会明白启动这一步也可以写在任务调度之前执行)
    	c.Start()
    	// Start()内部有一个running 布尔值 限制只有一个Cron对象启动 所以这一步多个 c.Start() 也只会有一个运行
    	c.Start()
    	c.Start()
    
    	// 用于阻塞 后面可以使用 select {} 阻塞
    	time.Sleep(time.Second * 9)
    
    	// 关闭定时任务(其实不关闭也可以,主进程直接结束了, 内部的goroutine协程也会自动结束)
    	c.Stop()
    
    }
    

    源码解读

    核心文件主要就是cron.go文件

    首先可以看到 c := cron.New() 创建了这个 Cron结构体对象

    type Cron struct {
    	entries   []*Entry                         // 用于存放job指针对象的数组
    	chain     Chain
    	stop      chan struct{}                  // 定制调度任务
    	add       chan *Entry                    // 添加一个调度任务
    	remove    chan EntryID               // 移除 一个调度任务
    	snapshot  chan chan []Entry     // 正在运行中的调度任务 
    	running   bool                             // 保证整个Cron对象只启动一次 和启动后其他chan正常
    	logger    Logger                          // 记录日志
    	runningMu sync.Mutex              // 协程锁,确保执行安全
    	location  *time.Location            // 时区
    	parser    ScheduleParser           // 解析参数
    	nextID    EntryID                         // 下一个调度任务的id
    	jobWaiter sync.WaitGroup        // 确保单一的调度任务执行完毕
    }
    

    Entry包含那些

    
    // Entry consists of a schedule and the func to execute on that schedule.
    type Entry struct {
    	// ID is the cron-assigned ID of this entry, which may be used to look up a
    	// snapshot or remove it.
    	ID EntryID  // 任务调度Id,默认是自增 创建任务时返回
    
    	// Schedule on which this job should be run.
    	Schedule Schedule // 调度任务运行
    
    	// Next time the job will run, or the zero time if Cron has not been
    	// started or this entry's schedule is unsatisfiable
    	Next time.Time       // 下次执行时间
    
    	// Prev is the last time this job was run, or the zero time if never.
    	Prev time.Time      // 上次执行时间
    
    	// WrappedJob is the thing to run when the Schedule is activated.
    	WrappedJob Job      // 执行的任务
    
    	// Job is the thing that was submitted to cron.
    	// It is kept around so that user code that needs to get at the job later,
    	// e.g. via Entries() can do so.
    	Job Job
    }
    
    

    调度任务enterId, err := c.AddFunc("@every 3s", jobTask) 会使用以下两个文件来解析定时执行的参数,也就是翻译给golang 解析@erery 3s是干什么

    启动c.Start()

    // Start the cron scheduler in its own goroutine, or no-op if already started.
    func (c *Cron) Start() {
    	c.runningMu.Lock()
    	defer c.runningMu.Unlock()
    	if c.running {
    		return
    	}
    	c.running = true
    	go c.run()
    }
    

    可以看到Start() 执行了三个操作

    • 1 上锁 最后解锁
    • 2 判断此对象的状态是否正在运行,如果运行了直接 return
    • 3 如果没有运行,就修改状态,然后启动协程运行 run()方法

    核心逻辑run()方法

    这里需要知道的知识

    • time.NewTimer(d Duration)返回一个sleep指定时间的channel
    • select{} 多路复用阻塞
      • 任意一个case满足select{}就回直接执行结束
    • sort.Sort 的用法(我这里不是很熟悉)
    // run the scheduler.. this is private just due to the need to synchronize
    // access to the 'running' state variable.
    func (c *Cron) run() {
    	c.logger.Info("start")
    
    	// Figure out the next activation times for each entry.
    	now := c.now()  // 获取现在时间
            // 循环调度任务 计算下一次执行时间
    	for _, entry := range c.entries {  
    		entry.Next = entry.Schedule.Next(now)
    		c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
    	}
            
            // 最外层死循环 这一层会一直存在
    	for {
    		// Determine the next entry to run.
    		sort.Sort(byTime(c.entries)) // 排序确定下一个要运行的目标
    
    		var timer *time.Timer // 声明一个Timer 指针变量
    		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
    			// If there are no entries yet, just sleep - it still handles new entries
    			// and stop requests.
                            // 如果cron启动后 还没有 调度信息的话 就生成一个sleep10W小时的 chan Time,用于阻塞下面的 select{} ,因为`select`是多路复用,其他channel能返回数据时,select就回执行不会阻塞。
                            // 所以当没有任务时,启动Start()程序 就会被这个阻塞
    			timer = time.NewTimer(100000 * time.Hour)
    		} else {
                            // 如果有调度信息,就 sleep 调度任务中第一个的 循环时间 
    			timer = time.NewTimer(c.entries[0].Next.Sub(now))
    		}
                    // 第二层死循环  内部使用select{}阻塞
    		for {
    			select {
                            // 上一步中的 timer sleep时间如果到了就执行
    			case now = <-timer.C:
    				now = now.In(c.location)
    				c.logger.Info("wake", "now", now)
    
    				// Run every entry whose next time was less than now
    				for _, e := range c.entries {
    					if e.Next.After(now) || e.Next.IsZero() {
    						break
    					}
    					c.startJob(e.WrappedJob)
    					e.Prev = e.Next
    					e.Next = e.Schedule.Next(now)
    					c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
    				}
                            // 向Cron中添加了 一个调度任务就会执行
    			case newEntry := <-c.add:
    				timer.Stop()
    				now = c.now()
    				newEntry.Next = newEntry.Schedule.Next(now)
    				c.entries = append(c.entries, newEntry)
    				c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
                            
    			case replyChan := <-c.snapshot:
    				replyChan <- c.entrySnapshot()
    				continue
                            // 停止定时任务
    			case <-c.stop:
    				timer.Stop()
    				c.logger.Info("stop")
    				return
                            // 移除任务
    			case id := <-c.remove:
    				timer.Stop()
    				now = c.now()
    				c.removeEntry(id)
    				c.logger.Info("removed", "entry", id)
    			}
                            // 当以上任意一个channel满足时,就会结束内层循环 重复上一层步骤
    			break
    		}
    	}
    }
    
    

    自己的总结

    这个robfig/cron/v3 这个库实现定时任务的核心逻辑,就是利用以下几个点:

    • 主体for循环
      • 循环配合time.NewTimerchannel sleep确保定时任务能定时执行
    • select多路选择阻塞
      • 确保能在任意时间改变其case中的channel 执行
    • time.NewTimersleep定时时间

    疑惑的地方

    自己看完还是有很多疑惑的地方,很多函数用法,没怎么用过比如:

    • snapshot chan chan []Entry
      • 定义一个chan 类型是 chan []Entry ?? 没怎么见过这种用法
    • 其他函数用法(慢慢学习吧)

    总体来说思路设计的很巧妙,感觉如果只是单纯的写web接口的话,很少直接接触到这样的设计。
    最后顺便说一句Golang简单的语法是真的方便。

  • 相关阅读:
    微信OpenID获取
    2015总结及2016目标
    python start
    csv到mysql数据库如何分割
    读书 --- 老码识途
    读书--编写高质量代码 改善C#程序的157个建议2
    读书--编写高质量代码 改善C#程序的157个建议
    BinarySearch
    在aspx中写c#
    AWS 2020 Innovate所有视频
  • 原文地址:https://www.cnblogs.com/CharmCode/p/14254510.html
Copyright © 2011-2022 走看看