zoukankan      html  css  js  c++  java
  • 集群任务消费组件TaskScheduler设计

    业务背景

    假设我们这样一个需求:每天晚上定时将数据库数据刷新到缓存中。数据库为 10 库 10 表。每张表 100 万数据。

    需求分析

    10库10表总共1亿条数据,各条数据只要序列化为字符串,存入缓存即可,相互无关联。但数据量较大,单机多线程执行的情况下,执行性能也是极慢的。所以我们考虑集群的方式处理。

    可以将10库10表拆分为100个task,集群节点不停的拉取task,处理相关逻辑直至所有task处理完成。

    为什么要集群,而不能在单机上开多个线程处理?

    答:单机多线程处理也有性能上线,当线程启动过多,并发性能会因为线程切换的消耗反而会急剧下降。

    方案

    定时JOB服务:QuartzJob+Topshelf 组件 来实现windows服务方式部署的定时任务。

    集群任务消费:自定义组件 gt.TaskScheduler ,项目源码:https://gitee.com/gt1987/gt.TaskScheduler

    TaskScheduler概要介绍

    角色定义

    • 普通节点。负责消费队列中的Task,并进行相关的逻辑处理。并支持多线程并行消费。
    • Leader节点。除了普通节点的功能外,还负责维护整个集群的运行状态维护,以及初始话队列数据等工作。
    • 消费队列。维护集群的所有Task,支持生产者消费者模式。当前很多方案可以选型。例如 Redis、Zookeeper。
    • 队列数据。由{dbname}.{tablename}构成,这样每个task即使代表一张具体的表。

    集群流程:

    1. 集群节点同时启动(这里会依赖QuartzJob的时间启动机制),竞争选举一个节点为Leader节点。一个集群只能有一个节点。
    2. Leader节点进行队列数据及集群状态等初始化工作。普通节点等待集群状态变更到提示可以开始工作。
    3. 集群节点(包括Leader节点)作为消费者,从队列拉取task,获取待处理的{dbname}.{tablename}。批量拉取数据处理存入缓存。如果队列空,则更新本节点的执行状态。
    4. Leader节点完成数据逻辑工作,开始监控集群其他节点的状态,如果所有节点状态都已经是 已完成停止 状态,则开始集群停止相关状态更新工作。
    5. 整个任务完成。等待下一次集群工作。
    6. 过程中,如果有新节点加入,会自动以普通节点加入集群工作。

    集群状态,以KEY-VALUE方式存储在分布式组件中。以Redis为例

    1. 集群标识:ts:{taskSchedulerName}。taskSchedulerName为每个集群任务的名称。
    2. 集群节点:ts:{taskSchedulerName}:members。hash结构存储,field为 节点名称,value标识 节点状态。
    3. 集群Leader:ts:{taskSchedulerName}:leader。hash结构存储,field为 Leader节点名称,value标识 集群状态。
    4. 集群队列:ts:{taskSchedulerName}:q。List结构存储。value为待处理的taskname,此例中为 表的全名。
    5. 分布式锁:ts:{taskSchedulerName}:lock。用于分布式锁的Key值。

    具体状态:

        /// <summary>
        /// 节点状态
        /// </summary>
        public enum MemberStatus
        {
            /// <summary>
            /// 已注册
            /// </summary>
            Register = 1,
            /// <summary>
            /// 执行任务中
            /// </summary>
            Run = 2,
            /// <summary>
            /// 已停止
            /// </summary>
            Stop = 4
        }
    
        /// <summary>
        /// 集群
        /// </summary>
        public enum TaskSchedulerStatus
        {
            /// <summary>
            /// 集群启动
            /// </summary>
            Start=1,
            /// <summary>
            /// 运行中
            /// </summary>
            Run=2,
            /// <summary>
            /// 集群停止
            /// </summary>
            Stop=4
        }

    启动方式

            public void DefaultBuildTest()
            {
                var schedulerName = "build_test";
    
                using (var distribute = new RedisDistributeFeature(_redisConn, 3, 10))
                {
                    TaskSchedulerManagerBuilder builder = new TaskSchedulerManagerBuilder(schedulerName, distribute);
                    builder.AddTaskHandler("sample1", new SampleHandler(distribute));
                    builder.AddTaskHandler("sample2", new SampleHandler(distribute));
                    var taskScheduler = builder.Build();
                    taskScheduler.Run();
    
                    Assert.Equal("0", distribute.GetData(_sampleKey, "sample1"));
                    Assert.Equal("0", distribute.GetData(_sampleKey, "sample2"));
                }
            }

     SampleHandler: 

        public class SampleHandler : ITaskHandler
        {
            private IDistributeFeature _distribute;
            private readonly string _sampleKey = "ts:test:sample:result";
            public SampleHandler(IDistributeFeature distribute)
            {
                _distribute = distribute;
            }
    
            public Task Execute(string tag)
            {
                _distribute.SetData(_sampleKey, tag, "0");
                return Task.Delay(500);
            }
        }

    不列详细代码了,具体可参考源码

    既存问题:

    1. 如果Leader节点发生异常,并没有成功更新集群停止状态,那么下一次集群启动时,会发现集群正常运行中,所有节点不会正常工作。

              答1:考虑状态类数据存储增加时间过期,这里的过期时间应该大于整个集群任务完成的预估时间

  • 相关阅读:
    RabbitMQ安装与配置
    在Web项目中使用shiro
    solr整合spring
    mycat
    SpringSession管理
    Nginx安装与配置(Nginx服务器和Tomcat服务器是不同的服务器)
    dubbo负载均衡与服务降级以及Zookeeper认证
    小笔记
    SpringMVC路径转发与重定向
    java-同步控制及不可变设置(只读访问)
  • 原文地址:https://www.cnblogs.com/gt1987/p/12696864.html
Copyright © 2011-2022 走看看