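I recently had to work on a scheduled-task module (strictly speaking, fixing bugs in one owned by a colleague), and that is how I ran into Hangfire. A colleague had told me earlier that Hangfire has a few traps. I was curious, and with most of the bugs fixed over the last couple of days, I pulled the Hangfire source from GitHub. Below I share what I understood from reading it and the things to watch out for at work. The post has four parts: 1. preparation, 2. basic usage, 3. source code analysis, 4. pitfalls. One caveat: I have only spent a few days with the Hangfire source, so if I have misunderstood something or got it wrong, please correct me in the comments.
Preparation: the Hangfire code base is not large. GitHub: https://github.com/HangfireIO/Hangfire; download it and have a look if you are interested. Functionally it splits roughly into a client mode and a server mode. The techniques involved include multithreading, expression trees (Expression), Dapper and Cron. You could say its scheduling is built entirely on cooperating threads, and because everything runs in a multithreaded environment I found the code somewhat hard to follow.
Basic usage: Hangfire works on both .NET Framework and .NET Core. The examples below use .NET Core.
1. Client and server deployed separately
Client side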
```csharp
public IServiceProvider ConfigureServices(IServiceCollection services)
{
    // other code...
    services.AddHangfire(config =>
    {
        config.UseSqlServerStorage(...);
    });
}

public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
    // other code...
    // enable the Dashboard
    app.UseHangfireDashboard();
}
```
Server side
```csharp
// Note: IAppBuilder is the OWIN startup type; ASP.NET Core uses IApplicationBuilder
public void Configuration(IAppBuilder app)
{
    GlobalConfiguration.Configuration
        .UseSqlServerStorage("connection string", new SqlServerStorageOptions
        {
            // options
        });
    app.UseHangfireServer(new BackgroundJobServerOptions
    {
    });
}
```
Or
```csharp
services.AddHangfireServer(options =>
{
    // implemented on top of the IHostedService interface
});
```
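PS: the server can also be hosted through the IHostedService interface. Usage is the same as above: register it with the service container, and the host calls the interface's two methods automatically over the application's lifetime. Here is the definition of IHostedService.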
```csharp
public interface IHostedService
{
    Task StartAsync(CancellationToken cancellationToken);
    Task StopAsync(CancellationToken cancellationToken);
}
```
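The interface defines just two methods: StartAsync runs when the application starts, and StopAsync runs when it shuts down.
[Figure: when the host calls StartAsync and StopAsync (diagram borrowed from another source)]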
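To make that timing concrete, here is a minimal sketch of the call order. Everything in it is an assumption made for illustration: ISketchHostedService is a local stand-in with the same two-method shape (the real interface lives in Microsoft.Extensions.Hosting), and SketchHost is a toy host, not the real one.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Local stand-in with the same shape as IHostedService (hypothetical, not the real interface).
public interface ISketchHostedService
{
    Task StartAsync(CancellationToken cancellationToken);
    Task StopAsync(CancellationToken cancellationToken);
}

// A service that only records when it was started and stopped.
public sealed class RecordingService : ISketchHostedService
{
    public List<string> Events { get; } = new List<string>();

    public Task StartAsync(CancellationToken cancellationToken)
    {
        Events.Add("start");
        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        Events.Add("stop");
        return Task.CompletedTask;
    }
}

public static class SketchHost
{
    // Toy host: start every service, run the application body, then stop the services in reverse order.
    public static async Task RunAsync(IReadOnlyList<ISketchHostedService> services, Action appBody)
    {
        foreach (var service in services)
        {
            await service.StartAsync(CancellationToken.None);
        }

        try
        {
            appBody();
        }
        finally
        {
            for (var i = services.Count - 1; i >= 0; i--)
            {
                await services[i].StopAsync(CancellationToken.None);
            }
        }
    }
}
```

A server hosted this way would start its background threads in StartAsync and shut them down in StopAsync, which is why registering it with the container is enough.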
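That is a simple setup with the Hangfire client and server deployed separately. Next is the second option: client and server deployed on the same machine.
2. Client and server deployed together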
```csharp
public void Configuration(IAppBuilder app)
{
    GlobalConfiguration.Configuration.UseSqlServerStorage("connection string"); // configure the database connection

    app.UseHangfireServer();    // start the server
    app.UseHangfireDashboard(); // enable the dashboard
}
```
Just a few lines of code (and the simple usage is all I know). With the services registered and running, the next step is adding jobs to Hangfire.
```csharp
BackgroundJob.Enqueue(() => Console.WriteLine("Simple!")); // run immediately
BackgroundJob.Schedule(() => Console.WriteLine("Reliable!"), TimeSpan.FromDays(7)); // run after a delay
RecurringJob.AddOrUpdate(() => Console.WriteLine("Transparent!"), Cron.Daily); // run repeatedly; accepts cron expressions
```
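That covers basic usage. On to part three of the outline, source code analysis.
Source code analysis
Client mode needs little explanation: it simply writes jobs into the Hangfire database. The interesting part is how the server executes them. Start with the entry point, which you can think of as a piece of middleware in .NET Core. Here is the code: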
```csharp
app.UseHangfireServer(); // start the server
```
The implementation of UseHangfireServer:
```csharp
public static IAppBuilder UseHangfireServer(
    [NotNull] this IAppBuilder builder,
    [NotNull] JobStorage storage,
    [NotNull] BackgroundJobServerOptions options,
    [NotNull] params IBackgroundProcess[] additionalProcesses)
{
    // other code...
    var server = new BackgroundJobServer(options, storage, additionalProcesses);

    return builder;
}
```
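The important line in the UseHangfireServer extension method is the one that creates a BackgroundJobServer. BackgroundJobServer implements the IBackgroundProcessingServer interface, and the server is started indirectly from its constructor. Let's look at the definitions of the IBackgroundProcessingServer interface and the BackgroundJobServer class.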
```csharp
// IBackgroundProcessingServer
public interface IBackgroundProcessingServer : IDisposable
{
    void SendStop();
    bool WaitForShutdown(TimeSpan timeout);
    Task WaitForShutdownAsync(CancellationToken cancellationToken);
}

// BackgroundJobServer
public class BackgroundJobServer : IBackgroundProcessingServer
{
    // other members...
    public BackgroundJobServer(
        [NotNull] BackgroundJobServerOptions options,
        [NotNull] JobStorage storage,
        [NotNull] IEnumerable<IBackgroundProcess> additionalProcesses,
        [CanBeNull] IJobFilterProvider filterProvider,
        [CanBeNull] JobActivator activator,
        [CanBeNull] IBackgroundJobFactory factory,
        [CanBeNull] IBackgroundJobPerformer performer,
        [CanBeNull] IBackgroundJobStateChanger stateChanger)
    {
        // other code
        var processes = new List<IBackgroundProcessDispatcherBuilder>();
        processes.AddRange(GetRequiredProcesses(filterProvider, activator, factory, performer, stateChanger));
        processes.AddRange(additionalProcesses.Select(x => x.UseBackgroundPool(1)));
        var properties = new Dictionary<string, object>
        {
            { "Queues", options.Queues },
            { "WorkerCount", options.WorkerCount }
        };

        _processingServer = new BackgroundProcessingServer(
            storage,
            processes,
            properties,
            GetProcessingServerOptions());
    }

    // The method bodies below are elided.
    public void SendStop() { }
    public void Dispose() { }

    [Obsolete("This method is a stub. There is no need to call the `Start` method. Will be removed in version 2.0.0.")]
    public void Start() { }

    [Obsolete("Please call the `Shutdown` method instead. Will be removed in version 2.0.0.")]
    public void Stop() { }

    [Obsolete("Please call the `Shutdown` method instead. Will be removed in version 2.0.0.")]
    public void Stop(bool force) { }

    public bool WaitForShutdown(TimeSpan timeout) { }
    public Task WaitForShutdownAsync(CancellationToken cancellationToken) { }
}
```
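The methods on IBackgroundProcessingServer all deal with stopping the server, cancelling work and releasing resources. In BackgroundJobServer, the real implementation of each is delegated to the method of the same name on a BackgroundProcessingServer instance, which is created in the constructor. While building that BackgroundProcessingServer, the constructor also creates several instances of BackgroundProcessDispatcherBuilder, the implementation of IBackgroundProcessDispatcherBuilder. Hangfire ships seven dispatchers by default; the independent threads for jobs, logging, heartbeat and so on are all created by the builder's Create method. That is off the main startup path and is covered in detail later. Note that a few methods on this class, such as Start and Stop, are marked [Obsolete], and their messages say they will be removed in version 2.0.0; Start in particular is described as a stub. If Start does nothing, does the server actually start inside BackgroundProcessingServer? Let's continue with the definition of BackgroundProcessingServer.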
```csharp
public sealed class BackgroundProcessingServer : IBackgroundProcessingServer
{
    // other members
    internal BackgroundProcessingServer(
        [NotNull] BackgroundServerProcess process,
        [NotNull] BackgroundProcessingServerOptions options)
    {
        _process = process ?? throw new ArgumentNullException(nameof(process));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _dispatcher = CreateDispatcher();
#if !NETSTANDARD1_3
        AppDomain.CurrentDomain.DomainUnload += OnCurrentDomainUnload;
        AppDomain.CurrentDomain.ProcessExit += OnCurrentDomainUnload;
#endif
    }

    // The method bodies below are elided.
    public void SendStop() { }
    public bool WaitForShutdown(TimeSpan timeout) { }
    public async Task WaitForShutdownAsync(CancellationToken cancellationToken) { }
    public void Dispose() { }
    private void OnCurrentDomainUnload(object sender, EventArgs args) { }

    private IBackgroundDispatcher CreateDispatcher()
    {
        var execution = new BackgroundExecution(
            _stoppingCts.Token,
            new BackgroundExecutionOptions
            {
                Name = nameof(BackgroundServerProcess),
                ErrorThreshold = TimeSpan.Zero,
                StillErrorThreshold = TimeSpan.Zero,
                RetryDelay = retry => _options.RestartDelay
            });
        return new BackgroundDispatcher(
            execution,
            RunServer,
            execution,
            ThreadFactory);
    }

    private void RunServer(Guid executionId, object state)
    {
        _process.Execute(executionId, (BackgroundExecution)state, _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
    }

    private static IEnumerable<Thread> ThreadFactory(ThreadStart threadStart)
    {
        yield return new Thread(threadStart)
        {
            IsBackground = true,
            Name = $"{nameof(BackgroundServerProcess)} #{Interlocked.Increment(ref _lastThreadId)}",
        };
    }
}
```
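Sure enough, server startup is about to be unveiled. RunServer? The name suggests "run the server". Set it aside for now, just remember that RunServer exists, and keep tracing. The constructor calls a method named CreateDispatcher(); here is its implementation.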
```csharp
private IBackgroundDispatcher CreateDispatcher()
{
    var execution = new BackgroundExecution(
        _stoppingCts.Token,
        new BackgroundExecutionOptions
        {
            Name = nameof(BackgroundServerProcess),
            ErrorThreshold = TimeSpan.Zero,
            StillErrorThreshold = TimeSpan.Zero,
            RetryDelay = retry => _options.RestartDelay
        });
    return new BackgroundDispatcher(
        execution,
        RunServer,
        execution,
        ThreadFactory);
}
```
CreateDispatcher returns a BackgroundDispatcher (literally a "background dispatcher") and passes RunServer as the callback. BackgroundDispatcher implements the IBackgroundDispatcher interface; first, their definitions.
```csharp
// IBackgroundDispatcher
public interface IBackgroundDispatcher : IDisposable
{
    bool Wait(TimeSpan timeout);
    Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken);
}

// BackgroundDispatcher
internal sealed class BackgroundDispatcher : IBackgroundDispatcher
{
    // other members
    public BackgroundDispatcher(
        [NotNull] IBackgroundExecution execution,
        [NotNull] Action<Guid, object> action,
        [CanBeNull] object state,
        [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory)
    {
        if (threadFactory == null) throw new ArgumentNullException(nameof(threadFactory));
        _execution = execution ?? throw new ArgumentNullException(nameof(execution));
        _action = action ?? throw new ArgumentNullException(nameof(action));
        _state = state;
#if !NETSTANDARD1_3
        AppDomainUnloadMonitor.EnsureInitialized();
#endif
        // Ask the factory for threads that will each run DispatchLoop
        var threads = threadFactory(DispatchLoop)?.ToArray();
        if (threads == null || threads.Length == 0)
        {
            throw new ArgumentException("At least one unstarted thread should be created.", nameof(threadFactory));
        }
        if (threads.Any(thread => thread == null || (thread.ThreadState & ThreadState.Unstarted) == 0))
        {
            throw new ArgumentException("All the threads should be non-null and in the ThreadState.Unstarted state.", nameof(threadFactory));
        }
        _stopped = new CountdownEvent(threads.Length);
        foreach (var thread in threads)
        {
            thread.Start();
        }
    }

    public bool Wait(TimeSpan timeout)
    {
        return _stopped.WaitHandle.WaitOne(timeout);
    }

    public async Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken)
    {
        await _stopped.WaitHandle.WaitOneAsync(timeout, cancellationToken).ConfigureAwait(false);
    }

    // Bodies elided.
    public void Dispose() { }
    public override string ToString() { }

    private void DispatchLoop()
    {
        try
        {
            _execution.Run(_action, _state);
        }
        catch (Exception ex)
        {
            // handling elided
        }
        finally
        {
            try
            {
                _stopped.Signal();
            }
            catch (ObjectDisposedException)
            {
                // handling elided
            }
        }
    }
}
```
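Judging from the IBackgroundDispatcher interface, a dispatcher is responsible for coordinating resources: waiting for its threads and disposing of them. Now look at the BackgroundDispatcher implementation. The core of server startup is in its constructor: the call to threadFactory(DispatchLoop) and the foreach that calls thread.Start(). In other words, it starts threads that each run DispatchLoop. DispatchLoop indirectly calls the RunServer method mentioned above, and RunServer performs the initialization of the whole server side. Inside DispatchLoop, the Run method of the IBackgroundExecution interface is called, so next is the definition of IBackgroundExecution.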
```csharp
public interface IBackgroundExecution : IDisposable
{
    void Run([NotNull] Action<Guid, object> callback, [CanBeNull] object state);
    Task RunAsync([NotNull] Func<Guid, object, Task> callback, [CanBeNull] object state);
}
```
Just two methods: Run in a synchronous and an asynchronous form. Here is its only implementation, BackgroundExecution.
```csharp
internal sealed class BackgroundExecution : IBackgroundExecution
{
    // other members
    public void Run(Action<Guid, object> callback, object state)
    {
        if (callback == null) throw new ArgumentNullException(nameof(callback));
        var executionId = Guid.NewGuid();

#if !NETSTANDARD1_3
        try
#endif
        {
            HandleStarted(executionId, out var nextDelay);
            while (true)
            {
                // Don't place anything here.
                try
                {
                    if (StopRequested) break;
                    if (nextDelay > TimeSpan.Zero)
                    {
                        HandleDelay(executionId, nextDelay);
                    }
                    callback(executionId, state);
                    HandleSuccess(out nextDelay);
                }
#if !NETSTANDARD1_3
                catch (ThreadAbortException) when (AppDomainUnloadMonitor.IsUnloading)
                {
                    // Our thread is aborted due to AppDomain unload. It's better to give up to
                    // not to cause the host to be more aggressive.
                    throw;
                }
#endif
                catch (OperationCanceledException) when (StopRequested)
                {
                    break;
                }
                catch (Exception ex)
                {
                    HandleException(executionId, ex, out nextDelay);
                }
            }
            HandleStop(executionId);
        }
#if !NETSTANDARD1_3
        catch (ThreadAbortException ex)
        {
            HandleThreadAbort(executionId, ex);
        }
#endif
    }
}
```
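Every independent thread in Hangfire runs through the Run method, which calls back into the Execute method of that thread's own implementation class. From then on, each functional thread loops over its own work (RecurringJobScheduler is analysed separately later). Back to the main line, server startup, taking the synchronous Run as the example: the thread created by ThreadFactory (the server's first thread) enters a while loop, and inside the loop the callback invokes our RunServer method.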
```csharp
private void RunServer(Guid executionId, object state)
{
    _process.Execute(executionId, (BackgroundExecution)state, _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
}
```
RunServer is very simple: it calls Execute on _process. Here is the definition of IBackgroundServerProcess, the interface behind _process.
```csharp
internal interface IBackgroundServerProcess
{
    void Execute(
        Guid executionId,
        BackgroundExecution execution,
        CancellationToken stoppingToken,
        CancellationToken stoppedToken,
        CancellationToken shutdownToken);
}
```
IBackgroundServerProcess defines a single Execute method, whose job is to initialize the server side. Here is its only implementation, BackgroundServerProcess.
```csharp
internal sealed class BackgroundServerProcess : IBackgroundServerProcess
{
    // other members
    public BackgroundServerProcess(
        [NotNull] JobStorage storage,
        [NotNull] IEnumerable<IBackgroundProcessDispatcherBuilder> dispatcherBuilders,
        [NotNull] BackgroundProcessingServerOptions options,
        [NotNull] IDictionary<string, object> properties)
    {
        if (dispatcherBuilders == null) throw new ArgumentNullException(nameof(dispatcherBuilders));

        _storage = storage ?? throw new ArgumentNullException(nameof(storage));
        _options = options ?? throw new ArgumentNullException(nameof(options));
        _properties = properties ?? throw new ArgumentNullException(nameof(properties));

        var builders = new List<IBackgroundProcessDispatcherBuilder>();
        builders.AddRange(GetRequiredProcesses()); // add the default dispatchers, i.e. the independent threads
        builders.AddRange(GetStorageComponents());
        builders.AddRange(dispatcherBuilders);

        _dispatcherBuilders = builders.ToArray();
    }

    public void Execute(Guid executionId, BackgroundExecution execution, CancellationToken stoppingToken,
        CancellationToken stoppedToken, CancellationToken shutdownToken) // server initialization
    {
        var serverId = GetServerId();
        Stopwatch stoppedAt = null;

        void HandleRestartSignal()
        {
            if (!stoppingToken.IsCancellationRequested)
            {
                _logger.Info($"{GetServerTemplate(serverId)} caught restart signal...");
            }
        }
        // The other signal handlers registered below are elided.

        using (var restartCts = new CancellationTokenSource())
        using (var restartStoppingCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, restartCts.Token))
        using (var restartStoppedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppedToken, restartCts.Token))
        using (var restartShutdownCts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken, restartCts.Token))
        using (restartStoppingCts.Token.Register(HandleStopRestartSignal))
        using (stoppingToken.Register(HandleStoppingSignal))
        using (stoppedToken.Register(HandleStoppedSignal))
        using (shutdownToken.Register(HandleShutdownSignal))
        using (restartCts.Token.Register(HandleRestartSignal))
        {
            var context = new BackgroundServerContext(
                serverId,
                _storage,
                _properties,
                restartStoppingCts.Token,
                restartStoppedCts.Token,
                restartShutdownCts.Token);
            var dispatchers = new List<IBackgroundDispatcher>();
            CreateServer(context);
            try
            {
                // ReSharper disable once AccessToDisposedClosure
                using (var heartbeat = CreateHeartbeatProcess(context, () => restartCts.Cancel())) // create the heartbeat (guardian) thread
                {
                    StartDispatchers(context, dispatchers); // start all the independent task threads Hangfire sets up by default
                    execution.NotifySucceeded();
                    WaitForDispatchers(context, dispatchers);

                    restartCts.Cancel();

                    heartbeat.WaitAsync(Timeout.InfiniteTimeSpan, shutdownToken).GetAwaiter().GetResult();
                }
            }
            finally
            {
                DisposeDispatchers(dispatchers);
                ServerDelete(context, stoppedAt);
            }
        }
    }

    private IBackgroundDispatcher CreateHeartbeatProcess(BackgroundServerContext context, Action requestRestart) // create the heartbeat (guardian) thread
    {
        return new ServerHeartbeatProcess(_options.HeartbeatInterval, _options.ServerTimeout, requestRestart)
            .UseBackgroundPool(threadCount: 1)
            .Create(context, _options);
    }

    private IEnumerable<IBackgroundProcessDispatcherBuilder> GetRequiredProcesses() // server watchdog and job-cancellation watcher threads
    {
        yield return new ServerWatchdog(_options.ServerCheckInterval, _options.ServerTimeout).UseBackgroundPool(threadCount: 1);
        yield return new ServerJobCancellationWatcher(_options.CancellationCheckInterval).UseBackgroundPool(threadCount: 1);
    }

    private string GetServerId() // build the server id
    {
        var serverName = _options.ServerName
            ?? Environment.GetEnvironmentVariable("COMPUTERNAME")
            ?? Environment.GetEnvironmentVariable("HOSTNAME");
        var guid = Guid.NewGuid().ToString();

        return !String.IsNullOrWhiteSpace(serverName) ? $"{serverName.ToLowerInvariant()}:{guid}" : guid;
    }

    private void CreateServer(BackgroundServerContext context) // announce the server, i.e. write it to the Server table
    {
        var stopwatch = Stopwatch.StartNew();
        using (var connection = _storage.GetConnection())
        {
            connection.AnnounceServer(context.ServerId, GetServerContext(_properties));
        }
        stopwatch.Stop();

        ServerJobCancellationToken.AddServer(context.ServerId);
        _logger.Info($"{GetServerTemplate(context.ServerId)} successfully announced in {stopwatch.Elapsed.TotalMilliseconds} ms");
    }

    // Starts every independent task thread: queue and schedule processing, recurring jobs, logging, watchdogs and so on
    private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers)
    {
        foreach (var dispatcherBuilder in _dispatcherBuilders)
        {
            dispatchers.Add(dispatcherBuilder.Create(context, _options));
        }
    }
}
```
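I have trimmed the code above, so don't get hung up on the implementation details; the comments explain the key points. A brief summary: the first thread (call it the main thread for now) calls the UseHangfireServer extension method from Startup -> a new worker thread is started to initialize and start the server -> the main thread returns -> all Hangfire task threads are started -> the first worker thread blocks (it is responsible for releasing the resources of all task threads). That is roughly all of the server initialization. Next, a closer look at how Hangfire's task threads run, using the RecurringJobScheduler recurring-job process as the example.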
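The chain described so far (constructor starts threads -> DispatchLoop -> Run loop -> callback) can be reproduced in a few dozen lines without Hangfire. The sketch below is an assumption-laden simplification: MiniExecution and MiniDispatcher are hypothetical names, the retry policy is a fixed delay, and none of Hangfire's logging or AppDomain handling is modelled.

```csharp
using System;
using System.Threading;

// Simplified stand-in for the loop in BackgroundExecution.Run (hypothetical type).
public sealed class MiniExecution
{
    private readonly CancellationToken _stopping;
    private readonly TimeSpan _retryDelay;
    private int _failures;

    public MiniExecution(CancellationToken stopping, TimeSpan retryDelay)
    {
        _stopping = stopping;
        _retryDelay = retryDelay;
    }

    public int Failures => Volatile.Read(ref _failures);

    public void Run(Action<Guid, object> callback, object state)
    {
        var executionId = Guid.NewGuid();
        var nextDelay = TimeSpan.Zero;
        while (!_stopping.IsCancellationRequested)
        {
            try
            {
                // After a failure, wait before retrying; WaitOne returns true if a stop was requested meanwhile.
                if (nextDelay > TimeSpan.Zero && _stopping.WaitHandle.WaitOne(nextDelay)) break;
                callback(executionId, state);
                nextDelay = TimeSpan.Zero;
            }
            catch (OperationCanceledException) when (_stopping.IsCancellationRequested)
            {
                break;
            }
            catch (Exception)
            {
                Interlocked.Increment(ref _failures);
                nextDelay = _retryDelay;
            }
        }
    }
}

// Simplified stand-in for BackgroundDispatcher: the constructor itself starts the threads.
public sealed class MiniDispatcher : IDisposable
{
    private readonly MiniExecution _execution;
    private readonly Action<Guid, object> _action;
    private readonly object _state;
    private readonly CountdownEvent _stopped;

    public MiniDispatcher(MiniExecution execution, Action<Guid, object> action, object state, int threadCount)
    {
        _execution = execution ?? throw new ArgumentNullException(nameof(execution));
        _action = action ?? throw new ArgumentNullException(nameof(action));
        _state = state;
        _stopped = new CountdownEvent(threadCount);
        for (var i = 0; i < threadCount; i++)
        {
            new Thread(DispatchLoop) { IsBackground = true, Name = $"MiniDispatcher #{i + 1}" }.Start();
        }
    }

    // True once every thread has left its loop.
    public bool Wait(TimeSpan timeout) => _stopped.Wait(timeout);

    public void Dispose() => _stopped.Dispose();

    private void DispatchLoop()
    {
        try
        {
            _execution.Run(_action, _state);
        }
        finally
        {
            try { _stopped.Signal(); }
            catch (ObjectDisposedException) { /* dispatcher was disposed before this thread finished */ }
        }
    }
}
```

The point of the design is that the caller's thread returns as soon as the constructor finishes, while the started thread keeps looping until the stop token is cancelled; an exception in the callback only delays the next iteration instead of killing the thread.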
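How RecurringJobScheduler gets started
Remember the seven dispatcher task threads mentioned above? These seven default task threads are initialized in the StartDispatchers method shown above. Here is the code.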
```csharp
private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers)
{
    // other code...
    foreach (var dispatcherBuilder in _dispatcherBuilders)
    {
        dispatchers.Add(dispatcherBuilder.Create(context, _options)); // initialize an independent task thread
    }
}
```
It iterates over the _dispatcherBuilders array, one entry per task type (seven in all), and calls Create on each. Next, the Create method.
```csharp
public IBackgroundDispatcher Create(BackgroundServerContext context, BackgroundProcessingServerOptions options) // step 1
{
    // other code
    var execution = new BackgroundExecution(
        context.StoppingToken,
        new BackgroundExecutionOptions
        {
            Name = _process.GetType().Name,
            RetryDelay = options.RetryDelay
        }); // each process gets its own execution
    return new BackgroundDispatcher( // create the BackgroundDispatcher
        execution,
        ExecuteProcess, // the callback
        Tuple.Create(_process, context, execution), // 3-tuple used as state; note its first item
        _threadFactory);
}

public BackgroundDispatcher( // step 2
    [NotNull] IBackgroundExecution execution,
    [NotNull] Action<Guid, object> action,
    [CanBeNull] object state,
    [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory)
{
    // other code elided
    _state = state;

    var threads = threadFactory(DispatchLoop)?.ToArray();

    foreach (var thread in threads)
    {
        thread.Start(); // start the thread
    }
}

private void DispatchLoop() // step 3
{
    try
    {
        _execution.Run(_action, _state); // Run calls back _action
    }
    catch (Exception ex)
    {
    }
    finally
    {
        try
        {
            _stopped.Signal();
        }
        catch (ObjectDisposedException)
        {
        }
    }
}

private static void ExecuteProcess(Guid executionId, object state) // step 4: the callback passed in step 1
{
    var tuple = (Tuple<IBackgroundProcess, BackgroundServerContext, BackgroundExecution>)state;
    var serverContext = tuple.Item2;
    var context = new BackgroundProcessContext( // build the shared context
        serverContext.ServerId,
        serverContext.Storage,
        serverContext.Properties.ToDictionary(x => x.Key, x => x.Value),
        executionId,
        serverContext.StoppingToken,
        serverContext.StoppedToken,
        serverContext.ShutdownToken);
    while (!context.IsStopping)
    {
        tuple.Item1.Execute(context); // run the process instance carried in the tuple
        tuple.Item3.NotifySucceeded();
    }
}
```
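That is a bit messy, so let me string it together. Step 1: the Create method builds a BackgroundDispatcher and passes a tuple as its state -> step 2: the constructor binds the threads to DispatchLoop and starts them -> step 3: DispatchLoop runs and calls back the _action delegate -> step 4: the method behind _action is ExecuteProcess, which uses the tuple to call the matching task type. At that point all seven task types are up and working. One detail in the code deserves a note: Tuple.Create(_process, context, execution). The first item of the tuple is of type IBackgroundProcess; here is its definition.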
```csharp
public interface IBackgroundProcess : IServerProcess
{
    void Execute([NotNull] BackgroundProcessContext context);
}
```
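The interface defines a single method, nothing special, but its implementations are the individual task classes. RecurringJobScheduler, the recurring-job class discussed next, implements it too. At this point the RecurringJobScheduler thread has started running.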
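The tuple-as-state trick is easy to miss, so here is a small sketch of it in isolation. All names (IMiniProcess, MiniContext, CountingProcess, MiniBuilder) are hypothetical, and the context is reduced to the bare minimum; only the shape of the callback, Action<Guid, object>, matches the listings above.

```csharp
using System;

public interface IMiniProcess
{
    void Execute(MiniContext context);
}

public sealed class MiniContext
{
    public MiniContext(string serverId, Guid executionId)
    {
        ServerId = serverId;
        ExecutionId = executionId;
    }

    public string ServerId { get; }
    public Guid ExecutionId { get; }
    public bool IsStopping { get; set; }
}

// A process that counts its passes and asks to stop after the third one.
public sealed class CountingProcess : IMiniProcess
{
    public int Runs { get; private set; }

    public void Execute(MiniContext context)
    {
        Runs++;
        if (Runs == 3) context.IsStopping = true;
    }
}

public static class MiniBuilder
{
    // Pack everything the callback needs into one object. The parameter is typed as the
    // interface, so the tuple's static type is Tuple<IMiniProcess, string>.
    public static object CreateState(IMiniProcess process, string serverId)
    {
        return Tuple.Create(process, serverId);
    }

    // Static callback with the Action<Guid, object> shape: unpack the tuple, build a context, loop.
    public static void ExecuteProcess(Guid executionId, object state)
    {
        var tuple = (Tuple<IMiniProcess, string>)state;
        var context = new MiniContext(tuple.Item2, executionId);
        while (!context.IsStopping)
        {
            tuple.Item1.Execute(context);
        }
    }
}
```

Because the callback is static and receives only an untyped state object, one ExecuteProcess method can serve every process type. The price is the cast: Tuple<...> is invariant, so a tuple created as Tuple<CountingProcess, string> would fail the cast to Tuple<IMiniProcess, string>, which is why the state is built from an interface-typed value.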
The RecurringJobScheduler recurring-job mechanism
As usual, start with the definition of the type.
```csharp
public class RecurringJobScheduler : IBackgroundProcess
{
    // other code
    public RecurringJobScheduler(
        [NotNull] IBackgroundJobFactory factory,
        TimeSpan pollingDelay,
        [NotNull] ITimeZoneResolver timeZoneResolver,
        [NotNull] Func<DateTime> nowFactory)
    {
        if (factory == null) throw new ArgumentNullException(nameof(factory));
        if (nowFactory == null) throw new ArgumentNullException(nameof(nowFactory));
        if (timeZoneResolver == null) throw new ArgumentNullException(nameof(timeZoneResolver));

        _factory = factory;
        _nowFactory = nowFactory;
        _timeZoneResolver = timeZoneResolver;
        _pollingDelay = pollingDelay;
        _profiler = new SlowLogProfiler(_logger);
    }

    /// <inheritdoc />
    public void Execute(BackgroundProcessContext context) // the interface implementation
    {
        if (context == null) throw new ArgumentNullException(nameof(context));

        var jobsEnqueued = 0;

        while (EnqueueNextRecurringJobs(context)) // fetch due recurring jobs from the database
        {
            jobsEnqueued++;

            if (context.IsStopping)
            {
                break;
            }
        }

        if (jobsEnqueued != 0)
        {
            _logger.Debug($"{jobsEnqueued} recurring job(s) enqueued.");
        }

        if (_pollingDelay > TimeSpan.Zero)
        {
            context.Wait(_pollingDelay);
        }
        else
        {
            // No polling delay configured: wait until the start of the next minute
            var now = _nowFactory();
            context.Wait(now.AddMilliseconds(-now.Millisecond).AddSeconds(-now.Second).AddMinutes(1) - now);
        }
    }
}
```
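Continuing from above: Execute is called on the first item of the tuple, so RecurringJobScheduler.Execute runs. The method does one thing: every 15 seconds it fetches due recurring jobs from the database, 1000 at a time. (The 15 seconds is the polling delay passed to the constructor; when that delay is zero, the code instead waits until the start of the next minute.) The jobs are fetched by the EnqueueNextRecurringJobs method.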
```csharp
private bool EnqueueNextRecurringJobs(BackgroundProcessContext context)
{
    return UseConnectionDistributedLock(context.Storage, connection =>
    {
        var result = false;
        if (IsBatchingAvailable(connection))
        {
            var now = _nowFactory();
            var timestamp = JobHelper.ToTimestamp(now);
            var recurringJobIds = ((JobStorageConnection)connection).GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp, BatchSize); // query the database
            if (recurringJobIds == null || recurringJobIds.Count == 0) return false;
            foreach (var recurringJobId in recurringJobIds)
            {
                if (context.IsStopping) return false;
                if (TryEnqueueBackgroundJob(context, connection, recurringJobId, now)) // enqueue for execution
                {
                    result = true;
                }
            }
        }
        else
        {
            for (var i = 0; i < BatchSize; i++)
            {
                if (context.IsStopping) return false;
                var now = _nowFactory();
                var timestamp = JobHelper.ToTimestamp(now);
                var recurringJobId = connection.GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp);
                if (recurringJobId == null) return false;
                if (!TryEnqueueBackgroundJob(context, connection, recurringJobId, now))
                {
                    return false;
                }
            }
        }
        return result;
    });
}
```
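GetFirstByLowestScoreFromSet queries the Set table for the top 1000 rows whose key is the string recurring-jobs (which marks recurring jobs) and whose score lies between 0 and the current timestamp. The returned job ids are then iterated and enqueued one by one. Next, the implementation behind TryEnqueueBackgroundJob (the condensed listing below carries the name EnqueueBackgroundJob).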
```csharp
private bool EnqueueBackgroundJob(
    BackgroundProcessContext context,
    IStorageConnection connection,
    string recurringJobId,
    DateTime now)
{
    // other code
    using (connection.AcquireDistributedRecurringJobLock(recurringJobId, LockTimeout))
    {
        try
        {
            var recurringJob = connection.GetRecurringJob(recurringJobId, _timeZoneResolver, now);
            if (recurringJob == null)
            {
                using (var transaction = connection.CreateWriteTransaction())
                {
                    transaction.RemoveFromSet("recurring-jobs", recurringJobId);
                    transaction.Commit();
                }
                return false;
            }

            BackgroundJob backgroundJob = null;
            IReadOnlyDictionary<string, string> changedFields;
            if (recurringJob.TrySchedule(out var nextExecution, out var error))
            {
                if (nextExecution.HasValue && nextExecution <= now)
                {
                    backgroundJob = _factory.TriggerRecurringJob(context.Storage, connection, _profiler, recurringJob, now);
                    if (String.IsNullOrEmpty(backgroundJob?.Id))
                    {
                        _logger.Debug($"Recurring job '{recurringJobId}' execution at '{nextExecution}' has been canceled.");
                    }
                }
                recurringJob.IsChanged(out changedFields, out nextExecution);
            }
            else if (recurringJob.RetryAttempt < MaxRetryAttemptCount)
            {
                var delay = _pollingDelay > TimeSpan.Zero ? _pollingDelay : TimeSpan.FromMinutes(1);

                _logger.WarnException($"Recurring job '{recurringJobId}' can't be scheduled due to an error and will be retried in {delay}.", error);
                recurringJob.ScheduleRetry(delay, out changedFields, out nextExecution);
            }
            else
            {
                _logger.ErrorException($"Recurring job '{recurringJobId}' can't be scheduled due to an error and will be disabled.", error);
                recurringJob.Disable(error, out changedFields, out nextExecution);
            }

            using (var transaction = connection.CreateWriteTransaction())
            {
                if (backgroundJob != null)
                {
                    _factory.StateMachine.EnqueueBackgroundJob(
                        context.Storage,
                        connection,
                        transaction,
                        recurringJob,
                        backgroundJob,
                        "Triggered by recurring job scheduler",
                        _profiler);
                }
                transaction.UpdateRecurringJob(recurringJob, changedFields, nextExecution, _logger);
                transaction.Commit();
                return true;
            }
        }
        catch (TimeZoneNotFoundException ex)
        {
            // handling elided
        }
        catch (Exception ex)
        {
            // handling elided
        }
        return false;
    }
}
```
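The calls to watch are GetRecurringJob, TrySchedule, IsChanged and EnqueueBackgroundJob on the state machine. The flow of the method is roughly: 1. GetRecurringJob loads the complete recurring job from the Hash table by job id; 2. TrySchedule computes the job's next execution time, and if that time is not later than now the job is triggered (this does not run the job itself; it only writes a row into the Job table, and the actual execution is done by a worker); 3. IsChanged collects the next execution time and the changed fields; 4. the state machine updates the job state and the transaction is committed. Recurring jobs repeat this cycle over and over. A quick word on workers: a worker polls for queued jobs in the Job table and executes the stored method-call expression. By default Hangfire starts up to 20 worker threads (the default WorkerCount is the smaller of 5 × processor count and 20). That is it for part three.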
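Two pieces of arithmetic in this part are worth trying in isolation: which job ids count as "due", and how long the scheduler sleeps when no polling delay is configured. The sketch below uses an in-memory dictionary in place of the Set table; RecurringPollSketch and both method names are hypothetical, and the tie-break on id is an assumption added only to make the order deterministic.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class RecurringPollSketch
{
    // Ids whose score (next execution time as a timestamp) lies in [0, nowTimestamp],
    // lowest score first, capped at batchSize. This mirrors the query described above in memory.
    public static List<string> DueJobIds(IDictionary<string, double> scoresById, double nowTimestamp, int batchSize)
    {
        return scoresById
            .Where(pair => pair.Value >= 0 && pair.Value <= nowTimestamp)
            .OrderBy(pair => pair.Value)
            .ThenBy(pair => pair.Key, StringComparer.Ordinal) // assumption: deterministic tie-break
            .Take(batchSize)
            .Select(pair => pair.Key)
            .ToList();
    }

    // Same expression as the else-branch of Execute: the time left until the next whole minute.
    public static TimeSpan DelayUntilNextMinute(DateTime now)
    {
        return now.AddMilliseconds(-now.Millisecond).AddSeconds(-now.Second).AddMinutes(1) - now;
    }
}
```

For example, with scores a=100, b=50, c=200 and a current timestamp of 150, only b and a are due, in that order. At 10:30:45.500 the delay until the next minute is 14.5 seconds.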
Pitfalls
A few problems I ran into while fixing bugs.
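1. Time zones. If you add a recurring job without specifying a time zone, the cron expression is interpreted in UTC. China is in UTC+8, so the job runs 8 hours later than the intended local time. There are a few fixes: set the ITimeZoneResolver property on the options globally, or pass a time zone to the AddOrUpdate method. If you do specify a time zone, watch the dashboard for exceptions, because an exception means the job will not run. The time zone is looked up from the operating system, and if it is not found an exception is thrown. That's all for now.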
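The 8-hour shift is easy to verify with plain .NET types. The sketch below is not Hangfire code: CronTimeZoneSketch and its methods are hypothetical, "daily at a given hour" stands in for a real cron expression, and the UTC+8 zone is built by hand so the example does not depend on which time zone ids the operating system knows.

```csharp
using System;

public static class CronTimeZoneSketch
{
    // Next occurrence of "every day at hour:00" where the hour is wall-clock time in `zone`.
    // The result is expressed in UTC. nowUtc must have Kind Utc or Unspecified.
    public static DateTime NextDailyRunUtc(DateTime nowUtc, int hour, TimeZoneInfo zone)
    {
        var nowLocal = TimeZoneInfo.ConvertTimeFromUtc(nowUtc, zone);
        var candidate = nowLocal.Date.AddHours(hour);
        if (candidate <= nowLocal) candidate = candidate.AddDays(1);

        // Kind must be Unspecified so the value is interpreted as wall-clock time in `zone`.
        var wallClock = DateTime.SpecifyKind(candidate, DateTimeKind.Unspecified);
        return TimeZoneInfo.ConvertTimeToUtc(wallClock, zone);
    }

    // A fixed UTC+8 zone without daylight saving, created in code (assumption: good enough for China).
    public static TimeZoneInfo FixedUtcPlus8()
    {
        return TimeZoneInfo.CreateCustomTimeZone("UTC+08", TimeSpan.FromHours(8), "UTC+08", "UTC+08");
    }
}
```

With the current time at 2024-01-01 00:00 UTC (08:00 in Beijing), "daily at 02:00" resolved in UTC fires at 02:00 UTC, which is 10:00 Beijing time; resolved in UTC+8 it fires at 18:00 UTC, which is 02:00 Beijing time the next day. That difference is the 8-hour delay described above, and it is why the time zone has to be supplied explicitly.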