zoukankan      html  css  js  c++  java
  • asp.net core microservices 架构之 分布式自动计算(二)

    一  简介                  

           上一篇介绍了zookeeper如何进行分布式协调,这次主要讲解quartz使用zookeeper进行分布式计算,因为上一篇只是讲解原理,而这次实际使用,zookeeperService更改了一部分代码,算是集成优化吧。

    系统结构图如下:

    上图展示了,两个部分,一个是故障灾难转移集群,一个实现了分片的功能。故障灾难转移集群是quartz自带的功能,就不多说,分片功能是自己实现的。在这里要说下quartz使用故障灾难转移集群的一些配置注意事项:

    再者就是netcore不支持remote,这个很重要,所以需要自己做一个web服务接口,但是本实例没有实现,而是仅仅使用数据库去配置和跟踪quartz服务,但是这是主要的。而使用api的一些功能就是实时开启,关闭,监控quartz主机状态,监控分片主机状态。所以大家留意这些功能暂时没有,不过大家在本文学会后很容易就可以自己扩展。

    在这里要感谢 github账号为 /Weiz.TaskManager 的一个quartz管理的项目。

    当然我这次的案例都是基于linux和mysql,而这个项目是sql server,所以我把sql全部替换了,更改了一些东西。后面会把代码全部放出来。界面如下图

    好,下面看代码实现。

    二 quartz故障灾难转移和分片功能             

     首先看结构:

    然后看Program入口方法:

     var host = new HostBuilder()
                        .UseEnvironment(EnvironmentName.Development)
                        .ConfigureAppConfiguration((hostContext, configApp) =>
                        {
                            configApp.SetBasePath(Directory.GetCurrentDirectory());
                            configApp.AddJsonFile(
                                  $"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json",
                                     optional: true);
                            configApp.AddEnvironmentVariables("PREFIX_");
                            configApp.AddCommandLine(args);
                            var rootConfiguration = configApp.Build();
                            QuartzOpt = new QuartzOption();
                            rootConfiguration.GetSection("Quartz").Bind(QuartzOpt); //绑定quartz的配置类的数据
                        }).ConfigureLogging((hostContext, configBuild) =>
                        {
                            configBuild.AddConfiguration(hostContext.Configuration.GetSection("Logging"));
                            configBuild.AddConsole();
                            configBuild.AddCustomizationLogger();
                        })
                        .ConfigureServices((hostContext, service) =>
                        {
                            service.AddKafka(KafkaBuilder =>
                            {
                                KafkaBuilder.AddConfiguration(hostContext.Configuration.GetSection("KafkaService"));
                            });
                            service.AddZookeeper(zookeeperBuilder =>
                            {
                                zookeeperBuilder.AddConfiguration(hostContext.Configuration.GetSection("zookeeperService"));
                            });
                            service.AddDbContext<QuartzDbContext>(option =>
                            option.UseMySQL(hostContext.Configuration.GetConnectionString("QuartzDatabase")), ServiceLifetime.Transient, ServiceLifetime.Transient);
                  //这个是操作数据库的数据库服务,这个和 quartz的cluster数据提供程序是分开的。 }) .Build(); Host
    = host; ILoggerFactory loggerFact = host.Services.GetService<ILoggerFactory>(); LogProvider.SetCurrentLogProvider(new ConsoleLogProvider(loggerFact)); //将框架的日志提供程序,传递给quart的日志接口。          var ischema = RunProgramRunExample(loggerFact); //从数据库构造job的方法 host.WaitForShutdown(); //netcore的通用主机。 ischema.Shutdown(true);//quartz自己的主机。

    quartz框架的架构和netcore目前的架构不兼容,netcore的通道和服务部件的软件架构方式,quartz先天不支持,你无法将任何上下文,比如host上下文,configuration上下文或者service上下文,传递给quartz。所以我使用了属性的方式:

         private ILoggerFactory _loggerFact;
    
            public static IHost Host { get; set; }
    
            public static String QUARTZ_INSTANCE_ID = "PREFIX_QUARTZ_INSTANCE_ID";
    
            public static QuartzOption QuartzOpt { get; set; }

    在quartz上下文是这样使用的:

          ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
                var  _logger=loggerFact.CreateLogger<ZookeeperService>();

    而我在上一节说的不支持remote的解决方案,使用netcore的解决方案就是kestrel,netcoe宣称后面将要将webHost合并到通用主机里面,netcore确实目前发展较快,内部代码优化时,代码变动较大

    ,比如我上次扩展log模块,上一版本程序设计上和当前版本几乎没有什么可重用的,当然,对于开发者,并没有什么区别,因为肯定会保持兼容。

    咱们看quart的日志模块:ConsoleLogProvider

    using System;
    using Microsoft.Extensions.Logging;
    using Quartz.Logging;
    
    namespace  Walt.Framework.Quartz.Host
     {
         public class ConsoleLogProvider : ILogProvider
        {
                private ILoggerFactory _logFactory;
    
                public ConsoleLogProvider(ILoggerFactory logFactory)
                {
                    _logFactory=logFactory;
                }
                public Logger GetLogger(string name)
                {
                    return (level, func, exception, parameters) =>
                    {
                        if (func != null)
                        {
                            string logInfo=string.Format(func(), parameters);
                            var log=_logFactory.CreateLogger<ConsoleLogProvider>(); //将提供程序,替换为自定义的分布式log提供程序
                            log.LogDebug(logInfo);
                        }
                        return true;
                    };
                }
    
                public IDisposable OpenNestedContext(string message)
                {
                    throw new NotImplementedException();
                }
    
                public IDisposable OpenMappedContext(string key, string value)
                {
                    throw new NotImplementedException();
                }
        }
     }
     
     

    再看quartz的配置类:

    namespace Walt.Framework.Quartz.Host
    {
        public class QuartzOption
        {
            public string InsatanceId{get;set;}          //很重要,cluster中必须两个实例不一样。
    
            public string InstanceName{get;set;}          //quartz的实例名称,一般情况下用于显示名称。
    
            public bool IsClear{get;set;}                  //是否启动的时候清理job,因为cluster在数据库中有历史数据。
    
            public bool IsSlave{get;set;}                  //是不是slave,预留,暂时没用
    
            public int CustomerRecordCountForTest{get;set;} //分片时候,每个机器分到的需要处理的数据的数量
        }
    }

    然后就是咱们的主要方法:RunProgramRunExample

      private static IScheduler RunProgramRunExample(ILoggerFactory loggerFact)
            {
                var log = loggerFact.CreateLogger<Program>();
                try
                {
    
                    var config = Host.Services.GetService<IConfiguration>();
                    // Grab the Scheduler instance from the Factory
                    NameValueCollection properties = new NameValueCollection
                    {
                        ["quartz.scheduler.instanceName"] = QuartzOpt.InstanceName,
                        ["quartz.scheduler.instanceId"] = QuartzOpt.InsatanceId,
                        ["quartz.threadPool.type"] = "Quartz.Simpl.SimpleThreadPool, Quartz",
                        ["quartz.threadPool.threadCount"] = "5",
                        ["quartz.jobStore.misfireThreshold"] = "60000",
                        ["quartz.jobStore.type"] = "Quartz.Impl.AdoJobStore.JobStoreTX, Quartz",
                        ["quartz.jobStore.useProperties"] = "false",
                        ["quartz.jobStore.dataSource"] = "default",
                        ["quartz.jobStore.tablePrefix"] = "QRTZ_",
                        ["quartz.jobStore.clustered"] = "true",
                        ["quartz.jobStore.driverDelegateType"] = "Quartz.Impl.AdoJobStore.MySQLDelegate, Quartz",
                        ["quartz.dataSource.default.connectionString"] = config.GetConnectionString("QuatrzClustDatabase"),
                        ["quartz.dataSource.default.provider"] = "MySql",
                        ["quartz.serializer.type"] = "json",
                        ["quartz.scheduler.exporter.type"] = "Quartz.Simpl.RemotingSchedulerExporter, Quartz",  
              //从这个往下netcore都不支持,以上为集群配置,只要两个实例id不同的quartz,配置同一个数据源
    ,就会自动的按照cluster运行,还有一点就是如果分布到不同机器,一定要配置ntp时间服务器,同步时间。
    [
    "quartz.scheduler.exporter.port"] = "555", ["quartz.scheduler.exporter.bindName"] = "QuartzScheduler", ["quartz.scheduler.exporter.channelType"] = "tcp", ["quartz.scheduler.exporter.channelName"] = "httpQuartz", ["quartz.scheduler.exporter.rejectRemoteRequests"] = "true" }; StdSchedulerFactory factory = new StdSchedulerFactory(properties); IScheduler scheduler = factory.GetScheduler().GetAwaiter().GetResult(); string machine = Environment.MachineName; //获取当前的机器名 QuartzDbContext db = Host.Services.GetService<QuartzDbContext>(); var listQuartzTask = db.QuartzTask.Where(w => w.IsDelete == 0 && w.MachineName == machine && w.InstanceId == QuartzOpt.InsatanceId) .ToListAsync().GetAwaiter().GetResult(); //从数据库中获取这台机器和实例中的job log.LogDebug("从数据库获取task记录,详细信息:{0}", Newtonsoft.Json.JsonConvert.SerializeObject(listQuartzTask)); Dictionary<string,Assembly> collAssembly=new Dictionary<string, Assembly>(); //加载程序集 foreach (var item in listQuartzTask)//首先第一次加载全部的程序集 { //加载程序集 if (!string.IsNullOrEmpty(item.AssemblyName) && !collAssembly.ContainsKey(item.AssemblyName)) { try { collAssembly[item.AssemblyName] = AssemblyHelp.GetAssemblyByteByAssemblyName( Path.Combine(Directory.GetCurrentDirectory(), "AssemblyColl"), item.AssemblyName); } catch (Exception ep) { log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, ep, "没有找到程序集."); Task.Delay(10000); continue; } } } // and start it off scheduler.Start(); // if (!QuartzOpt.IsSlave) // { var task = Task.Run(() => { bool isClear = QuartzOpt.IsClear; log.LogInformation("job监控程序开始循环,间隔为15秒"); while (true) //主要用来循环数据库记录,在添加或者修改job的时候,自动重新添加和执行job。 { try { if (scheduler != null) { log.LogDebug("检查scheduler是否开始"); if (scheduler.IsStarted) { if (isClear) //启动清理 { scheduler.Clear().GetAwaiter().GetResult(); isClear = false; } log.LogDebug("scheduler已经开始");
                                            db = Host.Services.GetService<QuartzDbContext>();
                                            listQuartzTask = db.QuartzTask.Where(w => w.IsDelete == 0
                                                && w.MachineName == machine && w.InstanceId == QuartzOpt.InsatanceId)
                                                .ToListAsync().GetAwaiter().GetResult(); //在循环中获取数据库中本机器和实例的job记录。
                                            log.LogDebug("从数据库获取task记录,详细信息:{0}", Newtonsoft.Json.JsonConvert.SerializeObject(listQuartzTask));
    foreach (var item in listQuartzTask)
                                            {//加载程序集
                                                if (!string.IsNullOrEmpty(item.AssemblyName) && !collAssembly.ContainsKey(item.AssemblyName)) //预先加载新添加的job的程序集
                                                {
                                                    try
                                                    {
                                                        collAssembly[item.AssemblyName] =
                                                        AssemblyHelp.GetAssemblyByteByAssemblyName(
                                                            Path.Combine(Directory.GetCurrentDirectory(), "AssemblyColl"), item.AssemblyName);
                                                    }
                                                    catch (Exception ep)
                                                    {
                                                        log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, ep, "没有找到程序集.");
                                                        Task.Delay(10000);
                                                        continue;
                                                    }
                                                }
                                                log.LogDebug("开始检查task:{0}", Newtonsoft.Json.JsonConvert.SerializeObject(item));
                                                var jobKey = new JobKey(item.TaskName, item.GroupName);
                                                var triggerKey = new TriggerKey(item.TaskName, item.GroupName);
                                                if (scheduler.CheckExists(jobKey).Result) //如果存在,则根据状态处理相应的动作。
                                                {
                                                    var jobDetai = scheduler.GetJobDetail(jobKey);
                                                    var trigger = scheduler.GetTrigger(triggerKey);
                                                    log.LogDebug("此task已经存在scheduler中,数据库状态:{0},scheduer中的状态:{1}.trigger状态:{2}"
                                                    , ((OperateStatus)item.OperateStatus).ToString(), jobDetai.Status.ToString(), trigger.Status.ToString());
    
                                                    if ((OperateStatus)item.OperateStatus == OperateStatus.Stop) //如果数据库中停止job,则删除这个job,如果有remote,可以实时,处理,这断代码就没有用了,但是可以作为远程处理失败的预防错误,所以可以保留。
                                                    {
                                                        log.LogInformation("删除schduler中的job:{0}", jobKey.ToString());
                                                        if (!scheduler.DeleteJob(jobKey).GetAwaiter().GetResult())
                                                        {
                                                            log.LogError("删除job失败。name:{0},group:{1}", jobKey.Name, jobKey.Group);
                                                        }
                                                    }
                                                    else
                                                    {
                                                        if (jobDetai.IsFaulted) //如果失败,则更改数据库中job的状体,同理,如果有remote,这个可以作为预防错误。
                                                        {
                                                            if (jobDetai.Exception != null)
                                                            {
                                                                log.LogError(10005, jobDetai.Exception, "job faulted");
                                                            }
                                                            var jobItem = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
                                                            && w.TaskName == jobKey.Name
                                                            && w.GroupName == jobKey.Group
                                                            && w.MachineName == machine
                                                            && w.InstanceId == scheduler.SchedulerInstanceId);
                                                            item.Status = (int)TaskStatus.Faulted;
                                                            item.OperateStatus = (int)OperateStatus.Stop;
                                                            db.Update<QuartzTask>(jobItem);
                                                            db.SaveChanges();
                                                        }
                                                        else //如果非执行状态,则中断
                                                        {
                                                            if (jobDetai.Status != TaskStatus.Running
                                                                && jobDetai.Status != TaskStatus.RanToCompletion
                                                                && jobDetai.Status != TaskStatus.WaitingForActivation
                                                                && jobDetai.Status != TaskStatus.WaitingForChildrenToComplete
                                                                && jobDetai.Status != TaskStatus.WaitingToRun)
                                                            {
                                                                var interTask = scheduler.Interrupt(jobKey, new CancellationToken(true))
                                                                .GetAwaiter().GetResult();
                                                                jobDetai.Start();
                                                            }
                                                        }
                                                    }
    
                                                    var triggerListener = scheduler.ListenerManager.GetTriggerListener("triggerUpdate");
                                                    
                                                    if (triggerListener == null)
                                                    {
                                                        triggerListener = new TriggerUpdateListens("trigger"+item.TaskName);
                                                        IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(triggerKey);
                                                        scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
                                                    }
    
                                                    var jobListener = scheduler.ListenerManager.GetJobListener("jobupdateListens");
                                                    if (jobListener == null)
                                                    {
                                                        IJobListener jobUpdateListener = new JobUpdateListens("job"+item.TaskName);
                                                        IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(jobKey);
                                                        scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
                                                    }
                                                }
                                                else //如果不存在,则新添加job,以及执行
                                                {
                                                    log.LogInformation("添加新的job,判断是否状态为停止。");
                                                    if ((OperateStatus)item.OperateStatus != OperateStatus.Stop)
                                                    {
                                                        log.LogInformation("添加新的job");
                                                        var assemblyName = item.AssemblyName;
                                                        var className = item.ClassName;
    
    
                                                        Type jobTaskType = null;
                                                        try
                                                        {
                                                            jobTaskType = AssemblyHelp.GetTypeByAssemblyNameAndClassName(collAssembly[item.AssemblyName], className);
                                                            log.LogInformation("找到类型,type:{0}",className);
                                                        }
                                                        catch (Exception ep)
                                                        {
                                                            log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, ep, "没有找到type.");
                                                        }
                                                        if (jobTaskType == null)
                                                        {
                                                            try
                                                            {
                                                                jobTaskType = AssemblyHelp
                                                                .GetTypeByCurrentAssemblyNameAndClassName(className, Assembly.GetExecutingAssembly());
                                                                if (jobTaskType == null)
                                                                {
                                                                    log.LogInformation("没有找到类型");
                                                                    continue;
                                                                }
                                                                log.LogInformation("找到类型,type:{0}",className);
                                                            }
                                                            catch (Exception ep)
                                                            {
                                                                log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, ep, "没有找到类型.");
                                                                continue;
                                                            }
                                                        }
                                                        IJobDetail job = JobBuilder.Create(jobTaskType)
                                                            .WithIdentity(item.TaskName, item.GroupName)
                                                            .Build();
    
                                                        ITrigger trigger = TriggerBuilder.Create()
                                                            .WithIdentity(item.TaskName, item.GroupName)
                                                            .StartNow()
                                                            .WithCronSchedule(item.CronExpressionString)
                                                            .Build();
                                                        scheduler.ScheduleJob(job, trigger).GetAwaiter().GetResult();
                                                        log.LogInformation("添加成功,type:{0}",className);
                                                        ITriggerListener triggerListener = new TriggerUpdateListens("trigger"+item.TaskName);
                                                        IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(trigger.Key);
                                                        scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
    
    
                                                        IJobListener jobUpdateListener = new JobUpdateListens("job"+item.TaskName);
                                                        IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(job.Key);
                                                        scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
                                                    }
                                                }
                                            }
                                        }
                                        else
                                        {
                                            log.LogInformation("scheduler is not IsStarted");
                                        }
                                    }
                                    else
                                    {
                                        log.LogInformation("scheduler is null");
                                    }
                                }
                                catch (Exception ep)
                                {
                                    log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, ep, "task监控程序执行错误.");
                                }
                                Thread.Sleep(15000);
                            }
                        });
                    // }
                    // else
                    // {
                    //     db = Host.Services.GetService<QuartzDbContext>();
                    //     listQuartzTask = db.QuartzTask.Where(w => w.IsDelete == 0
                    //                                             && w.MachineName == machine
                    //                                             && w.InstanceId == QuartzOpt.InsatanceId)
                    //                                       .ToListAsync().GetAwaiter().GetResult();
                    //     foreach (var item in listQuartzTask)
                    //     {
                    //          var jobKey = new JobKey(item.TaskName, item.GroupName);
                    //          var triggerKey = new TriggerKey(item.TaskName, item.GroupName);
    
    
                    //         // var jobItem = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
                    //         //                          && w.TaskName == jobKey.Name
                    //         //                          && w.GroupName == jobKey.Group
                    //         //                          && w.MachineName == machine
                    //         //                          && w.InstanceId == scheduler.SchedulerInstanceId);
                    //         // item.Status = (int)TaskStatus.Faulted;
                    //         // item.OperateStatus = (int)OperateStatus.Stop;
                    //         // db.Update<QuartzTask>(jobItem);
                    //         // db.SaveChanges();
    
    
    
                    //         if (scheduler.CheckExists(jobKey).Result)
                    //         {
                    //             var triggerListener = scheduler.ListenerManager.GetTriggerListener("triggerUpdate");
                    //             if (triggerListener == null)
                    //             {
                    //                 triggerListener = new TriggerUpdateListens();
                    //                 IMatcher<TriggerKey> triggermatcher = KeyMatcher<TriggerKey>.KeyEquals(triggerKey);
                    //                 scheduler.ListenerManager.AddTriggerListener(triggerListener, triggermatcher);
                    //             }
    
                    //             var jobListener = scheduler.ListenerManager.GetJobListener("jobupdateListens");
                    //             if (jobListener == null)
                    //             {
                    //                 IJobListener jobUpdateListener = new JobUpdateListens();
                    //                 IMatcher<JobKey> jobmatcher = KeyMatcher<JobKey>.KeyEquals(jobKey);
                    //                 scheduler.ListenerManager.AddJobListener(jobUpdateListener, jobmatcher);
                    //             }
                    //         }
                    //     }
                    //}
                    return scheduler;
                    // Tell quartz to schedule the job using our trigger
                    //await scheduler.ScheduleJob(job, trigger);
                }
                catch (SchedulerException sep)
                {
                    log.Log(Microsoft.Extensions.Logging.LogLevel.Error, 0, sep, "job执行错误。");
                }
                return null;
            }

    咱们现在看trriger监控类和job的监控类:

    using System; 
    using System.Threading;
    using System.Threading.Tasks;
    using Quartz;
    using Quartz.Logging; 
    using Quartz.Impl;
    using Walt.Framework.Service.Zookeeper;
    using Microsoft.Extensions.DependencyInjection;
    using org.apache.zookeeper;
    using System.Linq;
    using System.Collections.Generic;
    using Microsoft.Extensions.Logging;
    using static org.apache.zookeeper.KeeperException;
    
    namespace Walt.Framework.Quartz.Host
    {
    
        public class TriggerUpdateListens : ITriggerListener
        {
            public string Name { get; set; }
    
            public TriggerUpdateListens(string name)
            {
                Name = name;
            }
    
            private bool VoteJob{ get; set;}
    
            public Task TriggerComplete(ITrigger trigger, IJobExecutionContext context, SchedulerInstruction triggerInstructionCode, CancellationToken cancellationToken = default(CancellationToken))
            {
                ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
                var  _logger=loggerFact.CreateLogger<ZookeeperService>();
                 _logger.LogInformation(0, null, "执行成功.name:{0} group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                return Task.FromResult(true);
            }
         //trigger激发,这是job执行的第一个执行的。
            public Task TriggerFired(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken))
            {
                ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
                var  _logger=loggerFact.CreateLogger<ZookeeperService>();
                _logger.LogInformation(0, null, "开始执行job.name:{0} group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                string machine = Environment.MachineName;  //获取当前机器名
                try
                {
                    var customerAttri = context.JobDetail.JobType.GetCustomAttributes(false);
                    foreach (var customer in customerAttri) 
                    {
                        if (customer is DistributingAttributes) //如果打这个标签,则说明是分片job。
                        {
                            var distri = customer as DistributingAttributes;
                            var zookeeper = Program.Host.Services.GetService<IZookeeperService>();
                            string currentTempNodeName = string.Empty;
                            string fullPath = "/lock/"+ context.JobDetail.Key.Name + context.JobDetail.Key.Group;
                            int flag = 0;
                        Repeat: //这里因为某些原因失败,可以给重复几次。
                            string jsonData = zookeeper.GetDataByLockNode(fullPath, "getlock"
                            , ZooDefs.Ids.OPEN_ACL_UNSAFE, out currentTempNodeName);
                            if(jsonData==null)
                            {
                                _logger.LogError("获取锁失败。节点:{0},锁前缀:{1},重试:{2}",fullPath,"getlock",flag);
                                if(flag<=2)
                                {
                                    flag = flag + 1;
                                    goto Repeat;
                                }
                                VoteJob = true; //如果获取失败,则否决执行job,这个变量在下面的trriger方法中使用。
                                //context.Scheduler.Interrupt(context.JobDetail.Key);
                      return Task.FromResult(false); //返回false,则会执行VetoJobExecution方法。
                           }

                  //获取锁成功,处理分片数据,构造分片上下文。 QuartzDbContext db
    = Program.Host.Services.GetService<QuartzDbContext>(); var item = db.QuartzTask.Where(w => w.IsDelete == 0 && w.TaskName == context.JobDetail.Key.Name && w.GroupName == context.JobDetail.Key.Group && w.MachineName == machine && w.InstanceId == context.Scheduler.SchedulerInstanceId).FirstOrDefault(); if (item != null) { //TODO 这里可以找出机器名,拼接remote的api,可以查看分片主机是否存活,从而将一些挂起的任务重新分配。 } string distributeFlag = item.MachineName + item.InstanceId; List<DistributingData> distriData = new List<DistributingData>(); DistributingData currentDistriEntity = null; if (string.IsNullOrEmpty(jsonData)) { currentDistriEntity= new DistributingData //分片元数据 { DistributeFlag =distributeFlag, //分片标记,以机器名和实例名构造 PageIndex = 1, PageSize = Program.QuartzOpt.CustomerRecordCountForTest //配置的需要处理的数据数量 }; distriData.Add(currentDistriEntity); } else { distriData = Newtonsoft.Json.JsonConvert.DeserializeObject<List<DistributingData>>(jsonData); if (distriData == null || distriData.Count() < 1) { currentDistriEntity= new DistributingData { DistributeFlag =distributeFlag, PageIndex = 1, PageSize = Program.QuartzOpt.CustomerRecordCountForTest //配置 }; distriData.Add(currentDistriEntity); } else { currentDistriEntity= distriData.Where(w => w.DistributeFlag == distributeFlag).SingleOrDefault(); if (currentDistriEntity == null) //当前主机还没有分片过,将当前主机加入分片集群 { var maxPageIndex = distriData.Max(w => w.PageIndex); maxPageIndex = maxPageIndex + 1; var entity = new DistributingData { DistributeFlag = distributeFlag, PageIndex = maxPageIndex, PageSize = Program.QuartzOpt.CustomerRecordCountForTest //配置 }; distriData.Add(entity); } else { var maxPageIndex = distriData.Max(w => w.PageIndex); maxPageIndex = maxPageIndex + 1; currentDistriEntity.PageIndex = maxPageIndex; } } } item.Remark = Newtonsoft.Json.JsonConvert.SerializeObject(currentDistriEntity); db.Update(item); db.SaveChanges(); string resultData = Newtonsoft.Json.JsonConvert.SerializeObject(distriData); context.JobDetail.JobDataMap.Put("distriData", currentDistriEntity); //将分片数据放入数据上下文,job中可以访问。 zookeeper.SetDataAsync(fullPath , resultData, false).GetAwaiter().GetResult(); zookeeper.DeleteNode(currentTempNodeName); //处理完成,需要删除当前节点,释放锁。 _logger.LogInformation("分片执行:{0}",resultData); } } } catch(ConnectionLossException cle) { VoteJob = true; _logger.LogError(cle, "获取同步锁出现错误。连接丢失"); } catch(SessionExpiredException sep) { VoteJob = true; _logger.LogError(sep, "获取同步锁出现错误。连接过期"); } catch(KeeperException kep) { VoteJob = true; _logger.LogError(kep, "获取同步锁出现错误。操作zookeeper出错"); } catch(Exception ep) { try { _logger.LogError(0,ep,"分片失败。"); //context.Scheduler.DeleteJob(context.JobDetail.Key).GetAwaiter().GetResult(); VoteJob = true; QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>(); var item = db.QuartzTask.Where(w => w.IsDelete == 0 && w.TaskName == context.JobDetail.Key.Name && w.GroupName == context.JobDetail.Key.Group && w.MachineName == machine && w.InstanceId == context.Scheduler.SchedulerInstanceId).FirstOrDefault(); if (item == null) { _logger.LogError(0, ep, "分片失败,获取数据库记录失败。"); } else { item.Status = (int)TaskStatus.Canceled; item.OperateStatus = (int)OperateStatus.Stop; item.Remark = ep.ToString(); db.Update(item); db.SaveChanges(); } } catch (Exception eep) { _logger.LogError(0, eep, "分片失败,更新数据库失败。"); } } return Task.FromResult(true); } public Task TriggerMisfired(ITrigger trigger, CancellationToken cancellationToken = default(CancellationToken)) { return Task.FromResult(true); }      //
         //当TriggerComplete返回false,执行这个方法。
    public Task<bool> VetoJobExecution(ITrigger trigger, IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken))
            {
                ILoggerFactory loggerFact = Program.Host.Services.GetService<ILoggerFactory>();
                var  _logger=loggerFact.CreateLogger<ZookeeperService>();
                if (VoteJob)
                {
                    _logger.LogInformation(0, null, "取消执行job.name:{0} group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                }
                return Task.FromResult(VoteJob);//VoteJob在TriggerComplete中指定,默认为false
                              //,如果获取锁失败,则设置为true,这个方法返回true,则只执行JobUpdateListens的JobExecutionVetoed方法,然后job这一次将不执行。
    } } }

    接下来看job执行前都执行那些方法:

    using System; 
    using System.Threading;
    using System.Threading.Tasks;
    using Quartz;
    using Quartz.Logging; 
    using Quartz.Impl;
    using Microsoft.Extensions.DependencyInjection;
    using System.Linq;
    using Microsoft.EntityFrameworkCore;
    using Microsoft.Extensions.Logging;
    
    namespace Walt.Framework.Quartz.Host
    {
    
        public class JobUpdateListens : IJobListener
        { 
            public string Name { get; set; }
    
    
            public JobUpdateListens(string name)
            {
                Name = name;
            }
    
            public Task JobExecutionVetoed(IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken))
            {
                return Task.FromResult(true);
            }
    //job执行前执行,将状态放入数据库。
            public Task JobToBeExecuted(IJobExecutionContext context, CancellationToken cancellationToken = default(CancellationToken)) 
            {
                try
                {
                    string machine = Environment.MachineName;
                    QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
                    var item = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
                    && w.TaskName == context.JobDetail.Key.Name
                    && w.GroupName == context.JobDetail.Key.Group
                    && w.MachineName == machine
                    && w.InstanceId == context.Scheduler.SchedulerInstanceId);
                    item.Status = (int)TaskStatus.WaitingToRun;
                    db.Update<QuartzTask>(item);
                    db.SaveChanges();
                }
                catch (Exception ep)
                {
                    //context.Scheduler.Interrupt(context.JobDetail.Key);
                    var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
                    var log = logFaoctory.CreateLogger<JobUpdateListens>();
                    log.LogError(0, ep, "JobToBeExecuted:Job执行错误,name:{0},Group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                }
                return Task.FromResult(true);
            }
    //job执行后执行,这个方法将执行结果放入数据库,处理异常。
            public Task JobWasExecuted(IJobExecutionContext context, JobExecutionException jobException, CancellationToken cancellationToken = default(CancellationToken))
            {
                try
                {
                    QuartzDbContext db = Program.Host.Services.GetService<QuartzDbContext>();
                    var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
                    var log = logFaoctory.CreateLogger<JobUpdateListens>();
                    string machine = Environment.MachineName;
                    var item = db.QuartzTask.FirstOrDefault(w => w.IsDelete == 0
                                                            && w.TaskName == context.JobDetail.Key.Name
                                                            && w.GroupName == context.JobDetail.Key.Group
                                                            && w.MachineName == machine
                                                            && w.InstanceId == context.Scheduler.SchedulerInstanceId);
                    if (jobException != null)
                    {
                        item.Status = (int)TaskStatus.Faulted;
                        item.Remark = Newtonsoft.Json.JsonConvert.SerializeObject(jobException);
                        log.LogError("Job执行错误,name:{0},Group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                    }
                    else
                    {
                        item.Status = (int)TaskStatus.RanToCompletion;
                        item.RecentRunTime = context.FireTimeUtc.DateTime;
                        if (context.NextFireTimeUtc.HasValue)
                        {
                            item.NextFireTime = context.NextFireTimeUtc.Value.DateTime;
                        }
                    }
                    db.Update<QuartzTask>(item);
                    db.SaveChanges();
                }
                catch (Exception ep)
                {
                    //context.Scheduler.Interrupt(context.JobDetail.Key);
                    var logFaoctory = Program.Host.Services.GetService<ILoggerFactory>();
                    var log = logFaoctory.CreateLogger<JobUpdateListens>();
                    log.LogError(0, ep, "JobWasExecuted:Job执行错误,name:{0},Group:{1}", context.JobDetail.Key.Name, context.JobDetail.Key.Group);
                }
                return Task.FromResult(true);
            }
        }
    
    }

    在这次使用 zookeeperservice中,优化了一些代码:

    using System;
    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Microsoft.Extensions.Logging;
    using Microsoft.Extensions.Options;
    using org.apache.zookeeper;
    using org.apache.zookeeper.data;
    using static org.apache.zookeeper.ZooKeeper;
    using System.Linq;
    using System.Linq.Expressions;
    using System.Threading;
    using static org.apache.zookeeper.Watcher.Event;
    using Newtonsoft.Json;
    using System.Collections.Concurrent;
    using static org.apache.zookeeper.ZooDefs;
    using static org.apache.zookeeper.KeeperException;
    
    namespace  Walt.Framework.Service.Zookeeper
    {
    
        internal class WaitLockWatch : Watcher
        {
            private AutoResetEvent _autoResetEvent;
    
            private ManualResetEvent _mutex;
            private ILogger _logger;
    
            private string _path;
    
            public WaitLockWatch(AutoResetEvent autoResetEvent
            , ILogger logger, string path
            , ManualResetEvent mutex)
            {
                _autoResetEvent = autoResetEvent;
                _logger = logger;
                _path = path;
                _mutex = mutex;
            }
    
            public override Task process(WatchedEvent @event)
            {
                 _mutex.Set();
                return Task.FromResult(true);
            }
        }
    
    
        internal class WaitConnWatch : Watcher
        {
            private AutoResetEvent _autoResetEvent;
            private ILogger _logger;
    
            private ManualResetEvent _mutex;
    
            public WaitConnWatch(AutoResetEvent autoResetEvent
            ,ILogger logger
            ,ManualResetEvent mutex)
            {
                _autoResetEvent=autoResetEvent;
                _logger=logger;
                _mutex = mutex;
            }
    
           public override Task process(WatchedEvent @event)
           {
               _logger.LogInformation("watch激发,回掉状态:{0}",@event.getState().ToString());
                if(@event.getState()== KeeperState.SyncConnected
                ||@event.getState()== KeeperState.ConnectedReadOnly)
                {
                    _logger.LogInformation("释放连接阻塞");
                    _autoResetEvent.Set();
                }
                else
                {
                    _logger.LogInformation("连接断开,释放分布式锁阻塞");
                    _mutex.Set();
                }
                return Task.FromResult(0);
           }
        }
    
        public class ZookeeperService : IZookeeperService
        {
            private ZookeeperOptions _zookeeperOptions;
            private ZooKeeper _zookeeper;
    
             private static readonly byte[] NO_PASSWORD = new byte[0];
    
             public Watcher Wathcer {get;set;}
    
             public ILoggerFactory LoggerFac { get; set; }
    
             private ILogger _logger;
    
            internal Thread CurrThread{ get; }
    
    
    
            AutoResetEvent[] autoResetEvent=new AutoResetEvent[2]
             {new AutoResetEvent(false),new AutoResetEvent(false)};
            ManualResetEvent _manualReset = new ManualResetEvent(false);
            public ZookeeperService(IOptionsMonitor<ZookeeperOptions>  zookeeperOptions
            ,ILoggerFactory loggerFac)
            {
                LoggerFac=loggerFac;
                _logger=LoggerFac.CreateLogger<ZookeeperService>();
                _zookeeperOptions=zookeeperOptions.CurrentValue; 
                _logger.LogInformation("配置参数:{0}",JsonConvert.SerializeObject(_zookeeperOptions));
                 zookeeperOptions.OnChange((zookopt,s)=>{
                    _zookeeperOptions=zookopt; 
                });
                _logger.LogInformation("开始连接");
                Conn(_zookeeperOptions); 
                CurrThread = System.Threading.Thread.CurrentThread;
            }
    
           
    
            private void Conn(ZookeeperOptions zookeeperOptions)
            {
                bool isReadOnly=default(Boolean);
                Wathcer=new WaitConnWatch(autoResetEvent[0],_logger,_manualReset);
                if(isReadOnly!=zookeeperOptions.IsReadOnly)
                {
                    isReadOnly=zookeeperOptions.IsReadOnly;
                }
    
                
                byte[] pwd=new byte[0];
                //如果没有密码和sessionId
                if(string.IsNullOrEmpty(zookeeperOptions.SessionPwd)
                &&_zookeeperOptions.SessionId==default(int))
                {
                 _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,isReadOnly);
                }
                else if (!string.IsNullOrEmpty(zookeeperOptions.SessionPwd))
                {
                    pwd=System.Text.Encoding.Default.GetBytes(zookeeperOptions.SessionPwd);
                     _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring,zookeeperOptions.SessionTimeout,Wathcer,0,pwd,isReadOnly);
                }
                else
                {
                     _zookeeper=new ZooKeeper(zookeeperOptions.Connectstring
                     ,zookeeperOptions.SessionTimeout,Wathcer,zookeeperOptions.SessionId,pwd,isReadOnly);
                }
                 if(_zookeeper.getState()==States.CONNECTING)
                {
                    _logger.LogInformation("当前状态:CONNECTING。阻塞等待");
                    autoResetEvent[0].WaitOne();
                }
            }
    
            public Task<string> CreateZNode(string path,string data,CreateMode createMode,List<ACL> aclList)
            {
                ReConn();
                if(string.IsNullOrEmpty(path)||!path.StartsWith('/'))
                {
                    _logger.LogInformation("path路径非法,参数:path:{0}",path);
                    return null;
                }
                byte[] dat=new byte[0];
                if(string.IsNullOrEmpty(data))
                { 
                    dat=System.Text.Encoding.Default.GetBytes(data);
                }
                if(createMode==null)
                {
                     _logger.LogInformation("createMode为null,默认使用CreateMode.PERSISTENT");
                    createMode=CreateMode.PERSISTENT;
                }
                return _zookeeper.createAsync(path,dat,aclList,createMode);
            }
    
            public async void Sync(string path)
            {
                try
                {
                    _logger.LogInformation("同步成功");
                     await _zookeeper.sync(path);
                }
                catch (Exception ep)
                {
                    _logger.LogError("同步失败。", ep);
                }
            }
    
            public async Task<DataResult> GetDataAsync(string path,Watcher watcher,bool isSync)
            {
                ReConn();
                if(await _zookeeper.existsAsync(path)==null )
                {
                    _logger.LogInformation("path不存在");
                    return null;
                }
                if (isSync)
                {
                    _logger.LogInformation("即将进行同步。");
                    try
                    {
                       await  _zookeeper.sync(path);
                        _logger.LogInformation("同步成功");
                    }
                    catch (Exception ep)
                    {
                        _logger.LogError("同步失败。", ep);
                    }
                }
                return await _zookeeper.getDataAsync(path,watcher);
            }
    
             public async Task<Stat> SetDataAsync(string path,string data,bool isSync)
            {
                ReConn();
                if(await _zookeeper.existsAsync(path)==null )
                {
                     _logger.LogInformation("path不存在");
                    return null;
                }
                byte[] dat=new byte[0];
                if(!string.IsNullOrEmpty(data))
                { 
                    dat=System.Text.Encoding.Default.GetBytes(data);
                }
                return await _zookeeper.setDataAsync(path,dat);
            }
    
            public async Task<ChildrenResult> GetChildrenAsync(string path, Watcher watcher, bool isSync)
            {
                ReConn();
                if (await _zookeeper.existsAsync(path) == null)
                {
                    _logger.LogInformation("path不存在");
                    return null;
                }
                if (isSync)
                {
                    _logger.LogInformation("即将进行同步。");
                    try
                    {
                        _logger.LogInformation("开始同步");
                        await _zookeeper.sync(path);
                        _logger.LogInformation("同步成功");
                    }
                    catch (Exception ep)
                    {
                        _logger.LogError("同步失败。", ep);
                    }
                }
                return await _zookeeper.getChildrenAsync(path, watcher);
            }
    
            public async Task DeleteNode(string path)
             {
                 ReConn();
                  if(await _zookeeper.existsAsync(path)==null )
                {
                     _logger.LogDebug("删除path:path不存在");
                    return;
                }
                try
                {
                    _logger.LogDebug("删除node:{0}", path);
                    await _zookeeper.deleteAsync(path);
                }
                catch (Exception ep)
                {
                    _logger.LogError("删除失败", ep);
                    return;
                }
            }
    
            public async Task<bool> SetWatcher(string path,Watcher watcher)
            {
                ReConn();
                var stat = await _zookeeper.existsAsync(path);
                if(stat==null )
                {
                     _logger.LogDebug("判断path是否存在:path不存在");
                    return false;
                }
                 try
                {
                    _logger.LogDebug("设置监控:{0}", path);
                    await _zookeeper.getDataAsync(path,watcher);
                    return true;
                }
                catch (Exception ep)
                {
                    _logger.LogError("设置监控错误", ep);
                    return false;
                }
            }
    
             public  string GetDataByLockNode(string path,string sequenceName,List<ACL> aclList,out string tempNodeOut)
             {
                 _logger.LogInformation("获取分布式锁开始。");
                 string tempNode=string.Empty;
                 tempNodeOut=string.Empty;
                try
                {
    
                    ReConn();
                    if (_zookeeper.existsAsync(path).Result == null)
                    {
                        _logger.LogDebug("path不存在,创建");
                        CreateZNode(path, "", CreateMode.PERSISTENT, aclList).GetAwaiter().GetResult();
                    }
    
    
                    tempNode = CreateZNode(path + "/" + sequenceName, "", CreateMode.EPHEMERAL_SEQUENTIAL, aclList).Result;
                    _logger.LogDebug("创建节点:{0}", tempNode);
                    if (tempNode == null)
                    {
                        _logger.LogDebug("创建临时序列节点失败。详细参数:path:{0},data:{1},CreateMode:{2}"
                        , path + "/squence", "", CreateMode.EPHEMERAL_SEQUENTIAL);
                        return null;
                    }
                    _logger.LogInformation("创建成功。");
    
    
                    // var taskGetData=Task.Run(async () =>{
                    //     int circleCount = 0;
                    //     while (true)
                    //     {
                    //         Thread.Sleep(200);
                    //         circleCount++;
                    //         _logger.LogInformation("循环获取锁。当前循环次数:{0}", circleCount);
                    //         try
                    //         {
                    //             var childList =await GetChildrenAsync(path, null, true);
                    //             if (childList == null || childList.Children == null || childList.Children.Count < 1)
                    //             {
                    //                 _logger.LogWarning("获取子序列失败,计数为零.path:{0}", path);
                    //                 return null;
                    //             }
                    //             _logger.LogInformation("获取path:{0}的子节点:{1}", path, Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
    
                    //             var top = childList.Children.OrderBy(or => or).First();
                    //             if (path + "/" + top == tempNode)
                    //             {
                    //                 return tempNode;
                    //             }
                    //         }
                    //         catch (Exception ep)
                    //         {
                    //             _logger.LogError(ep,"循环获取锁出错。");
                    //             return null;
                    //         }
                    //     }
                    // });
                    // tempNode = taskGetData.GetAwaiter().GetResult();
                    // if (!string.IsNullOrEmpty(tempNode))
                    // {
                    //     byte[] da = null;
                    //     tempNodeOut = tempNode;
                    //     da = GetDataAsync(path, null, true).Result.Data;
                    //     if (da == null || da.Length < 1)
                    //     {
                    //         return string.Empty;
                    //     }
                    //     return System.Text.Encoding.Default.GetString(da);
                    // }
                    int clycleCount = 0;
                GetChild: //这里防止并发出现错误。
                    clycleCount++;
                    var childList = GetChildrenAsync(path, null, true).GetAwaiter().GetResult();
                    if (childList == null || childList.Children == null || childList.Children.Count < 1)
                    {
                        _logger.LogWarning("获取子序列失败,计数为零.path:{0}", path);
                        return null;
                    }
                    _logger.LogInformation("获取path:{0}的子节点:{1}", path, Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
    
                    var top = childList.Children.OrderBy(or => or).First();
                    if (path + "/" + top == tempNode)
                    {
                        tempNodeOut = tempNode;
                        var da = GetDataAsync(path, null, true).Result.Data;
                        if (da == null || da.Length < 1)
                        {
                            return string.Empty;
                        }
                        return System.Text.Encoding.Default.GetString(da);
                    }
                    // bool isSet=
                    //     SetWatcher(path + "/" + top,).Result;
                    // if(!isSet)
                    // {
                    //     goto GetChild;
                    // }
                    bool isSet= SetWatcher(path + "/" + top,new WaitLockWatch(autoResetEvent[1], _logger, path,_manualReset)).Result;
                    if(!isSet)
                    {
                        _logger.LogWarning("没有设置上watcher,需要重新运行一遍。");
                        goto GetChild;
                    }
                    _manualReset.WaitOne(15000);
                     childList = GetChildrenAsync(path, null, true).GetAwaiter().GetResult();
                    if (childList == null || childList.Children == null || childList.Children.Count < 1)
                    {
                        _logger.LogWarning("再次获取子序列失败,计数为零.path:{0}", path);
                        return null;
                    }
                    _logger.LogInformation("再次获取path:{0}的子节点:{1}", path, Newtonsoft.Json.JsonConvert.SerializeObject(childList.Children));
                    top = childList.Children.OrderBy(or => or).First();
                    if (path + "/" + top == tempNode)
                    {
                        _logger.LogDebug("节点获取到锁权限。");
                        tempNodeOut = tempNode;
                        var da = GetDataAsync(path, null, true).Result.Data;
                        if (da == null || da.Length < 1)
                        {
                            return string.Empty;
                        }
                        return System.Text.Encoding.Default.GetString(da);
                    }
                    else
                    {
                        _logger.LogDebug("没有获取到锁权限,进行循环。循环第:{0}次",clycleCount);
                        Thread.Sleep(1000);
                        goto GetChild;
                        // Sync(path);
                        
                        //DeleteNode(tempNode).GetAwaiter().GetResult();
                        // DeleteNode(tempNode).GetAwaiter().GetResult();
                        //  _logger.LogError("没有获取到锁,Watcher出现问题,请查看日志。");
                        // if (_zookeeper.existsAsync(tempNode).Result== null)
                        // {
                        //     _logger.LogWarning("tempNode:{0}存在,但是没有获取到锁,在等待的时候,被线程检查程序释放了阻塞,属于误伤"
                        //     ,tempNode);
    
                        // }
                        // else
                        // {
                        //     _logger.LogError("没有获取到锁,Watcher出现问题,请查看日志。");
                        // }
                    }
    
                }
                catch(ConnectionLossException cle)
                {
                    _logger.LogError(cle, "获取同步锁出现错误。连接丢失");
                }
                catch(SessionExpiredException sep)
                {
                    _logger.LogError(sep, "获取同步锁出现错误。连接过期");
                }
                catch(KeeperException kep)
                {
                    _logger.LogError(kep, "获取同步锁出现错误。操作zookeeper出错");
                }
                catch (Exception ep)
                {
                    _logger.LogError(ep, "获取同步锁出现错误。");
                    if (!string.IsNullOrEmpty(tempNode))
                    {
                        try{
                        DeleteNode(tempNode).GetAwaiter().GetResult();
                        }catch(Exception)
                        {
                            
                        }
                    }
                }
    
                return null;
             }
    
             private void ReConn()
             {
                 _logger.LogInformation("检查连接状态,status:{0}",_zookeeper.getState());
                 if(_zookeeper.getState()==States.CLOSED
                 ||_zookeeper.getState()== States.NOT_CONNECTED)
                 {
                     _logger.LogInformation("连接为关闭,开始重新连接。");
                    Conn(_zookeeperOptions);
                 }
             }
    
            public async void Close(string tempNode)
            {
                try
                {
                    await _zookeeper.closeAsync();
                }
                catch (Exception ep)
                {
                    _logger.LogError("zookeeper关闭失败。", ep);
                }
            }
    
        }
    }

    下面看结果

    咱们关闭master2,然后看看slave1:

    大家看时间上,master2已经2分钟没运行了,如果要有好的监控,还是最好实现remote,也不是什么难事,将通用主机改为webhost,然后写几个api就行了,就会实时的监控quartz主机的状态

    ,而且分片也会自动去除失败的主机,自动分派任务。咱们看看分片的情况:

    master2已经失败,但是这里没有去掉,不过不影响接下来的任务,再者master2的任务如果在失败的时候没执行完成,那么会有一部分数据是没有处理的。

    三 总结                  

          quartz扩展主要注意两点,一是job listeners和trigger listeners,做好异常处理,尤其是trigger listeners,如果出错,job会失去控制,不激发,而且job状态也会失效,必须重新关闭和重新运行一次。在写业务代码的时候,分片的需要处理的数据源必须是有规律自增的或者是静态的,这样分页才满足业务分片的要求。如果是非自增的或者随机增加,那么quartz就必须把需要处理的主键存进去,但是这样的需求毕竟是少数。

    微服务系列的github:https://github.com/ck0074451665/Walt.Framework.git

    测试例子:https://github.com/ck0074451665/Walt.MicroServicesTest.git

    管理界面:https://pan.baidu.com/s/1gYNDX1j3-XctuPiejV2XPQ

  • 相关阅读:
    Live2d Test Env
    Live2d Test Env
    Live2d Test Env
    图神经网络入门
    CommandLineRunner 可能会导致你的应用宕机停止,我劝你耗子尾汁
    不使用 MQ 如何实现 pub/sub 场景?
    为什么 @Value 可以获取配置中心的值?
    vite + ts 快速搭建 vue3 项目 以及介绍相关特性
    给 Mac 添加右键菜单「使用 VSCode 打开」
    【Python】连接常用数据库
  • 原文地址:https://www.cnblogs.com/ck0074451665/p/10321096.html
Copyright © 2011-2022 走看看