zoukankan      html  css  js  c++  java
  • golang 第三方定时任务库 github.com/robfig/cron/v3 核心源码解读

    定时任务是一个通用场景的功能,在golang中,现在github最为常见的一个第三方定时任务库就是 github.com/robfig/cron/v3 目前(2020年1月9日) 7.2k Star。
    我之前使用Python的时候用惯了apscheduler,切换这个是真的不习惯。

    个人博客原文地址

    https://www.charmcode.cn/article/2021-01-09_golang_cron_code

    感觉github.com/robfig/cron/v3功能太简陋了,

    • 不支持定时任务持久化,我重启一下服务,调度任务信息就没了,需要自己存储调度信息。
    • 再比如不支持一次定时见issue等,虽然有PR 但是 v3 分支还是依旧不支持,parse文件函数不支持,虽然可以按照作者的说法,调用一次之后再调用移除可以实现。
    • 不支持立即运行,见issue ,作者表示可以调用后手动调用解决,但是我感觉不够优雅,没有指定首次运行时间方便。(我突然有种想提PR的冲动,哈哈哈)

    综上,个人感觉这个库封装的不是很完善,作为一个golang新手,读解析一下这个定时任务库,还是很有收获的。如果能力允许,以解决以上问题为目标,自己提PR。

    注意

    文章内容皆为个人见解,并且只看了核心的实现方式,细节部分没有解析,不保证准确,如果和你的理解有歧义,以你的为准。

    前置知识

    你需要掌握golang的 goroutine知识,包括channel通信,select多路复用, time.NewTimer等知识,否则解读起来就会很困难。
    time.NewTimer的作用

    func main(){
    	// Calling NewTimer method
    	timer := time.NewTimer(5 * time.Second)
    
    	// Notifying the channel
    	<-timer.C
    
    	// Printed after 5 seconds 5秒之后输出
    	fmt.Println("Timer is inactivated")
    }
    

    简单的demo

    更多使用demo可以参考 个人Go学习笔记

    
    package _1_demo
    
    import (
    	"fmt"
    	"github.com/robfig/cron/v3"
    	"testing"
    	"time"
    )
    
    // 定时任务
    func jobTask() {
    	fmt.Printf( "任务启动: %s 
    ",time.Now().Format("2006-01-02 15:04:05"))
    }
    
    func TestCron(t *testing.T) {
    	// 创建一个cron对象
    	c := cron.New()
    
    	// 任务调度
    	enterId, err := c.AddFunc("@every 3s", jobTask)
    	if err!=nil{
    		panic(err)
    	}
    	fmt.Printf("任务id是 %d 
    ", enterId)
    
    	// 同步执行任务会阻塞当前执行顺序  一般使用Start()
    	//c.Run()
    	//fmt.Println("当前执行顺序.......")
    
    	// goroutine 协程启动定时任务(看到后面Start函数和run()函数,就会明白启动这一步也可以写在任务调度之前执行)
    	c.Start()
    	// Start()内部有一个running 布尔值 限制只有一个Cron对象启动 所以这一步多个 c.Start() 也只会有一个运行
    	c.Start()
    	c.Start()
    
    	// 用于阻塞 后面可以使用 select {} 阻塞
    	time.Sleep(time.Second * 9)
    
    	// 关闭定时任务(其实不关闭也可以,主进程直接结束了, 内部的goroutine协程也会自动结束)
    	c.Stop()
    
    }
    

    源码解读

    核心文件主要就是cron.go文件

    首先可以看到 c := cron.New() 创建了这个 Cron结构体对象

    type Cron struct {
    	entries   []*Entry                         // 用于存放job指针对象的数组
    	chain     Chain
    	stop      chan struct{}                  // 定制调度任务
    	add       chan *Entry                    // 添加一个调度任务
    	remove    chan EntryID               // 移除 一个调度任务
    	snapshot  chan chan []Entry     // 正在运行中的调度任务 
    	running   bool                             // 保证整个Cron对象只启动一次 和启动后其他chan正常
    	logger    Logger                          // 记录日志
    	runningMu sync.Mutex              // 协程锁,确保执行安全
    	location  *time.Location            // 时区
    	parser    ScheduleParser           // 解析参数
    	nextID    EntryID                         // 下一个调度任务的id
    	jobWaiter sync.WaitGroup        // 确保单一的调度任务执行完毕
    }
    

    Entry包含那些

    
    // Entry consists of a schedule and the func to execute on that schedule.
    type Entry struct {
    	// ID is the cron-assigned ID of this entry, which may be used to look up a
    	// snapshot or remove it.
    	ID EntryID  // 任务调度Id,默认是自增 创建任务时返回
    
    	// Schedule on which this job should be run.
    	Schedule Schedule // 调度任务运行
    
    	// Next time the job will run, or the zero time if Cron has not been
    	// started or this entry's schedule is unsatisfiable
    	Next time.Time       // 下次执行时间
    
    	// Prev is the last time this job was run, or the zero time if never.
    	Prev time.Time      // 上次执行时间
    
    	// WrappedJob is the thing to run when the Schedule is activated.
    	WrappedJob Job      // 执行的任务
    
    	// Job is the thing that was submitted to cron.
    	// It is kept around so that user code that needs to get at the job later,
    	// e.g. via Entries() can do so.
    	Job Job
    }
    
    

    调度任务enterId, err := c.AddFunc("@every 3s", jobTask) 会使用以下两个文件来解析定时执行的参数,也就是翻译给golang 解析@erery 3s是干什么

    启动c.Start()

    // Start the cron scheduler in its own goroutine, or no-op if already started.
    func (c *Cron) Start() {
    	c.runningMu.Lock()
    	defer c.runningMu.Unlock()
    	if c.running {
    		return
    	}
    	c.running = true
    	go c.run()
    }
    

    可以看到Start() 执行了三个操作

    • 1 上锁 最后解锁
    • 2 判断此对象的状态是否正在运行,如果运行了直接 return
    • 3 如果没有运行,就修改状态,然后启动协程运行 run()方法

    核心逻辑run()方法

    这里需要知道的知识

    • time.NewTimer(d Duration)返回一个sleep指定时间的channel
    • select{} 多路复用阻塞
      • 任意一个case满足select{}就回直接执行结束
    • sort.Sort 的用法(我这里不是很熟悉)
    // run the scheduler.. this is private just due to the need to synchronize
    // access to the 'running' state variable.
    func (c *Cron) run() {
    	c.logger.Info("start")
    
    	// Figure out the next activation times for each entry.
    	now := c.now()  // 获取现在时间
            // 循环调度任务 计算下一次执行时间
    	for _, entry := range c.entries {  
    		entry.Next = entry.Schedule.Next(now)
    		c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
    	}
            
            // 最外层死循环 这一层会一直存在
    	for {
    		// Determine the next entry to run.
    		sort.Sort(byTime(c.entries)) // 排序确定下一个要运行的目标
    
    		var timer *time.Timer // 声明一个Timer 指针变量
    		if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
    			// If there are no entries yet, just sleep - it still handles new entries
    			// and stop requests.
                            // 如果cron启动后 还没有 调度信息的话 就生成一个sleep10W小时的 chan Time,用于阻塞下面的 select{} ,因为`select`是多路复用,其他channel能返回数据时,select就回执行不会阻塞。
                            // 所以当没有任务时,启动Start()程序 就会被这个阻塞
    			timer = time.NewTimer(100000 * time.Hour)
    		} else {
                            // 如果有调度信息,就 sleep 调度任务中第一个的 循环时间 
    			timer = time.NewTimer(c.entries[0].Next.Sub(now))
    		}
                    // 第二层死循环  内部使用select{}阻塞
    		for {
    			select {
                            // 上一步中的 timer sleep时间如果到了就执行
    			case now = <-timer.C:
    				now = now.In(c.location)
    				c.logger.Info("wake", "now", now)
    
    				// Run every entry whose next time was less than now
    				for _, e := range c.entries {
    					if e.Next.After(now) || e.Next.IsZero() {
    						break
    					}
    					c.startJob(e.WrappedJob)
    					e.Prev = e.Next
    					e.Next = e.Schedule.Next(now)
    					c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
    				}
                            // 向Cron中添加了 一个调度任务就会执行
    			case newEntry := <-c.add:
    				timer.Stop()
    				now = c.now()
    				newEntry.Next = newEntry.Schedule.Next(now)
    				c.entries = append(c.entries, newEntry)
    				c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
                            
    			case replyChan := <-c.snapshot:
    				replyChan <- c.entrySnapshot()
    				continue
                            // 停止定时任务
    			case <-c.stop:
    				timer.Stop()
    				c.logger.Info("stop")
    				return
                            // 移除任务
    			case id := <-c.remove:
    				timer.Stop()
    				now = c.now()
    				c.removeEntry(id)
    				c.logger.Info("removed", "entry", id)
    			}
                            // 当以上任意一个channel满足时,就会结束内层循环 重复上一层步骤
    			break
    		}
    	}
    }
    
    

    自己的总结

    这个robfig/cron/v3 这个库实现定时任务的核心逻辑,就是利用以下几个点:

    • 主体for循环
      • 循环配合time.NewTimerchannel sleep确保定时任务能定时执行
    • select多路选择阻塞
      • 确保能在任意时间改变其case中的channel 执行
    • time.NewTimersleep定时时间

    疑惑的地方

    自己看完还是有很多疑惑的地方,很多函数用法,没怎么用过比如:

    • snapshot chan chan []Entry
      • 定义一个chan 类型是 chan []Entry ?? 没怎么见过这种用法
    • 其他函数用法(慢慢学习吧)

    总体来说思路设计的很巧妙,感觉如果只是单纯的写web接口的话,很少直接接触到这样的设计。
    最后顺便说一句Golang简单的语法是真的方便。

  • 相关阅读:
    metal的gpu query
    体积雾 global fog unity 及改进
    hdr rt format对颜色的影响
    unity deferred lighting
    unity linear space时 photoshop blend的正确设置
    unity linear work flow
    一些数据 bandwidth之类
    deferred rendering with msaa
    unity 显示mipmaplevel
    【转】在C#中使用SendMessage
  • 原文地址:https://www.cnblogs.com/CharmCode/p/14254510.html
Copyright © 2011-2022 走看看