zoukankan      html  css  js  c++  java
  • Hangfire只允许同时运行同一个任务

    Hangfire有个机制可以确保所有任务都会被执行,如果当服务器停机了一段时间重新启动时,在此期间的周期任务会几乎同时执行。而大部分时候,我们希望同个周期任务每段时间只运行一个就行了。

    或者是如果周期任务设置得过于频繁,当之前的任务还没执行完,我们也不希望继续添加周期任务进队列去排队执行。

    Hangfire有提供一个扩展https://docs.hangfire.io/en/latest/background-processing/throttling.html 

    同个DisableConcurrentExecution我们可以限制同一个任务每次只会执行一个,但是如果有任务正在执行,这时候又有新任务过来,新任务并不会被删除而是处于排队状态,等待前面的任务执行完。

    而且,如果我们的任务用了同一个方法作为入口时(或者说我们需要根据方法的参数来确定是否为同一个任务),此时这个控制就不适用了。

    参考了https://gist.github.com/sbosell/3831f5bb893b20e82c72467baf8aefea,我们可以用过滤器来实现,将运行期间进来的任务给取消掉。

    代码的具体实现为:

     1     /// <summary>
     2     /// 禁用多个排队项目
     3     /// <remarks>同个任务取消并行执行,期间进来的任务不会等待,会被取消</remarks>
     4     /// </summary>
     5     public class DisableMultipleQueuedItemsFilter : JobFilterAttribute, IClientFilter, IServerFilter
     6     {
     7         private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
     8         private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(4);//任务执行超时时间
     9 
    10         public void OnCreating(CreatingContext filterContext)
    11         {
    12             var recurringJobId = filterContext.GetJobParameter<string>("RecurringJobId");
    13             if (!string.IsNullOrEmpty(recurringJobId)&&!AddFingerprintIfNotExists(filterContext.Connection, recurringJobId))
    14             {
    15                 filterContext.Canceled = true;
    16             }
    17         }
    18 
    19         public void OnPerformed(PerformedContext filterContext)
    20         {
    21             var recurringJobId = filterContext.GetJobParameter<string>("RecurringJobId");
    22             if (!string.IsNullOrEmpty(recurringJobId))
    23             {
    24                 RemoveFingerprint(filterContext.Connection, recurringJobId);
    25             }
    26         }
    27 
    28         private static bool AddFingerprintIfNotExists(IStorageConnection connection, string recurringJobId)
    29         {
    30             using (connection.AcquireDistributedLock(GetFingerprintLockKey(recurringJobId), LockTimeout))
    31             {
    32                 var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(recurringJobId));
    33 
    34                 if (fingerprint != null &&
    35                     fingerprint.ContainsKey("Timestamp") &&
    36                     DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out var timestamp) &&
    37                     DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
    38                 {
    39                     // 有任务还未执行完,并且没有超时
    40                     return false;
    41                 }
    42 
    43                 // 没有任务执行,或者该任务已超时
    44                 connection.SetRangeInHash(GetFingerprintKey(recurringJobId), new Dictionary<string, string>
    45             {
    46                 { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
    47             });
    48 
    49                 return true;
    50             }
    51         }
    View Code

    在OnCreating方法中,我们读取RecurringJobId的值,获取周期任务的id(同样的id代表同一个周期任务),然后以这个id为key去设置一个超时。如果在此期间,如果拿到了key的值,以及设置的时间还未超时的话,我们通过设置filterContext.Canceled = true取消掉此任务。

     使用connection.AcquireDistributedLock在设置键值时添加分布式锁,确保不会同时设置了多个相同的任务。使用connection.SetRangeInHash键RecurringJobId作为key,当前时间作为值保存。以此来确保在FingerprintTimeout的超时时间内,同个RecurringJobId的任务只能创建一个。或者等任务执行完后在OnPerformed方法中释放掉这个键值。

    在OnPerformed方法中,将我们在创建方法中设置的RecurringJobId key和对应的时间给删除,这样OnCreating可以继续创建同一个RecurringJobId 的任务。

     

    或者是普通触发的任务,这时候没有RecurringJobId 我们希望可以同个参数来控制,同样的参数不能同时执行。我们可以通过这个方法来生成相应的key

     1         private static string GetFingerprint(Job job)
     2         {
     3             var parameters = string.Empty;
     4             if (job?.Arguments != null)
     5             {
     6                 parameters = string.Join(".", job.Arguments);
     7             }
     8             if (job?.Type == null || job.Method == null)
     9             {
    10                 return string.Empty;
    11             }
    12             var payload = $"{job.Type.FullName}.{job.Method.Name}.{parameters}";
    13             var hash = SHA256.Create().ComputeHash(System.Text.Encoding.UTF8.GetBytes(payload));
    14             var fingerprint = Convert.ToBase64String(hash);
    15             return fingerprint;
    16         }    
    View Code

    这样我们就能确保我们希望的同一个任务不会同时在执行,而且周期任务也不会继续在队列中排队


    考虑到写死锁的key值不太合理,现添加特性来处理。

    添加DisableMultipleInstanceAttribute特性,添加WebApiPullDisableMultipleInstance默认实现,取参数列表的第一个参数当做key

     1     /// <summary>
     2     /// 后台任务禁用重复任务排队
     3     /// </summary>
     4     [AttributeUsage(AttributeTargets.Class)]
     5     public class DisableMultipleInstanceAttribute: BackgroundJobAttribute
     6     {
     7         private string _fingerprint;
     8         public DisableMultipleInstanceAttribute(string fingerprint = null)
     9         {
    10             _fingerprint = fingerprint;
    11         }
    12 
    13         public virtual string GetFingerprint(IReadOnlyList<object> methodArgs)
    14         {
    15             return _fingerprint;
    16         }
    17 
    18         public MultiTenancySides MultiTenancySides { get; set; } = MultiTenancySides.Tenant;
    19     }
    20 
    21     /// <summary>
    22     /// 接口禁用重复任务排队
    23     /// </summary>
    24     public class WebApiPullDisableMultipleInstanceAttribute: DisableMultipleInstanceAttribute
    25     {
    26         public override string GetFingerprint(IReadOnlyList<object> methodArgs)
    27         {
    28             return methodArgs[0].ToString();
    29         }
    30     }
    View Code

    在基类中添加特性

    添加hangfire的JobFilterAttributeFilterProvider的实现CustomJobAttributeFilterProvider,重写GetTypeAttributes方法,添加我们新增的特性

     1     protected override IEnumerable<JobFilterAttribute> GetTypeAttributes(Job job)
     2         {
     3             foreach (var attribute in ReflectedAttributeCache.GetTypeFilterAttributes(job.Type))
     4             {
     5                 if (attribute is CaptureContextAttribute)
     6                 {
     7                     yield return new CaptureContextMessageAttribute();
     8                 }
     9 
    10                 if (attribute is AutomaticRetryAttribute automaticRetry)
    11                 {
    12                     yield return new AutoRetryAttribute() { Attempts = automaticRetry.AutomaticRetry };
    13                 }
    14 
    15                 if (attribute is WebApiPullDisableMultipleInstanceAttribute apiPullDisable)
    16                 {
    17                     yield return new DisableMultipleQueuedItemsAttribute(apiPullDisable);
    18                 }
    19             }
    20         }
    View Code

    在DisableMultipleQueuedItemsAttribute的OnCreating中调用_disableMultipleInstanceAttribute.GetFingerprint获取分布式锁的key

      1     /// <summary>
      2     /// 禁用多个排队项目
      3     /// <remarks>同个任务取消并行执行,期间进来的任务不会等待,会被取消</remarks>
      4     /// </summary>
      5     public class DisableMultipleQueuedItemsAttribute : JobFilterAttribute, IClientFilter, IServerFilter
      6     {
      7         private static readonly TimeSpan LockTimeout = TimeSpan.FromSeconds(5);
      8         private static readonly TimeSpan FingerprintTimeout = TimeSpan.FromHours(4);//任务执行超时时间
      9 
     10         private readonly DisableMultipleInstanceAttribute _disableMultipleInstanceAttribute;
     11         public DisableMultipleQueuedItemsAttribute(DisableMultipleInstanceAttribute attribute)
     12         {
     13             _disableMultipleInstanceAttribute = attribute;
     14         }
     15 
     16         public void OnCreating(CreatingContext filterContext)
     17         {
     18             var fingerprintKey = _disableMultipleInstanceAttribute.GetFingerprint(filterContext.Job.Args);
     19             if (string.IsNullOrEmpty(fingerprintKey))
     20             {
     21                 throw new AppFatalExceptions("唯一键为空");
     22             }
     23             if (_disableMultipleInstanceAttribute.MultiTenancySides==MultiTenancySides.Tenant)
     24             {
     25                 var contextMessage = filterContext.GetJobParameter<ContextMessage>("_ld_contextMessage");
     26                 if (string.IsNullOrEmpty(contextMessage.TenantId))
     27                 {
     28                     throw new AppFatalExceptions("租户Id为空");
     29                 }
     30                 fingerprintKey = $"{contextMessage.TenantId}:{fingerprintKey}";
     31             }
     32             if (!AddFingerprintIfNotExists(filterContext.Connection, fingerprintKey))
     33             {
     34                 filterContext.Canceled = true;
     35             }
     36         }
     37 
     38         public void OnPerformed(PerformedContext filterContext)
     39         {
     40             var fingerprintKey = _disableMultipleInstanceAttribute.GetFingerprint(filterContext.BackgroundJob.Job.Args);
     41             if (_disableMultipleInstanceAttribute.MultiTenancySides == MultiTenancySides.Tenant)
     42             {
     43                 var contextMessage = filterContext.GetJobParameter<ContextMessage>("_ld_contextMessage");
     44                 fingerprintKey = $"{contextMessage.TenantId}:{fingerprintKey}";
     45             }
     46             RemoveFingerprint(filterContext.Connection, fingerprintKey);
     47         }
     48 
     49         private static bool AddFingerprintIfNotExists(IStorageConnection connection, string fingerprintKey)
     50         {
     51             using (connection.AcquireDistributedLock(GetFingerprintLockKey(fingerprintKey), LockTimeout))
     52             {
     53                 var fingerprint = connection.GetAllEntriesFromHash(GetFingerprintKey(fingerprintKey));
     54 
     55                 if (fingerprint != null &&
     56                     fingerprint.ContainsKey("Timestamp") &&
     57                     DateTimeOffset.TryParse(fingerprint["Timestamp"], null, DateTimeStyles.RoundtripKind, out var timestamp) &&
     58                     DateTimeOffset.UtcNow <= timestamp.Add(FingerprintTimeout))
     59                 {
     60                     // 有任务还未执行完,并且没有超时
     61                     return false;
     62                 }
     63 
     64                 // 没有任务执行,或者该任务已超时
     65                 connection.SetRangeInHash(GetFingerprintKey(fingerprintKey), new Dictionary<string, string>
     66             {
     67                 { "Timestamp", DateTimeOffset.UtcNow.ToString("o") }
     68             });
     69 
     70                 return true;
     71             }
     72         }
     73 
     74         private static void RemoveFingerprint(IStorageConnection connection, string recurringJobId)
     75         {
     76             using (connection.AcquireDistributedLock(GetFingerprintLockKey(recurringJobId), LockTimeout))
     77             using (var transaction = connection.CreateWriteTransaction())
     78             {
     79                 transaction.RemoveHash(GetFingerprintKey(recurringJobId));
     80                 transaction.Commit();
     81             }
     82         }
     83 
     84         private static string GetFingerprintLockKey(string key)
     85         {
     86             return String.Format("{0}:lock", key);
     87         }
     88 
     89         private static string GetFingerprintKey(string key)
     90         {
     91             return String.Format("fingerprint:{0}", key);
     92         }
     93 
     94 
     95         void IClientFilter.OnCreated(CreatedContext filterContext)
     96         {
     97             
     98         }
     99 
    100         void IServerFilter.OnPerforming(PerformingContext filterContext)
    101         {
    102         }
    103 
    104     }
    View Code
  • 相关阅读:
    《时间的朋友》跨年演讲金句
    2017《时间的朋友》罗振宇跨年演讲ppt
    Calculus on Computational Graphs: Backpropagation
    Machine Learning Trick of the Day (1): Replica Trick
    What Does “Neurons that Fire Together Wire Together” Mean?
    How to Tell Science Stories with Maps
    模型选择的一些基本思想和方法
    A Gentle Guide to Machine Learning
    EM算法(Expectation Maximization Algorithm)
    混合高斯模型(Mixtures of Gaussians)和EM算法
  • 原文地址:https://www.cnblogs.com/Cyril-hcj/p/13955013.html
Copyright © 2011-2022 走看看