zoukankan      html  css  js  c++  java
  • 说下hangfire吧

    最近因工作需要开发计划任务模块(严格来说应该是修改bug吧,其他同事负责的)接触到了Hangfire。早前听同事说hangfire有点坑,怀着好奇,趁这两天bug改的差不多了,在github上面down了hangfire源码,下面分享一下,自己读hangfire源码的一些理解,和工作中需要注意的地方。介绍大概分为以下几个部分吧。1.准备工作,2.简单使用,3.源码分析,4.避坑。需要说明一下接触hangfire源码的时间不长,也就几天时间理解不到位,或者说错了的,希望在评论指正。
    准备工作:hangfire源代码的代码量不多,github地址: https://github.com/HangfireIO/Hangfire,有兴趣的朋友可以自己下载瞅瞅源码。功能上大概可以分为客户端模式和服务端模式。用到的技术大概有Multi Thread、Expression、Dapper、Cron等。可以这么说,它的定时任务完全就是基于多线程协作实现的。因为是多线程环境,所以个人觉得看起来有点费力。
    简单使用:.Net&.Net Core环境都可以使用,下面就以.Net Core的使用为例。
    1.客户端和服务端独立部署
    client端
     1 public IServiceProvider ConfigureServices(IServiceCollection services)
     2         {
     3             // 其他代码
     4              services.AddHangfire(config =>
     5             {
     6                  config.UseSqlServerStorage(...);
     7             });
     8         }
     9  
    10 public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    11         {
    12             // 其他代码...
    13             // 启用Dashboard看板
    14             app.UseHangfireDashboard();
    15         }
    server端
     1 public void Configuration(IAppBuilder app)
     2         {
     3             GlobalConfiguration.Configuration
     4                  .UseSqlServerStorage("连接字符串", new SqlServerStorageOptions
     5                 {
     6                     // options
     7                 });
     8             app.UseHangfireServer(new BackgroundJobServerOptions
     9             {
    10             });
    11         }
    12  
    13  
    或者
    1 services.AddHangfireServer(options =>
    2                     {
    3                         // 基于IHostedService接口实现
    4                     });
    PS:server端还有一种实现方式,实现IHostedService接口 其实跟上面的使用方法一样的,注入到服务就ok,在程序启动阶段会自动执行IHostedService接口的两个方法,可以简单看下IHostedService接口的定义。
    1   public interface IHostedService
    2   {
    3     Task StartAsync(CancellationToken cancellationToken);
    4     Task StopAsync(CancellationToken cancellationToken);
    5   }
    接口就定义了两个方法,start是在程序启动的时候执行,当然stop就是在程序停止的时候执行。我们用一张图简单描绘一下它的执行时机,图是盗的。
    以上就是hangfire的client端和server端分开部署的一个简单应用,下面我们看下第二种,client&server部署在同一台机器上。
    2.客户端和服务端统一部署
    1 public void Configuration(IAppBuilder app)
    2 {
    3     GlobalConfiguration.Configuration.UseSqlServerStorage(); // 配置数据库连接
    4     
    5     app.UseHangfireServer(); // 启用server
    6     app.UseHangfireDashboard(); // 启用看板
    7 }
    简单的几行代码,当然我也只会简单的用法。以上服务注入并执行,接下来就是往hangfire里面添加任务。
    1 BackgroundJob.Enqueue(() => Console.WriteLine("Simple!")); // 立即执行
    2 BackgroundJob.Schedule(() => Console.WriteLine("Reliable!"), TimeSpan.FromDays(7)); // 延后执行
    3 RecurringJob.AddOrUpdate(() => Console.WriteLine("Transparent!"), Cron.Daily); // 循环执行,支持cron表达式
    简单使用就到这吧,我们继续大纲的第三部分,源码分析。
     
    源码分析
    客户端模式就不用说了,说白了就是往hangfire数据库里面写任务,我们主要是看看服务端的执行原理。我们先找到入口,也可以看做是NetCore里面的一个中间件吧。看代码
    1 app.UseHangfireServer(); // 启用server
    UseHangfireServer实现
     1 public static IAppBuilder UseHangfireServer(
     2             [NotNull] this IAppBuilder builder,
     3             [NotNull] JobStorage storage,
     4             [NotNull] BackgroundJobServerOptions options,
     5             [NotNull] params IBackgroundProcess[] additionalProcesses)
     6         {
     7             // 其他代码...
     8             var server = new BackgroundJobServer(options, storage,  additionalProcesses); 
     9             
    10             return builder;
    11         }
    UseHangfireServer扩展方法实现里面,比较重要的一行代码就是创建BackgroundJobServer,BackgroundJobServer实现了IBackgroundProcessingServer接口,server的启动也就是间接在它的构造器里面完成的。我们不妨先瞅瞅IBackgroundProcessingServer接口和BackgroundJobServer类的定义。
     1 // IBackgroundProcessingServer
     2 public interface IBackgroundProcessingServer : IDisposable
     3     {
     4         void SendStop();
     5         bool WaitForShutdown(TimeSpan timeout);
     6         Task WaitForShutdownAsync(CancellationToken cancellationToken);
     7     }
     8  
     9 // BackgroundJobServer
    10 public class BackgroundJobServer : IBackgroundProcessingServer
    11     {
    12         // 其他成员...
    13         public BackgroundJobServer(
    14             [NotNull] BackgroundJobServerOptions options,
    15             [NotNull] JobStorage storage,
    16             [NotNull] IEnumerable<IBackgroundProcess> additionalProcesses,
    17             [CanBeNull] IJobFilterProvider filterProvider,
    18             [CanBeNull] JobActivator activator,
    19             [CanBeNull] IBackgroundJobFactory factory,
    20             [CanBeNull] IBackgroundJobPerformer performer,
    21             [CanBeNull] IBackgroundJobStateChanger stateChanger)
    22         {
    23             // 其他代码
    24             var processes = new List<IBackgroundProcessDispatcherBuilder>();
    25             processes.AddRange(GetRequiredProcesses(filterProvider, activator,  factory, performer, stateChanger));
    26             processes.AddRange(additionalProcesses.Select(x =>  x.UseBackgroundPool(1)));
    27             var properties = new Dictionary<string, object>
    28             {
    29                 { "Queues", options.Queues },
    30                 { "WorkerCount", options.WorkerCount }
    31             };
    32             
    33             _processingServer = new BackgroundProcessingServer(
    34                 storage,
    35                 processes,
    36                 properties,
    37                 GetProcessingServerOptions());
    38         }
    39         public void SendStop()
    40         {
    41         }
    42         public void Dispose()
    43         {
    44         }
    45         [Obsolete("This method is a stub. There is no need to call the `Start`  method. Will be removed in version 2.0.0.")]
    46         public void Start()
    47         {
    48         }
    49         [Obsolete("Please call the `Shutdown` method instead. Will be removed in  version 2.0.0.")]
    50         public void Stop()
    51         {
    52         }
    53         [Obsolete("Please call the `Shutdown` method instead. Will be removed in  version 2.0.0.")]
    54         public void Stop(bool force)
    55         {
    56         }
    57         public bool WaitForShutdown(TimeSpan timeout)
    58         {
    59         }
    60         public Task WaitForShutdownAsync(CancellationToken cancellationToken)
    61         {
    62         }
    IBackgroundProcessingServer接口里面的这几个方法都是跟停用server,取消任务清理资源相关的。BackgroundJobServer类里面真正完成接口的实现是由BackgroundProcessingServer类型的同名函数实现,这个对象是在构造函数里面初始化的,在初始化BackgroundProcessingServer类型的同时,创建了若干IBackgroundProcessDispatcherBuilder实现类BackgroundProcessDispatcherBuilder的实例,hangfire默认实现了7种dispatcher,我们任务、日志、心跳等等独立线程都是由它的Create方法完成,这个地方不算server启动主线,会在后面细说。我们继续看看BackgroundProcessingServer这个类型。这里需要注意的是里面有几个方法好像是被停用了,start、stop等方法,官方也注释了,被删除了。start方法被停用了,难道我们的server启动是在BackgroundProcessingServer类型里面?继续看BackgroundProcessingServer的定义。
     1 public sealed class BackgroundProcessingServer : IBackgroundProcessingServer
     2     {
     3         // 其他成员
     4         internal BackgroundProcessingServer(
     5             [NotNull] BackgroundServerProcess process,
     6             [NotNull] BackgroundProcessingServerOptions options)
     7         {
     8             _process = process ?? throw new ArgumentNullException(nameof(process));
     9             _options = options ?? throw new ArgumentNullException(nameof(options));
    10             _dispatcher = CreateDispatcher();
    11 #if !NETSTANDARD1_3
    12             AppDomain.CurrentDomain.DomainUnload += OnCurrentDomainUnload;
    13             AppDomain.CurrentDomain.ProcessExit += OnCurrentDomainUnload;
    14 #endif
    15         }
    16         public void SendStop()
    17         {
    18         }
    19         public bool WaitForShutdown(TimeSpan timeout)
    20         {
    21         }
    22         public async Task WaitForShutdownAsync(CancellationToken cancellationToken)
    23         {
    24         }
    25         public void Dispose()
    26         {
    27             
    28         }
    29         private void OnCurrentDomainUnload(object sender, EventArgs args)
    30         {
    31             
    32         }
    33         private IBackgroundDispatcher CreateDispatcher()
    34         {
    35             var execution = new BackgroundExecution(
    36                 _stoppingCts.Token,
    37                 new BackgroundExecutionOptions
    38                 {
    39                     Name = nameof(BackgroundServerProcess),
    40                     ErrorThreshold = TimeSpan.Zero,
    41                     StillErrorThreshold = TimeSpan.Zero,
    42                     RetryDelay = retry => _options.RestartDelay
    43                 });
    44             return new BackgroundDispatcher(
    45                 execution,
    46                 RunServer,
    47                 execution,
    48                 ThreadFactory);
    49         }
    50         private void RunServer(Guid executionId, object state)
    51         {
    52             _process.Execute(executionId, (BackgroundExecution)state,  _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
    53         }
    54         private static IEnumerable<Thread> ThreadFactory(ThreadStart threadStart)
    55         {
    56             yield return new Thread(threadStart)
    57             {
    58                 IsBackground = true,
    59                 Name = $"{nameof(BackgroundServerProcess)}  #{Interlocked.Increment(ref _lastThreadId)}",
    60             };
    61         }
    62     }
    果不其然,server的启动快要揭开神秘的面纱了,RunServer?翻译过来应该是启动服务吧,我们暂且不去管他,先记一下这个有个runserver,我们继续跟踪。在构造函数里面调用了一个CreateDispatcher()的方法,我们看下它的实现
     1 private IBackgroundDispatcher CreateDispatcher()
     2         {
     3             var execution = new BackgroundExecution(
     4                 _stoppingCts.Token,
     5                 new BackgroundExecutionOptions
     6                 {
     7                     Name = nameof(BackgroundServerProcess),
     8                     ErrorThreshold = TimeSpan.Zero,
     9                     StillErrorThreshold = TimeSpan.Zero,
    10                     RetryDelay = retry => _options.RestartDelay
    11                 });
    12             return new BackgroundDispatcher(
    13                 execution,
    14                 RunServer,
    15                 execution,
    16                 ThreadFactory);
    17         }
    在CreateDispatcher方法里面返回了一个BackgroundDispatcher,字面意思好像是后台分发器,并且指定了回调runserver,BackgroundDispatcher实现了IBackgroundDispatcher接口,我们先看下它们的定义。
     1 // IBackgroundDispatcher
     2 public interface IBackgroundDispatcher : IDisposable
     3     {
     4         bool Wait(TimeSpan timeout);
     5         Task WaitAsync(TimeSpan timeout, CancellationToken cancellationToken);
     6     }
     7  
     8 // BackgroundDispatcher
     9 internal sealed class BackgroundDispatcher : IBackgroundDispatcher
    10     {
    11         // 其他成员
    12         public BackgroundDispatcher(
    13             [NotNull] IBackgroundExecution execution,
    14             [NotNull] Action<Guid, object> action,
    15             [CanBeNull] object state,
    16             [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory)
    17         {
    18             if (threadFactory == null) throw new  ArgumentNullException(nameof(threadFactory));
    19             _execution = execution ?? throw new  ArgumentNullException(nameof(execution));
    20             _action = action ?? throw new ArgumentNullException(nameof(action));
    21             _state = state;
    22 #if !NETSTANDARD1_3
    23             AppDomainUnloadMonitor.EnsureInitialized();
    24 #endif
    25             var threads = threadFactory(DispatchLoop)?.ToArray();
    26             if (threads == null || threads.Length == 0)
    27             {
    28                 throw new ArgumentException("At least one unstarted thread should be  created.", nameof(threadFactory));
    29             }
    30             if (threads.Any(thread => thread == null || (thread.ThreadState &  ThreadState.Unstarted) == 0))
    31             {
    32                 throw new ArgumentException("All the threads should be non-null and  in the ThreadState.Unstarted state.", nameof(threadFactory));
    33             }
    34             _stopped = new CountdownEvent(threads.Length);
    35             foreach (var thread in threads)
    36             {
    37                 thread.Start();
    38             }
    39         }
    40         public bool Wait(TimeSpan timeout)
    41         {
    42             return _stopped.WaitHandle.WaitOne(timeout);
    43         }
    44         public async Task WaitAsync(TimeSpan timeout, CancellationToken  cancellationToken)
    45         {
    46             await _stopped.WaitHandle.WaitOneAsync(timeout,  cancellationToken).ConfigureAwait(false);
    47         }
    48         public void Dispose()
    49         {
    50         }
    51         public override string ToString()
    52         {
    53         }
    54         private void DispatchLoop()
    55         {
    56             try
    57             {
    58                 _execution.Run(_action, _state);
    59             }
    60             catch (Exception ex)
    61             {
    62  
    63             }
    64             finally
    65             {
    66                 try
    67                 {
    68                     _stopped.Signal();
    69                 }
    70                 catch (ObjectDisposedException)
    71                 {
    72  
    73                 }
    74             }
    75         }
    76     }
    从IBackgroundDispatcher接口的定义来看,分发器应该是负责协调资源处理,我们具体看看BackgroundDispatcher的实现。以上代码就是server的启动执行核心代码并且我以加粗,其实就是启动线程Loop执行。在DispatchLoop方法里面间接调用了我上面说的runserver方法。在runserver方法里面实现了整个server端的初始化工作。我们接着看DispatchLoop方法的实现 ,在这个方法里面调用了IBackgroundExecution接口的run方法,继续IBackgroundExecution接口的定义。
    1 public interface IBackgroundExecution : IDisposable
    2     {
    3         void Run([NotNull] Action<Guid, object> callback, [CanBeNull] object  state);
    4         Task RunAsync([NotNull] Func<Guid, object, Task> callback, [CanBeNull]  object state);
    5     }
    就两方法,run包含同步和异步,看看它的唯一实现类BackgroundExecution。
     1   internal sealed class BackgroundExecution : IBackgroundExecution
     2     {
     3                 // 其他成员
     4         public void Run(Action<Guid, object> callback, object state)
     5         {
     6             if (callback == null) throw new ArgumentNullException(nameof(callback));
     7             var executionId = Guid.NewGuid();
     8            
     9             {
    10 #if !NETSTANDARD1_3
    11                 try
    12 #endif
    13                 {
    14                     HandleStarted(executionId, out var nextDelay);
    15                     while (true)
    16                     {
    17                         // Don't place anything here.
    18                         try
    19                         {
    20                            
    21                             if (StopRequested) break;
    22                             if (nextDelay > TimeSpan.Zero)
    23                             {
    24                                 HandleDelay(executionId, nextDelay);
    25                             }
    26                             callback(executionId, state);
    27                             HandleSuccess(out nextDelay);
    28                         }
    29 #if !NETSTANDARD1_3
    30                         catch (ThreadAbortException) when  (AppDomainUnloadMonitor.IsUnloading)
    31                         {
    32                             // Our thread is aborted due to AppDomain unload. It's  better to give up to
    33                             // not to cause the host to be more aggressive.
    34                             throw;
    35                         }
    36 #endif
    37                         catch (OperationCanceledException) when (StopRequested)
    38                         {
    39                             break;
    40                         }
    41                         catch (Exception ex)
    42                         {
    43                             HandleException(executionId, ex, out nextDelay);
    44                         }
    45                     }
    46                     HandleStop(executionId);
    47                 }
    48 #if !NETSTANDARD1_3
    49                 catch (ThreadAbortException ex)
    50                 {
    51                     HandleThreadAbort(executionId, ex);
    52                 }
    53 #endif
    54             }
    55         }
    56 }
    hangfire里面所有的独立线程都是通过run方法执行,然后回调到自己的实现类Execute方法,自此每个独立的功能线程就循环干着自己独立的工作(这个后面会单独分析RecurringJobScheduler)。继续我们的主线,server启动,我们以run的同步方法为例,第一个线程(我们就叫它主线程吧)启动了一个while循环,在循环里面并且callback调用了我们的runserver方法。
        
    1 private void RunServer(Guid executionId, object state)
    2         {
    3             _process.Execute(executionId, (BackgroundExecution)state,  _stoppingCts.Token, _stoppedCts.Token, _shutdownCts.Token);
    4         }
    在runserver方法里面的实现很简单,直接调用了_process的execute方法,我们简单看下_process类型IBackgroundServerProcess的定义。
    1 internal interface IBackgroundServerProcess
    2     {
    3         void Execute(
    4             Guid executionId,
    5             BackgroundExecution execution,
    6             CancellationToken stoppingToken,
    7             CancellationToken stoppedToken,
    8             CancellationToken shutdownToken);
    9     }
    IBackgroundServerProcess的定义就一个execute方法,这个接口的工作其实就是初始化server服务端,我们看看它的唯一实现类BackgroundServerProcess。
      1 internal sealed class BackgroundServerProcess : IBackgroundServerProcess
      2     {
      3         
      4         // 其他成员
      5         public BackgroundServerProcess(
      6             [NotNull] JobStorage storage,
      7             [NotNull] IEnumerable<IBackgroundProcessDispatcherBuilder> dispatcherBuilders,
      8             [NotNull] BackgroundProcessingServerOptions options,
      9             [NotNull] IDictionary<string, object> properties)
     10         {
     11             if (dispatcherBuilders == null) throw new ArgumentNullException(nameof(dispatcherBuilders));
     12  
     13  
     14             _storage = storage ?? throw new ArgumentNullException(nameof(storage));
     15             _options = options ?? throw new ArgumentNullException(nameof(options));
     16             _properties = properties ?? throw new ArgumentNullException(nameof(properties));
     17  
     18  
     19             var builders = new List<IBackgroundProcessDispatcherBuilder>();
     20             builders.AddRange(GetRequiredProcesses()); // 添加默认的工作dispatcher也就是独立线程
     21             builders.AddRange(GetStorageComponents());
     22             builders.AddRange(dispatcherBuilders);
     23  
     24  
     25             _dispatcherBuilders = builders.ToArray();
     26         }
     27  
     28  
     29         public void Execute(Guid executionId, BackgroundExecution execution, CancellationToken stoppingToken,
     30             CancellationToken stoppedToken, CancellationToken shutdownToken)  // server初始化
     31         {
     32             var serverId = GetServerId();
     33             Stopwatch stoppedAt = null;
     34  
     35  
     36             void HandleRestartSignal()
     37             {
     38                 if (!stoppingToken.IsCancellationRequested)
     39                 {
     40                     _logger.Info($"{GetServerTemplate(serverId)} caught restart signal...");
     41                 }
     42             }
     43             using (var restartCts = new CancellationTokenSource())
     44             using (var restartStoppingCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken, restartCts.Token))
     45             using (var restartStoppedCts = CancellationTokenSource.CreateLinkedTokenSource(stoppedToken, restartCts.Token))
     46             using (var restartShutdownCts = CancellationTokenSource.CreateLinkedTokenSource(shutdownToken, restartCts.Token))
     47             using (restartStoppingCts.Token.Register(HandleStopRestartSignal))
     48             using (stoppingToken.Register(HandleStoppingSignal))
     49             using (stoppedToken.Register(HandleStoppedSignal))
     50             using (shutdownToken.Register(HandleShutdownSignal))
     51             using (restartCts.Token.Register(HandleRestartSignal))
     52             {
     53                 var context = new BackgroundServerContext(
     54                     serverId,
     55                     _storage,
     56                     _properties,
     57                     restartStoppingCts.Token,
     58                     restartStoppedCts.Token,
     59                     restartShutdownCts.Token);
     60                 var dispatchers = new List<IBackgroundDispatcher>();
     61                 CreateServer(context);
     62                 try
     63                 {
     64                     // ReSharper disable once AccessToDisposedClosure
     65                     using (var heartbeat = CreateHeartbeatProcess(context, () => restartCts.Cancel())) // 创建守护线程
     66                     {
     67                         StartDispatchers(context, dispatchers); // 启动hangfire默认初始化的所有独立任务线程
     68                         execution.NotifySucceeded();
     69                         WaitForDispatchers(context, dispatchers);
     70  
     71  
     72                         restartCts.Cancel();
     73  
     74                         heartbeat.WaitAsync(Timeout.InfiniteTimeSpan, shutdownToken).GetAwaiter().GetResult();
     75                     }
     76                 }
     77                 finally
     78                 {
     79                     DisposeDispatchers(dispatchers);
     80                     ServerDelete(context, stoppedAt);
     81                 }
     82             }
     83         }
     84  
     85  
     86         private IBackgroundDispatcher CreateHeartbeatProcess(BackgroundServerContext context, Action requestRestart) // 创建守护线程
     87         {
     88             return new ServerHeartbeatProcess(_options.HeartbeatInterval, _options.ServerTimeout, requestRestart)
     89                 .UseBackgroundPool(threadCount: 1)
     90                 .Create(context, _options);
     91         }
     92  
     93  
     94         private IEnumerable<IBackgroundProcessDispatcherBuilder> GetRequiredProcesses() // 初始化日志和任务监控线程
     95         {
     96             yield return new ServerWatchdog(_options.ServerCheckInterval, _options.ServerTimeout).UseBackgroundPool(threadCount: 1);
     97             yield return new ServerJobCancellationWatcher(_options.CancellationCheckInterval).UseBackgroundPool(threadCount: 1);
     98         }
     99         private string GetServerId() // 获取serverid
    100         {
    101             var serverName = _options.ServerName
    102                  ?? Environment.GetEnvironmentVariable("COMPUTERNAME")
    103                  ?? Environment.GetEnvironmentVariable("HOSTNAME");
    104             var guid = Guid.NewGuid().ToString();
    105  
    106             return !String.IsNullOrWhiteSpace(serverName) ? $"{serverName.ToLowerInvariant()}:{guid}" : guid;
    107         }
    108  
    109         
    110         private void CreateServer(BackgroundServerContext context) // 创建server,写入Server数据表
    111         {
    112             var stopwatch = Stopwatch.StartNew();
    113             using (var connection = _storage.GetConnection())
    114             {
    115                 connection.AnnounceServer(context.ServerId, GetServerContext(_properties));
    116             }
    117             stopwatch.Stop();
    118  
    119  
    120             ServerJobCancellationToken.AddServer(context.ServerId);
    121             _logger.Info($"{GetServerTemplate(context.ServerId)} successfully announced in {stopwatch.Elapsed.TotalMilliseconds} ms");
    122         }
    123  
    124  
    125         private void StartDispatchers(BackgroundServerContext context, ICollection<IBackgroundDispatcher> dispatchers) // 启动所有独立的任务线程,包括我们的队列计划、循环计划、日志、守护等等线程
    126         {
    127  
    128             foreach (var dispatcherBuilder in _dispatcherBuilders)
    129             {
    130                 dispatchers.Add(dispatcherBuilder.Create(context, _options));
    131             }
    132         }
    133  
    134     }
    以上代码我有做精简处理,不要纠结里面的实现,代码注释也比较详细。下面我做一个简单的总结吧,第一个线程(暂时叫主线程吧)从startup里面调用usehangfireserver扩展方法-》启动一个新的worker线程用于初始化&启动server-》主程返回-》启动hangfire所有任务线程-》创建的第一个worker线程挂起(用于处理所有任务线程的资源释放)。server的初始化工作大概就是这些,下面详细看看hangfire的任务线程的执行原理,这里我们以RecurringJobScheduler循环任务为例。
     
    RecurringJobScheduler实现机制
    还记得上面提到的7个dispatcher任务线程的创建吗?这7个默认的任务线程初始化就发生在上面加粗的代码里面StartDispatchers方法,我们看代码。
    1 private void StartDispatchers(BackgroundServerContext context,  ICollection<IBackgroundDispatcher> dispatchers)
    2         {
    3                // 其他代码...
    4             foreach (var dispatcherBuilder in _dispatcherBuilders)
    5             {
    6                 dispatchers.Add(dispatcherBuilder.Create(context, _options)); // 初始化独立任务线程
    7             }
    8         }
    遍历_dispatcherBuilders数组,7种任务类型,分别调用它们的Create方法。继续看create方法。
       
     1  public IBackgroundDispatcher Create(BackgroundServerContext context,  BackgroundProcessingServerOptions options) // 第一步
     2         {
     3             // 其他代码
     4             var execution = new BackgroundExecution(
     5                 context.StoppingToken,
     6                 new BackgroundExecutionOptions
     7                 {
     8                     Name = _process.GetType().Name,
     9                     RetryDelay = options.RetryDelay
    10                 }); // 定义自己的execution
    11             return new BackgroundDispatcher( // 创建BackgroundDispatcher 
    12                 execution,
    13                 ExecuteProcess, // 指定回调
    14                 Tuple.Create(_process, context, execution), // 创建三元组上下文,注意一下1元组这个对象
    15                 _threadFactory);
    16         }
    17  
    18 public BackgroundDispatcher(  // 第二步
    19             [NotNull] IBackgroundExecution execution,
    20             [NotNull] Action<Guid, object> action,
    21             [CanBeNull] object state,
    22             [NotNull] Func<ThreadStart, IEnumerable<Thread>> threadFactory)
    23         {
    24    
    25             _state = state;
    26  
    27             var threads = threadFactory(DispatchLoop)?.ToArray();
    28            
    29             foreach (var thread in threads)
    30             {
    31                 thread.Start(); // 执行线程
    32             }
    33         }
    34  
    35 private void DispatchLoop() // 第三步
    36         {
    37             try
    38             {
    39                 _execution.Run(_action, _state);  // 在run里面回调_action
    40             }
    41             catch (Exception ex)
    42             {
    43             }
    44             finally
    45             {
    46                 try
    47                 {
    48                     _stopped.Signal();
    49                 }
    50                 catch (ObjectDisposedException)
    51                 {
    52                 }
    53             }
    54         }
    55  
    56 private static void ExecuteProcess(Guid executionId, object state) // 第四步 回调方法,对应上面的指定回调
    57         {
    58             var tuple = (Tuple<IBackgroundProcess, BackgroundServerContext,  BackgroundExecution>)state;
    59             var serverContext = tuple.Item2;
    60             var context = new BackgroundProcessContext( // 创建公共上下文
    61                 serverContext.ServerId,
    62                 serverContext.Storage,
    63                 serverContext.Properties.ToDictionary(x => x.Key, x => x.Value),
    64                 executionId,
    65                 serverContext.StoppingToken,
    66                 serverContext.StoppedToken,
    67                 serverContext.ShutdownToken);
    68             while (!context.IsStopping)
    69             {
    70                 tuple.Item1.Execute(context); // 执行自己元组对应的实例
    71                 tuple.Item3.NotifySucceeded();
    72             }
    73         }
    上面有点乱啊,我大概简单串起来说一下。第一步在create方法里面创建了BackgroundDispatcher并指定了元组参数-》第二步绑定线程的执行函数Loop并且执行-》第三步执行Loop并且回调_action委托-》第四步_action参数对应的函数地址就是ExecuteProcess,最后在ExecuteProcess里面通过元组参数调用对应的任务类型,自此7种任务类型启动并开始工作。以上代码还有个细节需要说明一下,Tuple.Create(_process, context, execution)。元组的第一个参数,其类型为IBackgroundProcess,看下定义。
    1 public interface IBackgroundProcess : IServerProcess
    2     {
    3         void Execute([NotNull] BackgroundProcessContext context);
    4     }
    接口就定义了一个方法,没什么特别的,但是它的几个实现类就是我们单独的任务类,我们下面要说的RecurringJobScheduler循环任务类也实现了这个接口。到此我们的RecurringJobScheduler循环定时任务线程就算开始执行了。
    RecurringJobScheduler循环定时任务机制
    照旧看下这个类型的定义
     1 public class RecurringJobScheduler : IBackgroundProcess
     2     {
     3         // 其他代码
     4         public RecurringJobScheduler(
     5             [NotNull] IBackgroundJobFactory factory,
     6             TimeSpan pollingDelay,
     7             [NotNull] ITimeZoneResolver timeZoneResolver,
     8             [NotNull] Func<DateTime> nowFactory)
     9         {
    10             if (factory == null) throw new ArgumentNullException(nameof(factory));
    11             if (nowFactory == null) throw new ArgumentNullException(nameof(nowFactory));
    12             if (timeZoneResolver == null) throw new ArgumentNullException(nameof(timeZoneResolver));
    13  
    14  
    15             _factory = factory;
    16             _nowFactory = nowFactory;
    17             _timeZoneResolver = timeZoneResolver;
    18             _pollingDelay = pollingDelay;
    19             _profiler = new SlowLogProfiler(_logger);
    20         }
    21  
    22  
    23         /// <inheritdoc />
    24         public void Execute(BackgroundProcessContext context) // 实现方法
    25         {
    26             if (context == null) throw new ArgumentNullException(nameof(context));
    27  
    28  
    29             var jobsEnqueued = 0;
    30  
    31  
    32             while (EnqueueNextRecurringJobs(context)) // 从数据库获取定时任务
    33             {
    34                 jobsEnqueued++;
    35  
    36  
    37                 if (context.IsStopping)
    38                 {
    39                     break;
    40                 }
    41             }
    42  
    43  
    44             if (jobsEnqueued != 0)
    45             {
    46                 _logger.Debug($"{jobsEnqueued} recurring job(s) enqueued.");
    47             }
    48  
    49  
    50             if (_pollingDelay > TimeSpan.Zero)
    51             {
    52                 context.Wait(_pollingDelay);
    53             }
    54             else
    55             {
    56                 var now = _nowFactory();
    57                 context.Wait(now.AddMilliseconds(-now.Millisecond).AddSeconds(-now.Second).AddMinutes(1) - now);
    58             }
    59         }
    60     }
    承上,调用元组的第一个参数的execute方法,RecurringJobScheduler的execute方法得以执行,该方法就干一件事,每隔15秒从数据库获取待执行的计划,每次1000条数据。通过EnqueueNextRecurringJobs方法获取任务。
     1 private bool EnqueueNextRecurringJobs(BackgroundProcessContext context)
     2         {
     3             return UseConnectionDistributedLock(context.Storage, connection => 
     4             {
     5                 var result = false;
     6                 if (IsBatchingAvailable(connection))
     7                 {
     8                     var now = _nowFactory();
     9                     var timestamp = JobHelper.ToTimestamp(now);
    10                     var recurringJobIds =  ((JobStorageConnection)connection).GetFirstByLowestScoreFromSet("recurring-jobs", 0,  timestamp, BatchSize); // 从数据库里面查询
    11                     if (recurringJobIds == null || recurringJobIds.Count == 0) return  false;
    12                     foreach (var recurringJobId in recurringJobIds)
    13                     {
    14                         if (context.IsStopping) return false;
    15                         if (TryEnqueueBackgroundJob(context, connection, recurringJobId,  now))// 排队执行
    16                         {
    17                             result = true;
    18                         }
    19                     }
    20                 }
    21                 else
    22                 {
    23                     for (var i = 0; i < BatchSize; i++)
    24                     {
    25                         if (context.IsStopping) return false;
    26                         var now = _nowFactory();
    27                         var timestamp = JobHelper.ToTimestamp(now);
    28                         var recurringJobId =  connection.GetFirstByLowestScoreFromSet("recurring-jobs", 0, timestamp);
    29                         if (recurringJobId == null) return false;
    30                         if (!TryEnqueueBackgroundJob(context, connection, recurringJobId,  now))
    31                         {
    32                             return false;
    33                         }
    34                     }
    35                 }
    36                 return result;
    37             });
    38         }
    GetFirstByLowestScoreFromSet方法从数据库Set表里面查询top1000数据,条件是key为recurring-jobs字符串(表示定时任务)并且 时间范围是0到当前时间。随后遍历这些jobids,排队执行,往下看TryEnqueueBackgroundJob方法的实现。
     1 private bool EnqueueBackgroundJob(
     2             BackgroundProcessContext context,
     3             IStorageConnection connection,
     4             string recurringJobId,
     5             DateTime now)
     6         {
     7             // 其他代码
     8             using (connection.AcquireDistributedRecurringJobLock(recurringJobId,  LockTimeout))
     9             {
    10                 try
    11                 {
    12                     var recurringJob = connection.GetRecurringJob(recurringJobId,  _timeZoneResolver, now);
    13                     if (recurringJob == null)
    14                     {
    15                         using (var transaction = connection.CreateWriteTransaction())
    16                         {
    17                             transaction.RemoveFromSet("recurring-jobs", recurringJobId);
    18                             transaction.Commit();
    19                         }
    20                         return false;
    21                     }
    22           
    23                     BackgroundJob backgroundJob = null;
    24                     IReadOnlyDictionary<string, string> changedFields;
    25                     if (recurringJob.TrySchedule(out var nextExecution, out var error))
    26                     {
    27                         if (nextExecution.HasValue && nextExecution <= now)
    28                         {
    29                             backgroundJob = _factory.TriggerRecurringJob(context.Storage,  connection, _profiler, recurringJob, now);
    30                             if (String.IsNullOrEmpty(backgroundJob?.Id))
    31                             {
    32                                 _logger.Debug($"Recurring job '{recurringJobId}' execution  at '{nextExecution}' has been canceled.");
    33                             }
    34                         }
    35                         recurringJob.IsChanged(out changedFields, out nextExecution);
    36                     }
    37                     else if (recurringJob.RetryAttempt < MaxRetryAttemptCount)
    38                     {
    39                         var delay = _pollingDelay > TimeSpan.Zero ? _pollingDelay :  TimeSpan.FromMinutes(1);
    40                         
    41                         _logger.WarnException($"Recurring job '{recurringJobId}' can't be  scheduled due to an error and will be retried in {delay}.", error);
    42                         recurringJob.ScheduleRetry(delay, out changedFields, out  nextExecution);
    43                     }
    44                     else
    45                     {
    46                         _logger.ErrorException($"Recurring job '{recurringJobId}' can't be  scheduled due to an error and will be disabled.", error);
    47                         recurringJob.Disable(error, out changedFields, out nextExecution);
    48                     }
    49               
    50                     using (var transaction = connection.CreateWriteTransaction())
    51                     {
    52                         if (backgroundJob != null)
    53                         {
    54                             _factory.StateMachine.EnqueueBackgroundJob(
    55                                 context.Storage,
    56                                 connection,
    57                                 transaction,
    58                                 recurringJob,
    59                                 backgroundJob,
    60                                 "Triggered by recurring job scheduler",
    61                                 _profiler);
    62                         }
    63                         transaction.UpdateRecurringJob(recurringJob, changedFields,  nextExecution, _logger);
    64                         transaction.Commit();
    65                         return true;
    66                     }
    67                 }
    68                 catch (TimeZoneNotFoundException ex)
    69                 {
    70                 catch (Exception ex)
    71                 {
    72    
    73                 }
    74                 return false;
    75             }
    76         }
    需要注意的地方我都有加粗,该方法大概流程是:1.GetRecurringJob根据jobid从Hash表里面查询一条完整的定时任务,2.TrySchedule获取该任务的下次执行时间,如果下次执行时间小于当前,执行这条任务(并非真正执行定时任务,只是往job表里面写数据,真正执行任务由worker完成),3.获取下次执行时间&所有任务字段,4.状态机修改任务状态。定时任务就这样周而复始的重复执行以上流程。这里简单说下worker的执行机制,其实际就是轮询检索job表里面的数据执行任务表达式树,worker在hangfire里面默认开启了20个线程。第三部分就到这吧。
     
    避坑
    简单说下个人在改bug期间遇到的一些问题啊。
    1.时区问题,在添加定时任务时如果不指定时区信息,默认使用的是utc时间,我们中国是东8区,也就是说解析出来的执行时间会晚8个小时执行。解决办法有几种可以通过全局指定options的ITimeZoneResolver属性指定,也可以通过AddorUpdate方法指定,如果是指定时区信息,需要注意看板上面的异常信息,如果有异常会导致任务不执行,时区信息它是从系统里面检索出来的,没有就抛异常。就这样吧。
     
     
     
     
     
     
  • 相关阅读:
    javaweb开发之解决全站乱码
    redis加入开机启动服务
    linux下安装memcache
    关于本地连接虚拟机(centos)里的mongodb失败问题
    oracle存储过程中返回一个程序集
    面向对象进阶(二)----------类的内置方法
    面向对象进阶(一)
    面向对象的三大特性之----------封装
    面向对象的三大特性之----------多态
    面向对象的三大特性之----------继承
  • 原文地址:https://www.cnblogs.com/adair-blog/p/12490042.html
Copyright © 2011-2022 走看看