zoukankan      html  css  js  c++  java
  • 轻量级分布式延时任务处理组件easyTask-L-入门篇

      今天给大家介绍一款新武器。我自研的一个java组件easyTask-L。这个是做啥的呢?我之前研发了一款单机版本的easyTask,这次是要介绍另外一款easyTask-L。区别就是后者支持分布式环境,任务数据支持多个备份,具备了真正意义上的高可用。同时它又是轻量级的分布式应用,原因是因为它还不是一个独立的中间件,它需要一个宿主程序才能使用。做成独立的中间件是我后面要继续做的一个版本。

      组件开源地址:https://github.com/liuche51/easyTask-L

      废话不多说,先来介绍下easyTask-L组件的特性。

                     

      高可用:因为我们是分布式leader-follow集群,每个任务多有多个备份数据,如果节点宕机,集群将自动选举。所以可靠性非常高

      秒级触发:我们是采用时钟秒级分片的数据结构,支持秒级触发任务。不早也不迟

      分布式:组件支持分布式

      高并发:支持多线程同时提交任务,支持多线程同时执行任务

      数据一致性:使用TCC事务机制,保障数据在集群中的强一致性

      海量任务:节点可以存储非常多的任务,只要内存和磁盘足够。触发效率也是极高。需要配置好分派任务线程池和执行任务线程池大小即可

      开源:组件完全在GitHub上开源。任何人都可以随意使用,在不侵犯著作权情况下

      易使用:无需独立部署集群,嵌入式开发。不过多的依赖于第三方中间件,除了zookeeper。

    特别适合以下场景使用:

    • 乘坐网约车结单后30分钟若顾客未评价,则系统将默认提交一条评价信息
    • 银行充值接口,要求5分钟后才可以查询到结果成功OR失败
    • 会员登录系统30秒后自动发送一条登录短信通知
    • 每个登录用户每隔10秒统计一次其某活动中获得的积分

    easyTask-L组件的整体架构如下:

      整体采用分布式设计,leader-follow风格。集群中每一个节点都是leader,同时也可能是其他某个节点的follow。每个leader都有若干个follow。leader上提交的新任务都会强制同步到follow中,删除任务同时也会强制删除follow中的备份任务。集群中所有节点都会在zookeeper中注册并维持心跳。

      为了能更好的可用性,建议集群节点数不少于4个,这样其中一个节点宕机,就能立即得到补充。否则可能导致集群不可用。

    easyTask-L组件的核心“环形队列”的设计架构如下:

      环形队列在之前单机版的easyTask中也讲过,原理都是类似的。客户端提交任务,服务端先将任务进行持久化,再添加上环形队列这个数据结构中去,等待时间片轮询的到来。不同的是这里的持久化机制,改成了分布式存储了。不仅leader自己存储起来,还要同步存储到其follow中去。删除一个任务也是类似的过程。

      任务添加时会计算其触发所属的时间分片槽,等环形队列的始终秒针到达时会判断任务是否可以被执行了。如果可以执行了,则分派任务线程池将其丢入执行任务线程池等待执行。只要执行任务线程池线程数足够,任务将立即得到执行。

       大概的原理清晰了,接下来就是写个HelloWorld程序了!

      easyTask-L不是一个中间件,所以需要一个宿主程式。建议在微服务框架如:dubbo、spring-cloud中使用此组件,并建立一个独立的专门用于处理延时任务的服务模块。这样可以使服务尽可能少的频繁更新重启。保持集群的稳定性。下面我将以一个springboot应用为例来给大家演示如何使用easyTask-L组件。测试环境:JDK1.8、zookeeper3.4.8

      第一步:引入jar包

       如果你是Maven项目,可以使用如下方式配置引入jar包。这可以让项目自动引入easyTask-L中依赖的其他第三方jar包。最新版本请在maven中央仓库中查询。请在pom.xml中加入以下引用

     <dependency>
           <groupId>com.github.liuche51</groupId>
           <artifactId>easyTask-L</artifactId>
           <version>1.0.5</version>
     </dependency>

      第二步:配置启动环形队列

      这里以springboot应用为例,在application.yml中做如下配置

    server:
       port: 8081
    spring:
       application:
          name: easyTask-L
    easyTaskL:
       zkAddress: 127.0.0.1:2181
       taskStorePath: C:/db/node1
       serverPort: 2021
       sQLlitePoolSize: 5
       backupCount: 2
       dispatchPool:
          corePoolSize: 5
          maximumPoolSize: 50
       workPool:
          corePoolSize: 5
          maximumPoolSize: 50

      新建一个启动配置类EasyTaskLConf.java

     1 package com.github.liuche51.easyTaskL.config;
     2 
     3 import com.github.liuche51.easyTask.core.AnnularQueue;
     4 import com.github.liuche51.easyTask.core.EasyTaskConfig;
     5 import org.slf4j.Logger;
     6 import org.slf4j.LoggerFactory;
     7 import org.springframework.beans.factory.annotation.Value;
     8 import org.springframework.context.annotation.Bean;
     9 import org.springframework.context.annotation.Configuration;
    10 
    11 import java.util.concurrent.LinkedBlockingQueue;
    12 import java.util.concurrent.ThreadPoolExecutor;
    13 
    14 @Configuration
    15 public class EasyTaskLConf {
    16     private static Logger log = LoggerFactory.getLogger(EasyTaskLConf.class);
    17     @Value("${easyTaskL.zkAddress}")
    18     private String zkAddress;
    19     @Value("${easyTaskL.taskStorePath}")
    20     private String taskStorePath;
    21     @Value("${easyTaskL.serverPort}")
    22     private int serverPort;
    23     @Value("${easyTaskL.sQLlitePoolSize}")
    24     private int sQLlitePoolSize;
    25     @Value("${easyTaskL.backupCount}")
    26     private int backupCount;
    27     @Value("${easyTaskL.dispatchPool.corePoolSize}")
    28     private int dispatchCorePoolSize;
    29     @Value("${easyTaskL.dispatchPool.maximumPoolSize}")
    30     private int dispatchMaximumPoolSize;
    31     @Value("${easyTaskL.workPool.corePoolSize}")
    32     private int workPoolCorePoolSize;
    33     @Value("${easyTaskL.workPool.maximumPoolSize}")
    34     private int workPoolMaximumPoolSize;
    35     @Bean
    36     public AnnularQueue initAnnularQueue(){
    37         try {
    38             EasyTaskConfig config =new EasyTaskConfig();
    39             config.setTaskStorePath(taskStorePath);
    40             config.setServerPort(serverPort);
    41             config.setSQLlitePoolSize(sQLlitePoolSize);
    42             //config.setBackupCount(backupCount);
    43             config.setZkAddress(zkAddress);
    44             AnnularQueue annularQueue = AnnularQueue.getInstance();
    45             config.setDispatchs(new ThreadPoolExecutor(dispatchCorePoolSize, dispatchMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS,
    46                     new LinkedBlockingQueue<Runnable>()));
    47             config.setWorkers(new ThreadPoolExecutor(workPoolCorePoolSize, workPoolMaximumPoolSize, 1000, java.util.concurrent.TimeUnit.MILLISECONDS,
    48                     new LinkedBlockingQueue<Runnable>()));
    49             annularQueue.start(config);
    50             return annularQueue;
    51         }catch (Exception e){
    52             log.error("",e);
    53              return null;
    54         }
    55     }
    56 
    57 }
    EasyTaskLConf.java

      第三步:建立延时任务处理类

      这个需要根据具体情况,创建你要处理的任务类。任务类都需要继承Task 这个父类以及实现Runnable 的run接口。这里可以写你的任务逻辑,getParam()可以获取到你提交任务时传入的参数。

    package com.github.liuche51.easyTaskL.task;
    import com.github.liuche51.easyTask.dto.Task;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.time.ZonedDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.*;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class CusTask1 extends Task implements Runnable {
        private static Logger log = LoggerFactory.getLogger(CusTask1.class);
        @Override
        public void run() {
            Map<String, String> param = getParam();
            if (param != null && param.size() > 0) {
                log.info("任务1已执行!姓名:{} 生日:{} 年龄:{} 线程ID:{}", param.get("name"), param.get("birthday"), param.get("age"), param.get("threadid"));
            }
        }
    }

      第四步:向环形队列中添加任务

      新建一个Controller,增加以下Action方法。

    @RequestMapping("/once")
    @ResponseBody
    public String once(@RequestParam("name") String name, @RequestParam("time") int time) {
    	CusTask1 task1 = new CusTask1();
    	task1.setEndTimestamp(ZonedDateTime.now().plusSeconds(time).toInstant().toEpochMilli());
    	Map<String, String> param = new HashMap<String, String>() {
    		{
    			put("name", name);
    			put("birthday", "1996-1-1");
    			put("age", "28");
    			put("threadid", String.valueOf(Thread.currentThread().getId()));
    		}
    	};
    	task1.setParam(param);
    	return AnnularQueue.getInstance().submitAllowWait(task1);
    }
    

      完整的demo可以使用Git克隆我的一个开源项目:https://gitee.com/liuche/DubboServer.git  找到子项目easyTask-L-demo即可

  • 相关阅读:
    MySql中子查询,左链,右链,内链,关键字join
    MySql数据库约束,主键和外键约束的添加删除,代码实现,sql语句实现
    MySql查询,聚合函数,分组,分页,排序等复杂查询
    DQL简单语句和条件语句
    django vue
    离线部署Django工程
    数据处理与分析实战小案例系列(一)
    Python常用功能函数总结系列
    Python常用功能函数系列总结(六)
    Python常用功能函数系列总结(五)
  • 原文地址:https://www.cnblogs.com/liuche/p/13360396.html
Copyright © 2011-2022 走看看