zoukankan      html  css  js  c++  java
  • 从源码角度分析ScheduleMaster的节点管理流程

    ScheduleMaster是一个开源的分布式任务调度系统,它基于.NET Core 3.1平台构建,支持跨平台多节点部署运行。

    项目主页:https://github.com/hey-hoho/ScheduleMasterCore

    本篇从源码角度分析一下节点控制的核心流程。


    生命周期事件

    生命周期事件增强了整个应用进程的控制能力,由于节点状态与之关系密切,所以必须要首先了解下生命周期事件具体干了什么活。

    借助于ASP.NET Core框架的HostedService模型,我们把生命周期管理器封装在一个后台托管服务AppLifetimeHostedService中,在它的StartAsync方法中注册了我们需要的事件:

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _appLifetime.ApplicationStarted.Register(OnStarted);
        _appLifetime.ApplicationStopping.Register(OnStopping);
        _appLifetime.ApplicationStopped.Register(OnStopped);
    
        return Task.CompletedTask;
    }
    

    这里主要涉及的事件就是应用启动和停止时所需要处理的逻辑,分别对应节点状态的变更,下面重点说一下启动事件。

    ScheduleMaster采用了典型的中心化结构搭建,基于1个master节点和N和worker节点提供服务,其中master扮演了整个系统资源调度的角色,worker则是实际执行任务的角色。这样的话,master就必须要感知到它所能调度的资源清单,所以系统引入了节点注册概念。

    根据注册发起者的不同,可以分为如下两种模式:

    • 自动注册模式

    • 手动注册模式


    自动注册模式

    接触过微服务架构的朋友应该会对服务注册发现这一过程比较熟悉,借鉴了相似的设计,节点自动注册就类似服务注册的样子,在节点启动时自动把自身的配置信息注册到控制中心,默认的方式就是从配置文件读取节点信息,同时也支持使用命令行参数覆盖配置文件中的字段:

    private void OnStarted()
    {
    	// ....
    
        //判断是否要自动根据配置文件注册节点信息
        if (AppCommandResolver.IsAutoRegister())
        {
            _logger.LogInformation("enabled auto register...");
            // 设置节点信息
            ConfigurationCache.SetNode(_configuration);
    
            // ....
        }
    }
    
    public static void SetNode(IConfiguration configuration)
    {
        NodeSetting = configuration.GetSection("NodeSetting").Get<NodeSetting>();
        string identity = AppCommandResolver.GetCommandLineArgsValue("identity");
        if (!string.IsNullOrEmpty(identity))
        {
            NodeSetting.IdentityName = identity;
        }
        string protocol = AppCommandResolver.GetCommandLineArgsValue("protocol");
        if (!string.IsNullOrEmpty(protocol))
        {
            NodeSetting.Protocol = protocol;
        }
        string ip = AppCommandResolver.GetCommandLineArgsValue("ip");
        if (!string.IsNullOrEmpty(ip))
        {
            NodeSetting.IP = ip;
        }
        string port = AppCommandResolver.GetCommandLineArgsValue("port");
        if (!string.IsNullOrEmpty(port))
        {
            NodeSetting.Port = Convert.ToInt32(port);
        }
        string priority = AppCommandResolver.GetCommandLineArgsValue("priority");
        if (!string.IsNullOrEmpty(priority))
        {
            NodeSetting.Priority = Convert.ToInt32(priority);
        }
        NodeSetting.MachineName = Environment.MachineName;
    }
    

    再看一下如何判断节点是否开启了自动注册模式:

    public static bool IsAutoRegister()
    {
        //优先读取环境参数
        string option = Environment.GetEnvironmentVariable("SMCORE_AUTOR");
        //再看命令行参数中是否也有设置
        string cmdArg = GetCommandLineArgsValue("autor");
        if (!string.IsNullOrEmpty(cmdArg))
        {
            option = cmdArg;
        }
        return option != "false";
    }
    

    很明显,在节点启动时如果指定了特定的环境变量SMCORE_AUTOR或命令行参数autor并且值为false即表示关闭自动注册模式,否则默认开启。
    自动注册流程

    要注意的是,master节点只提供了自动注册模式。


    手动注册模式

    自动注册模式虽然流程简单,但是需要提前配置好节点信息,这对于节点弹性部署并不友好,因此为了增加系统灵活性,系统也提供了手动注册节点的模式,这时候对worker注册的主动权转移到master手里,需要先在master控制台中创建好要注册的节点,然后执行连接操作,最后启动服务即可。
    手动注册流程

    这个过程中比较核心的是连接验证过程,设计这个流程的原因是为了保障创建连接的双方是可信状态,实现数据匹配,其核心过程为:

    • worker节点在启动时通过环境变量SMCORE_WORKEROF或者命令行参数workerof指定归属的master名称

    • 在控制台中对节点执行[连接]操作,master携带验证信息对worker发起连接请求

    • 如果验证通过,则使用指定的节点名称去数据库查询完整的节点配置信息,并为worker节点缓存配置数据,worker生成一个新的访问秘钥返回

    • 标记节点状态为空闲中,此时worker并不运行任何调度服务,处于空跑状态

    • 对节点执行[启用]操作,开启调度功能

    验证连接过程的核心代码为:

     public async Task<(bool success, string content)> Connect()
     {
        HttpClient client = CreateClient();
        client.DefaultRequestHeaders.Add("sm_connection", SecurityHelper.MD5(ConfigurationCache.NodeSetting.IdentityName));
        client.DefaultRequestHeaders.Add("sm_nameto", _server.NodeName);
    
        var response = await client.PostAsync("/api/server/connect", null);
        return (response.IsSuccessStatusCode, await response.Content.ReadAsStringAsync());
     }
    
     [HttpPost, AllowAnonymous]
     public IActionResult Connect()
     {
         string workerof = AppCommandResolver.GetTargetMasterName();
         string encodeKey = Request.Headers["sm_connection"].FirstOrDefault();
         if (string.IsNullOrEmpty(workerof) || string.IsNullOrEmpty(encodeKey))
         {
             _logger.LogWarning("connect failed! workerof or encodekey is null...");
             return BadRequest("Unauthorized Connection.");
         }
         if (!Core.Common.SecurityHelper.MD5(workerof).Equals(encodeKey))
         {
             _logger.LogWarning("connect failed! encodekey is unvalid, wokerof:{0}, encodekey:{1}", workerof, encodeKey);
             return BadRequest("Unauthorized Connection.");
         }
         string workerName = Request.Headers["sm_nameto"].FirstOrDefault();
         var node = _db.ServerNodes.FirstOrDefault(x => x.NodeName == workerName);
         if (node == null)
         {
             _logger.LogWarning("connect failed! unkown worker name:{0}...", workerName);
             return BadRequest("Unkown Worker Name.");
         }
         Core.ConfigurationCache.SetNode(node);
         string secret = Guid.NewGuid().ToString("n");
         QuartzManager.AccessSecret = secret;
         _logger.LogInformation("successfully connected to {0}!", workerof);
         LogHelper.Info($"与{workerof}连接成功~");
         return Ok(secret);
     }
    

    健康检查

    健康检查是为了保障不可用的worker节点及时被发现并剔除调度,其验证方式使用了ASP.NET Core框架自带的健康检查机制中间件,通过访问一个指定的路由地址获取节点的健康情况,如果连续N次检查失败就把该节点强制剔除下线,多次检查目的是为了避免因短暂的网络抖动导致出现误判情况,这个次数N可以根据实际情况进行配置,默认是3次。

    首先master启动的时候会注册一个每分钟执行一次的后台定时任务,这个任务会拉取所有状态是非[下线]的worker节点,然后对其发起健康检查请求:

        public class SystemSchedulerRegistry : Registry
        {
            public SystemSchedulerRegistry()
            {
                NonReentrantAsDefault();
    
                //对运行节点每分钟一次心跳监测
                Schedule<WorkerCheckJob>().ToRunEvery(1).Minutes();
            }
        }
    
        internal class WorkerCheckJob : IJob
        {
            /// <summary>
            /// 执行计划
            /// </summary>
            public void Execute()
            {
                using (var scope = ConfigurationCache.RootServiceProvider.CreateScope())
                {
                    Core.Interface.INodeService service = scope.ServiceProvider.GetService<Core.Interface.INodeService>();
                    AutowiredServiceProvider provider = new AutowiredServiceProvider();
                    provider.PropertyActivate(service, scope.ServiceProvider);
                    service.WorkerHealthCheck();
                }
            }
        }
    

    具体判断节点无效的流程为:

    • 读取系统配置的最大允许无响应次数

    • 给节点维护一个失败计数器,本质是一个字典,key是节点名称,value是连续失败的次数

    • 对节点发起健康检查请求,如果请求成功就更新节点的最后刷新时间,并把计数器归0

    • 如果请求失败但没有达到最大失败次数,把计数器加1,等待下次检查

    • 如果已经达到最大失败次数,则把节点标记下线,释放该节点占据的锁,同时把计数器归0

    worker的中间件注册过程为:

    public void ConfigureServices(IServiceCollection services)
    {
    	// ....
    
    	services.AddHealthChecks();
    
    	// ....
    }
    
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
    {
    	// ....
    
        app.UseEndpoints(endpoints =>
        {
            endpoints.MapControllers();
            endpoints.MapHealthChecks("/health");
        });
    
    	// ....
    }
    

    访问控制

    为了保证worker接口访问的安全性,系统加入了动态秘钥验证机制,每次节点启动或者被连接的时候都会生成一个新的秘钥,持有合法秘钥的请求才会被节点正常处理,否则直接返回401 Unauthorized。

    public void OnActionExecuting(ActionExecutingContext context)
    {
        var anonymous = (context.ActionDescriptor as ControllerActionDescriptor).MethodInfo.GetCustomAttributes(typeof(AllowAnonymousAttribute), false);
        if (anonymous.Any())
        {
            return;
        }
        var secret = context.HttpContext.Request.Headers["sm_secret"].FirstOrDefault();
        if (string.Compare(Common.QuartzManager.AccessSecret, secret, StringComparison.CurrentCultureIgnoreCase) != 0)
        {
            context.Result = new UnauthorizedObjectResult($"w:{Common.QuartzManager.AccessSecret} m:{secret}");
        }
    }
    

    节点访问

    在master控制台中对任务的操作最终都被分发到关联的worker节点上,通过worker提供的webapi接口实现远程调用。以启动任务为例,我们看一下具体分发和远程调用过程:

    private async Task<bool> DispatcherHandler(Guid sid, RequestDelegate func)
    {
        var nodeList = _nodeService.GetAvaliableWorkerForSchedule(sid);
        if (nodeList.Any())
        {
            foreach (var item in nodeList)
            {
                if (!await func(item))
                {
                    return false;
                }
            }
            return true;
        }
        throw new InvalidOperationException("running worker not found.");
    }
    
    public async Task<bool> ScheduleStart(Guid sid)
    {
        return await DispatcherHandler(sid, async (ServerNodeEntity node) =>
         {
             _scheduleClient.Server = node;
             return await _scheduleClient.Start(sid);
         });
    }
    

    可以看到,启动操作会首先查询任务的执行节点,然后依次遍历执行远程调用,只要其中一个节点执行命令失败那么整个操作就会失败。

    最终的httpclient请求被封装在Hos.ScheduleMaster.Core.Services.RemoteCaller.ServerClient类中,它的CreateClient方法从IHttpClientFactory获取了一个客户端实例,并把节点的访问秘钥放入请求头中,以此完成安全性验证:

    protected HttpClient CreateClient()
    {
        if (_server == null)
        {
            throw new ArgumentException("no target worker that can send the request.");
        }
        HttpClient client = _httpClientFactory.CreateClient("workercaller");
        client.DefaultRequestHeaders.Add("sm_secret", _server.AccessSecret);
        client.BaseAddress = new Uri($"{_server.AccessProtocol}://{_server.Host}");
        return client;
    }
    

    写在最后

    到这里基本把节点的核心操作都分析完毕了,希望能对关注这个项目的朋友带来帮助~

  • 相关阅读:
    网络编程
    GUI编程
    Java数组
    Day24
    Day23
    Day22
    Day21
    Day20
    Day19
    Day18
  • 原文地址:https://www.cnblogs.com/hohoa/p/13153207.html
Copyright © 2011-2022 走看看