zoukankan      html  css  js  c++  java
  • Abp集成Quartz.net记录

    编写了一个任务分配系统,基本流程是任务发起者发起一个周期任务,指定好周期,接收人定期收到任务。刚开始使用的是Hangfire,很容易跑起来,还自带一个管理控制台。跑起来后发现Hangfire有一些不足之处,最主要的是它的Cron表达式非常不标准,例如L、W等都不能使用,这个问题大了,果断放弃转入Quartz.net。Quartz上手有点困难,主要是有一些基本的概念要先理解,看了看官方文档了解个大概,立马着手。

    领域模型

    using System;
    using System.Collections.Generic;
    using System.ComponentModel;
    using System.ComponentModel.DataAnnotations;
    using System.ComponentModel.DataAnnotations.Schema;
    using Abp.Domain.Entities;
    using Kde6.Core.Users;
    
    namespace Kde6.Core.Schedules
    {
        [Table("KdeScheduleMission")]
        public class ScheduleMission: Entity, ISoftDelete
        {
            public virtual User Initiator { get; set; }
    
            public long InitiatorId { get; set; }
    
            [MaxLength(100)]
            public string Title { get; set; }
    
            [Required]
            public string Content { get; set; }
    
            [Range(1,10)]
            public int Priority { get; set; }
    
            public DateTime CreateTime { get; set; }
    
            public bool IsPublic { get; set; }
    
            public int LimitDays { get; set; }
    
            [MaxLength(100)]
            public string CronExpression { get; set; }
    
            [MaxLength(100)]
            public string TriggerId { get; set; }
    
            public bool IsDeleted { get; set; }
    
            [EditorBrowsable(EditorBrowsableState.Never)]
            public string InternalData { get; set; }
    
            [NotMapped]
            public long[] RespondentIds
            {
                get
                {
                    return Array.ConvertAll(InternalData.Split(';'), Convert.ToInt64);
                }
                set
                {
                    InternalData = String.Join(";", value);
                }
            }
    
            public virtual ICollection<MissionHistory> Histories { get; set; }
        }
    
        [Table("KdeMissionHistory")]
        public class MissionHistory : Entity
        {
            public virtual ScheduleMission Mission { get; set; }
    
            public int MissionId { get; set; }
    
            public virtual User Respondant { get; set; }
    
            public long RespondantId { get; set; }
    
            public DateTime ReceiveTime { get; set; }
    
            public DateTime? ConfirmTime { get; set; }
    
            public bool IsComplete { get; set; }
    
            public DateTime? CompleteTime { get; set; }
    
            [MaxLength(1000)]
            public string Note { get; set; }
        }
    }
    

    ScheduleMission类本来应该跟我的用户类User是多对多的关系,但是当我使用ICollection<User> Respondents属性时,在Job执行的时候,只要一加载Respondents,就会报connection has been disposed错误,查询了很多资料,包括修改ScheduleFactory等方法,都无法解决。无奈只好用些非常规方法,在ScheduleMission类中存储接收人的id,使用分号分割,好在接收人一般不是很多,在任务没有分发时,查询任务接收人的需求不是很频繁。

    应用层代码

    namespace Kde6
    {
        [DependsOn(typeof(Kde6CoreModule), typeof(AbpAutoMapperModule), typeof(AbpQuartzModule))]
        public class Kde6ApplicationModule : AbpModule
        {
            public override void PreInitialize()
            {
                Configuration.Modules.AbpAutoMapper().Configurators.Add(mapper =>
                {
                    mapper.AddProfile<ScheduleProfile>();
                });
            }
    
            public override void Initialize()
            {
                IocManager.RegisterAssemblyByConvention(Assembly.GetExecutingAssembly());
    
                // 创建Job
                IScheduler scheduler = Configuration.Modules.AbpQuartz().Scheduler;
    
                var missionJob = scheduler.GetJobDetail(new JobKey("missionJob", "OfficeGroup"));
                if (missionJob == null)
                {
                    missionJob = JobBuilder.Create<MissionJob>()
                        .WithIdentity("missionJob", "OfficeGroup")
                        .WithDescription("执行定时任务")
                        .StoreDurably(true)
                        .Build();
    
                    scheduler.AddJob(missionJob, true);
                }
            }
        }
    }
    

    首先,应用层模块要依赖AbpQuartzModule,在官方的示例代码中,每次添加job都会生成一个新的Job和Trigger,但是我的missionJob的工作流程很简单,查询所有接收人,根据任务内容,生成一个MissionHistory ,如果每次都生成一个新的Job,感觉有点浪费资源,所有我在模块加载时,首先判断如果missionJob没有生成,则生成一个,一个missionJob对应多个触发器。

    namespace Kde6.Application.Schedules
    {
        public class MissionJob : JobBase, ISingletonDependency
        {
            private readonly IMessageAppService _messageAppService;
            private readonly IRepository<ScheduleMission> _missionRepository;
            private readonly IRepository<MissionHistory> _historyRepository;
    
    
            public MissionJob(IMessageAppService messageAppService
                ,IRepository<ScheduleMission> missionRepository,
                IRepository<MissionHistory> historyRepository)
            {
                _messageAppService = messageAppService;
                _missionRepository = missionRepository;
                _historyRepository = historyRepository;
            }
    
            public override void Execute(IJobExecutionContext context)
            {
                JobDataMap dataMap = context.MergedJobDataMap;
                int missionId = dataMap.GetIntValue("ScheduleMissionId");
                Logger.Debug($"Read ScheduleMissionId = {missionId}");
    
                var mission = _missionRepository.Get(missionId);
                Logger.Debug($"Read ScheduleMission Title: {mission.Title}");
                Logger.Debug($"Three Are {mission.RespondentIds.Length} Respondants");
    
                DateTime now = DateTime.Now;
                Array.ForEach(mission.RespondentIds, id =>
                {
                    var history = new MissionHistory()
                    {
                        RespondantId = id,
                        MissionId = missionId,
                        ReceiveTime = now
                    };
                    _historyRepository.Insert(history);
                    Logger.Debug($"Assign mission to user {id}");
                });
            }
        }
    }
    

    MissionJob 的工作很简单:根据通过IJobExecutionContext传递过来的任务id,查询任务对应的所有用户,在MissionHistory中添加纪录。这里就是在前面提到的如果用ICollection Respondents属性来加载接收人时,会报错的地方,一直无法解决...

    private readonly IRepository<ScheduleMission> _missionRepository;
            private readonly IRepository<MissionHistory> _historyRepository;
            private readonly IScheduler _scheduler;
    
            public MissionAppService(IRepository<ScheduleMission> missionRepository,
                IRepository<MissionHistory> historyRepository,
                IAbpQuartzConfiguration quartzConfiguration)
            {
                _missionRepository = missionRepository;
                _historyRepository = historyRepository;
                _scheduler = quartzConfiguration.Scheduler;
            }
    

    Abp官方代码中直接一句_jobManager.ScheduleAsync()就把任务分配完了,很强大,但是如我我想对Job进行一个配置,就不好操作了,因为我要给Trigger生成一个唯一的id,方便用户查询自己的trigger的运行情况(是否过期,下次运行时间等),另外还要把给Trigger指定missionId,否则它不知道去执行哪个任务。这里通过quartzConfiguration.Scheduler参数注入,得到IScheduler。

    public void AddMission(AddMissionInput input)
    {
        var mission = input.MapTo<ScheduleMission>();
        mission.CreateTime = DateTime.Now;
        mission.InitiatorId = AbpSession.UserId.Value;
        mission.TriggerId = Guid.NewGuid().ToString();
    
        int missionId = _missionRepository.InsertAndGetId(mission);
        var job = _scheduler.GetJobDetail(new JobKey("missionJob", "OfficeGroup"));
        var trigger = TriggerBuilder.Create()
            .WithIdentity(mission.TriggerId, "OfficeGroup")
            .StartNow()
            .UsingJobData("ScheduleMissionId", missionId)
            .WithCronSchedule(input.CronExpression)
            .ForJob(job)
            .Build();
        try
        {
            _scheduler.ScheduleJob(trigger);
        }
        catch (Exception e)
        {
            _missionRepository.Delete(missionId);
            throw new UserFriendlyException("创建计划任务失败");
        }
    }
    

    Quartz的trigger的触发流程没有看到,Hangfire中写的比较详细,无奈英语比较渣,大概意思是不要在trigger中存放太多内容,因为他要将参数序列化到数据库,在Trigger触发的时候,会反序列化出所有的参数。所以在Quartz中,我只在trigger中存放了一个missionId,其它的参数由trigger查询数据库得到。

    public IEnumerable<InitMissionDto> GetInitMissions(long userId)
    {
        var list = _missionRepository.GetAll()
            .Where(m => m.InitiatorId == userId)
            .OrderByDescending(m=>m.Id)
            .ProjectTo<InitMissionDto>()
            .ToList();
    
        foreach (var item in list)
        {
            var trigger = _scheduler.GetTrigger(new TriggerKey(item.TriggerId, "OfficeGroup"));
            if (trigger == null)
            {
                item.State = "已失效";
            }
            else
            {
                if (trigger.GetMayFireAgain())
                {
                    item.State = "正在进行";
                    var nextFireTimeUtc = trigger.GetNextFireTimeUtc();
                    if (nextFireTimeUtc != null)
                        item.NextFireDate = nextFireTimeUtc.Value.DateTime.ToLocalTime();
                }
                else
                {
                    item.State = "已完成";
                }
            }
        }
    
        return list;
    }
    

    上面的代码就是查询发起人发起的所有任务的代码,通过_scheduler.GetTrigger可以查询到任务的详细信息

    public void DeleteInitMissions(DeleteInitMissionInput input)
    {
        var missions = _missionRepository.GetAll().Where(m => input.Ids.Contains(m.Id)).ToList();
        missions.ForEach(mission =>
        {
            var ids = mission.Histories.Select(x => x.Id).ToArray();
            _historyRepository.Delete(h => ids.Contains(h.Id));
    
            _scheduler.UnscheduleJob(new TriggerKey(mission.TriggerId, "OfficeGroup"));
            _missionRepository.Delete(mission);
        });
    
    }```
    最后是删除任务,删除任务时,首先将接收人收到的任务全部删除,然后使用_scheduler.UnscheduleJob()方法将trigger删除,最后删除任务
    
    最后,放几张图
    ![](http://images2015.cnblogs.com/blog/418928/201704/418928-20170430210319240-20884007.png)
    
    ![](http://images2015.cnblogs.com/blog/418928/201704/418928-20170430210503850-1023101359.png)
    
    ![](http://images2015.cnblogs.com/blog/418928/201704/418928-20170430210553147-1258522197.png)
  • 相关阅读:
    Rabbit简单测试实例
    RabbitMQ-2 工作队列
    RabbitMQ-1 Helloword
    utmp
    导入wordpress数据库到mysql报错
    Tengine 反向代理状态检测
    阿里云服务器挖矿wipefs处理
    JbossMiner 挖矿蠕虫分析 (转载)
    centos6+nginx+php+mysql+memcached+wordpress
    php安装ZendGuardLoader扩展问题
  • 原文地址:https://www.cnblogs.com/Leman/p/6790419.html
Copyright © 2011-2022 走看看