编写了一个任务分配系统,基本流程是任务发起者发起一个周期任务,指定好周期,接收人定期收到任务。刚开始使用的是Hangfire,很容易跑起来,还自带一个管理控制台。跑起来后发现Hangfire有一些不足之处,最主要的是它的Cron表达式非常不标准,例如L、W等都不能使用,这个问题大了,果断放弃转入Quartz.net。Quartz上手有点困难,主要是有一些基本的概念要先理解,看了看官方文档了解个大概,立马着手。
领域模型
using System;
using System.Collections.Generic;
using System.ComponentModel;
using System.ComponentModel.DataAnnotations;
using System.ComponentModel.DataAnnotations.Schema;
using Abp.Domain.Entities;
using Kde6.Core.Users;
namespace Kde6.Core.Schedules
{
[Table("KdeScheduleMission")]
public class ScheduleMission: Entity, ISoftDelete
{
public virtual User Initiator { get; set; }
public long InitiatorId { get; set; }
[MaxLength(100)]
public string Title { get; set; }
[Required]
public string Content { get; set; }
[Range(1,10)]
public int Priority { get; set; }
public DateTime CreateTime { get; set; }
public bool IsPublic { get; set; }
public int LimitDays { get; set; }
[MaxLength(100)]
public string CronExpression { get; set; }
[MaxLength(100)]
public string TriggerId { get; set; }
public bool IsDeleted { get; set; }
[EditorBrowsable(EditorBrowsableState.Never)]
public string InternalData { get; set; }
[NotMapped]
public long[] RespondentIds
{
get
{
return Array.ConvertAll(InternalData.Split(';'), Convert.ToInt64);
}
set
{
InternalData = String.Join(";", value);
}
}
public virtual ICollection<MissionHistory> Histories { get; set; }
}
[Table("KdeMissionHistory")]
public class MissionHistory : Entity
{
public virtual ScheduleMission Mission { get; set; }
public int MissionId { get; set; }
public virtual User Respondant { get; set; }
public long RespondantId { get; set; }
public DateTime ReceiveTime { get; set; }
public DateTime? ConfirmTime { get; set; }
public bool IsComplete { get; set; }
public DateTime? CompleteTime { get; set; }
[MaxLength(1000)]
public string Note { get; set; }
}
}
ScheduleMission类本来应该跟我的用户类User是多对多的关系,但是当我使用ICollection<User> Respondents
属性时,在Job执行的时候,只要一加载Respondents,就会报connection has been disposed错误,查询了很多资料,包括修改ScheduleFactory等方法,都无法解决。无奈只好用些非常规方法,在ScheduleMission类中存储接收人的id,使用分号分割,好在接收人一般不是很多,在任务没有分发时,查询任务接收人的需求不是很频繁。
应用层代码
namespace Kde6
{
[DependsOn(typeof(Kde6CoreModule), typeof(AbpAutoMapperModule), typeof(AbpQuartzModule))]
public class Kde6ApplicationModule : AbpModule
{
public override void PreInitialize()
{
Configuration.Modules.AbpAutoMapper().Configurators.Add(mapper =>
{
mapper.AddProfile<ScheduleProfile>();
});
}
public override void Initialize()
{
IocManager.RegisterAssemblyByConvention(Assembly.GetExecutingAssembly());
// 创建Job
IScheduler scheduler = Configuration.Modules.AbpQuartz().Scheduler;
var missionJob = scheduler.GetJobDetail(new JobKey("missionJob", "OfficeGroup"));
if (missionJob == null)
{
missionJob = JobBuilder.Create<MissionJob>()
.WithIdentity("missionJob", "OfficeGroup")
.WithDescription("执行定时任务")
.StoreDurably(true)
.Build();
scheduler.AddJob(missionJob, true);
}
}
}
}
首先,应用层模块要依赖AbpQuartzModule,在官方的示例代码中,每次添加job都会生成一个新的Job和Trigger,但是我的missionJob的工作流程很简单,查询所有接收人,根据任务内容,生成一个MissionHistory ,如果每次都生成一个新的Job,感觉有点浪费资源,所有我在模块加载时,首先判断如果missionJob没有生成,则生成一个,一个missionJob对应多个触发器。
namespace Kde6.Application.Schedules
{
public class MissionJob : JobBase, ISingletonDependency
{
private readonly IMessageAppService _messageAppService;
private readonly IRepository<ScheduleMission> _missionRepository;
private readonly IRepository<MissionHistory> _historyRepository;
public MissionJob(IMessageAppService messageAppService
,IRepository<ScheduleMission> missionRepository,
IRepository<MissionHistory> historyRepository)
{
_messageAppService = messageAppService;
_missionRepository = missionRepository;
_historyRepository = historyRepository;
}
public override void Execute(IJobExecutionContext context)
{
JobDataMap dataMap = context.MergedJobDataMap;
int missionId = dataMap.GetIntValue("ScheduleMissionId");
Logger.Debug($"Read ScheduleMissionId = {missionId}");
var mission = _missionRepository.Get(missionId);
Logger.Debug($"Read ScheduleMission Title: {mission.Title}");
Logger.Debug($"Three Are {mission.RespondentIds.Length} Respondants");
DateTime now = DateTime.Now;
Array.ForEach(mission.RespondentIds, id =>
{
var history = new MissionHistory()
{
RespondantId = id,
MissionId = missionId,
ReceiveTime = now
};
_historyRepository.Insert(history);
Logger.Debug($"Assign mission to user {id}");
});
}
}
}
MissionJob 的工作很简单:根据通过IJobExecutionContext传递过来的任务id,查询任务对应的所有用户,在MissionHistory中添加纪录。这里就是在前面提到的如果用ICollection
private readonly IRepository<ScheduleMission> _missionRepository;
private readonly IRepository<MissionHistory> _historyRepository;
private readonly IScheduler _scheduler;
public MissionAppService(IRepository<ScheduleMission> missionRepository,
IRepository<MissionHistory> historyRepository,
IAbpQuartzConfiguration quartzConfiguration)
{
_missionRepository = missionRepository;
_historyRepository = historyRepository;
_scheduler = quartzConfiguration.Scheduler;
}
Abp官方代码中直接一句_jobManager.ScheduleAsync()
就把任务分配完了,很强大,但是如我我想对Job进行一个配置,就不好操作了,因为我要给Trigger生成一个唯一的id,方便用户查询自己的trigger的运行情况(是否过期,下次运行时间等),另外还要把给Trigger指定missionId,否则它不知道去执行哪个任务。这里通过quartzConfiguration.Scheduler参数注入,得到IScheduler。
public void AddMission(AddMissionInput input)
{
var mission = input.MapTo<ScheduleMission>();
mission.CreateTime = DateTime.Now;
mission.InitiatorId = AbpSession.UserId.Value;
mission.TriggerId = Guid.NewGuid().ToString();
int missionId = _missionRepository.InsertAndGetId(mission);
var job = _scheduler.GetJobDetail(new JobKey("missionJob", "OfficeGroup"));
var trigger = TriggerBuilder.Create()
.WithIdentity(mission.TriggerId, "OfficeGroup")
.StartNow()
.UsingJobData("ScheduleMissionId", missionId)
.WithCronSchedule(input.CronExpression)
.ForJob(job)
.Build();
try
{
_scheduler.ScheduleJob(trigger);
}
catch (Exception e)
{
_missionRepository.Delete(missionId);
throw new UserFriendlyException("创建计划任务失败");
}
}
Quartz的trigger的触发流程没有看到,Hangfire中写的比较详细,无奈英语比较渣,大概意思是不要在trigger中存放太多内容,因为他要将参数序列化到数据库,在Trigger触发的时候,会反序列化出所有的参数。所以在Quartz中,我只在trigger中存放了一个missionId,其它的参数由trigger查询数据库得到。
public IEnumerable<InitMissionDto> GetInitMissions(long userId)
{
var list = _missionRepository.GetAll()
.Where(m => m.InitiatorId == userId)
.OrderByDescending(m=>m.Id)
.ProjectTo<InitMissionDto>()
.ToList();
foreach (var item in list)
{
var trigger = _scheduler.GetTrigger(new TriggerKey(item.TriggerId, "OfficeGroup"));
if (trigger == null)
{
item.State = "已失效";
}
else
{
if (trigger.GetMayFireAgain())
{
item.State = "正在进行";
var nextFireTimeUtc = trigger.GetNextFireTimeUtc();
if (nextFireTimeUtc != null)
item.NextFireDate = nextFireTimeUtc.Value.DateTime.ToLocalTime();
}
else
{
item.State = "已完成";
}
}
}
return list;
}
上面的代码就是查询发起人发起的所有任务的代码,通过_scheduler.GetTrigger
可以查询到任务的详细信息
public void DeleteInitMissions(DeleteInitMissionInput input)
{
var missions = _missionRepository.GetAll().Where(m => input.Ids.Contains(m.Id)).ToList();
missions.ForEach(mission =>
{
var ids = mission.Histories.Select(x => x.Id).ToArray();
_historyRepository.Delete(h => ids.Contains(h.Id));
_scheduler.UnscheduleJob(new TriggerKey(mission.TriggerId, "OfficeGroup"));
_missionRepository.Delete(mission);
});
}```
最后是删除任务,删除任务时,首先将接收人收到的任务全部删除,然后使用_scheduler.UnscheduleJob()方法将trigger删除,最后删除任务
最后,放几张图
![](http://images2015.cnblogs.com/blog/418928/201704/418928-20170430210319240-20884007.png)
![](http://images2015.cnblogs.com/blog/418928/201704/418928-20170430210503850-1023101359.png)
![](http://images2015.cnblogs.com/blog/418928/201704/418928-20170430210553147-1258522197.png)