zoukankan      html  css  js  c++  java
  • 分布式调度框架TBSchedule使用方法

    一、TBSchedule简介

    TBSchedule是来自淘宝的分布式调度开源框架,基于Zookeeper纯Java实现,其目的是让一种批量任务或者不断变化的任务,能够被动态的分配到多个主机的JVM中的不同线程组中并行执行。所有的任务能够被不重复,不遗漏的快速处理。这种框架任务的分配通过分片实现了不重复调度,又通过架构中Leader的选择,存活的自我保证,完成了可用性和伸缩性的保障。
    TBSchedule源码地址:http://code.taobao.org/p/tbschedule/src/

    二、开发环境

    1. WIN10,也可换为Linux
    2. JDK 1.7
    3. Tomcat 8.5
    4. 安装zookeeper

    三、配置步骤

    1.安装zookeeper

    (1)下载zookeeper

    http://zookeeper.apache.org/releases.html

    下载3.4.11版本:

    http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.11/zookeeper-3.4.11.tar.gz

    (2)解压至c:/prog/zookeeper/zookeeper-3.4.11

    复制conf下的zoo_sample.cfg为zoo.cfg

    修改dataDir为:

    dataDir=/prog/zookeeper/data

    tickTime单位为毫秒,为心跳间隔和最小的心跳超时间隔

    clientPort是监听客户端连接的端口,默认为2181

    (3)创建目录:c:/prog/zookeeper/data

    2.启动zookeeper

    运行bin/zkServer.cmd

    如果在Linux下,则执行:

    [root@192.168.1.5]$ ./zkServer start

    3.下载TBSchedule

    采用svn来Checkout TBSchedule

    svn地址:http://code.taobao.org/svn/tbschedule/

    4.在Eclipse中导入项目:

    右键工程区域(Package Explorer)->Import...->Maven-Existing Maven Projects

    注意:TBSchedule编码为GBK,但引用TBSchedule的工程编码为UTF-8时,此处也要将TBSchedule工程的编码设置为UTF-8。

    5.安装Tomcat

    (1)下载Tomcat

    地址:https://tomcat.apache.org/download-80.cgi#8.5.27

    (2)解压Tomcat 8.5至c:prog omcatapache-tomcat-8.5.11

    6.配置TBSchedule控制台

    (1)将TBSchedule工程中的consoleScheduleConsole.war拷贝至tomcat/webapps中

    (2)启动tomcat

    (3)浏览器中打开:

    http://localhost:8080/ScheduleConsole/schedule/index.jsp?manager=true

    点击保存会提示:

    错误信息:Zookeeper connecting ......localhost:2181

    如配置正确则可以忽略上述提示,直接进入“管理主页...”。

    7.查看zookeeper中节点

    运行zookeeper下的bin/zkClient.cmd

    输入ls /app-schedule/demo,显示:

    [strategy, baseTaskType, factory]

    说明已经创建znode成功。

    查看TBSchedule控制台中的“Zookeeper数据”,也能看到相同数据。

    8.在项目中使用TBSchedule

    Eclipse中新建一个maven工程tbsdemo

    GroupId:com.jf

    Artifact Id:tbsdemo

    9.在pom.xml中引入Spring、TBSchedule、Zookeeper

    pom.xml内容为:

     

      <modelVersion>4.0.0</modelVersion>
      
      <groupId>com.jf</groupId>
      <artifactId>tbsdemo</artifactId>
      <version>0.0.1-SNAPSHOT</version>
      <packaging>jar</packaging>
      
      <name>tbsdemo</name>
      <url>http://maven.apache.org</url>
      
      <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <!-- spring版本号 -->
        <spring.version>4.0.5.RELEASE</spring.version>
    <!-- mybatis版本号 -->
        <mybatis.version>3.3.0</mybatis.version>
    <!-- log4j日志文件管理包版本 -->
        <slf4j.version>1.7.7</slf4j.version>
        <log4j.version>1.2.17</log4j.version>
      </properties>
      
      <dependencies>
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
        <!-- spring核心包 -->
        <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-core</artifactId>
           <version>${spring.version}</version>
        </dependency>
        <dependency>
           <groupId>org.springframework</groupId>
           <artifactId>spring-context-support</artifactId>
           <version>${spring.version}</version>
        </dependency>
        <dependency>
           <groupId>org.springframework</groupId>
            <artifactId>spring-test</artifactId>
           <version>${spring.version}</version>
        </dependency>
        <dependency>
           <groupId>log4j</groupId>
           <artifactId>log4j</artifactId>
           <version>${log4j.version}</version>
        </dependency>
        <dependency>
           <groupId>org.apache.zookeeper</groupId>
           <artifactId>zookeeper</artifactId>
           <version>3.4.11</version>
        </dependency>
        <dependency>
           <groupId>com.taobao.pamirs.schedule</groupId>
            <artifactId>tbschedule</artifactId>
            <version>3.3.3.2</version>
        </dependency>
      </dependencies>
    </project>

     

    10.在src/main/resources下创建applicationContext.xml,输入:

    <?xml version="1.0" encoding="UTF-8"?>
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
                            http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
                            http://www.springframework.org/schema/context 
                            http://www.springframework.org/schema/context/spring-context-4.0.xsd">
      
        <context:component-scan base-package="com.jf" />
        <!-- 引入配置文件 -->
        <bean id="propertyConfigurer"
           class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
               <list>
                  <value>classpath:tbschedule.properties</value>
               </list>
           </property>
        </bean>
        <bean id="scheduleManagerFactory"    class="com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory"
           init-method="init">
           <property name="zkConfig">
               <map>
                  <entry key="zkConnectString" value="${schedule.zookeeper.address}" />
                  <entry key="rootPath" value="${schedule.root.catalog}" />
                  <entry key="zkSessionTimeout" value="${schedule.timeout}" />
                  <entry key="userName" value="${schedule.username}" />
                  <entry key="password" value="${schedule.password}" />
                  <entry key="isCheckParentPath" value="true" />
               </map>
           </property>
        </bean>
    </beans>

    11.创建TBSchedule配置文件

    在src/main/resources/中创建tbschedule.propertie

    输入:

    #注册中心地址
    schedule.zookeeper.address=localhost:2181
    #定时任务根目录,任意指定,调度控制台配置时对应
    schedule.root.catalog=/app-schedule/demo
    #账户,任意指定,调度控制台配置时对应
    schedule.username=admin
    #密码,任意指定,调度控制台配置时对应
    schedule.password=password
    #超时配置
    schedule.timeout=60000

    注意schedule.username、schedule.password要与TBSchedule控制台中设置的一致。

    12.创建任务数据类TaskModel:

     

    package com.jf.tbsdemo.pojo;
      
    public class TaskModel {
        private long id;
        private String taskInfo;
        public TaskModel(long id, String taskInfo) {
           this.id = id;
           this.taskInfo = taskInfo;
        }
        public long getId() {
           return id;
        }
        public void setId(long id) {
           this.id = id;
        }
        public String getTaskInfo() {
            return taskInfo;
        }
        public void setTaskInfo(String taskInfo) {
           this.taskInfo = taskInfo;
        }
    }

     

    13.创建任务处理类IScheduleTaskDealSingleTest:

    注意:任务处理分单任务和多任务(批处理),分别实现IScheduleTaskDealSingle<T>、IScheduleTaskDealMulti<T>接口,前者的execute()方法参数只有一个任务T,而后者的execute()方法参数为List<T>,本文使用单任务模式。

     

    package com.jf.tbsdemo;
      
    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.Date;
    import java.util.List;
      
    import org.apache.log4j.Logger;
    import org.springframework.stereotype.Component;
      
    import com.jf.tbsdemo.pojo.TaskModel;
    import com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
    import com.taobao.pamirs.schedule.TaskItemDefine;
      
    public class IScheduleTaskDealSingleTest implements IScheduleTaskDealSingle<TaskModel> {
        private static final Logger logger = Logger.getLogger(IScheduleTaskDealSingleTest.class);
      
        public Comparator<TaskModel> getComparator() {
            return null;
        }
      
        public List<TaskModel> selectTasks(String taskParameter, String ownSign, int taskQueueNum,
                List<TaskItemDefine> taskItemList, int eachFetchDataNum) throws Exception {
      
            logger.info("IScheduleTaskDealSingleTest选择任务列表开始..........");
            List<TaskModel> models = new ArrayList<TaskModel>();
            models.add(new TaskModel(1"task1"));
            models.add(new TaskModel(2"task2"));
      
            return models;
        }
      
        public boolean execute(TaskModel model, String ownSign) throws Exception {
            logger.info("IScheduleTaskDealSingleTest执行开始.........." new Date());
            logger.info("任务" + model.getId() + ",内容:"+ model.getTaskInfo());
            return true;
        }
    }

     

    其中,selectTasks()方法负责取得要处理的任务信息,execute()方法为处理任务的方法。selectTasks()方法可以理解为生产者,execute()方法可以理解为消费者。

    14.创建主程序类TaskCenter:

     

    package com.jf.tbsdemo;
      
    import org.apache.log4j.Logger;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.FileSystemXmlApplicationContext;
      
    public class TaskCenter {
        private static final Logger logger = Logger.getLogger(TaskCenter.class);
      
        public static void main(String[] args) throws Exception {
           // 初始化Spring
           ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext.xml");
           logger.info("---------------task start------------------");
        }
    }

     

    15.在Eclipse中运行主程序类TaskCenter

    16.在TBSchedule中创建任务:

    (1)进入TBSchedule的控制台->任务管理

    点击“创建新任务…”

    (2)配置任务属性:

    • 在任务处理的SpringBean中输入:iScheduleTaskDealSingleTest
    • 处理模式分为:SLEEP、NOTSLEEP,其中SLEEP模式是指当一个线程处理完任务后在任务池中取不到其他任务时,会检查其他线程是否活动,如果是则自己休眠,否则说明自己是最后一位,则调用业务接口取得待处理的任务放入任务池,并唤醒其他线程处理。

    NOTSLEEP模式下线程在任务池中取不到任务时,将立即调用业务接口获取待处理的任务。

    SLEEP模式较为简单,因为取任务的线程同一时间只有一个,不易发生冲突,效率也会较低。NOTSLEEP模式开销较大,也要防止发生重复获取相同任务。

    • 设置执行开始时间结束时间:与Crontab格式一致,在本时间段内任务才会执行。
    • 添加任务项:

    0,1,2,3,4,5,6,7,8,9

    17.在TBSchedule中创建调度策略:

    (1)进入TBSchedule的控制台->调度策略

    点击“创建新策略…”

    (2)填写策略属性:

    注意任务名称要与新建的任务名称一致。

    (3)点击创建,将立即启动调度任务

    另外,除了在控制台中配置调度策略、任务,还可以通过通过代码、Spring配置来设置任务调度参数,推荐采用Spring配置方式。

    18.代码方式

    创建类TaskCenter:

     

    package com.jf.tbsdemo;
      
    import java.util.Properties;
      
    import javax.annotation.Resource;
      
    import org.apache.log4j.Logger;
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.FileSystemXmlApplicationContext;
      
    import com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
    import com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
    import com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
      
    public class TaskCenter {
        private static final Logger logger = Logger.getLogger(TaskCenter.class);
      
        // 初始化调度工厂
        @Resource
        TBScheduleManagerFactory scheduleManagerFactory = new TBScheduleManagerFactory();
        private void startTask() {
           // 初始化Spring
           ApplicationContext ctx = new FileSystemXmlApplicationContext("classpath:applicationContext.xml");
      
      
           Properties p = new Properties();
           p.put("zkConnectString""localhost:2181");
           p.put("rootPath""/app-schedule/demo");
           p.put("zkSessionTimeout""60000");
           p.put("userName""admin");
           p.put("password""password");
           p.put("isCheckParentPath""true");
      
           scheduleManagerFactory.setApplicationContext(ctx);
      
           try {
               scheduleManagerFactory.init(p);
        
               // 创建任务调度任务的基本信息
               String baseTaskTypeName = "DemoTask";
               ScheduleTaskType baseTaskType = new ScheduleTaskType();
               baseTaskType.setBaseTaskType(baseTaskTypeName);
               baseTaskType.setDealBeanName("demoTaskBean");
               baseTaskType.setHeartBeatRate(10000);
               baseTaskType.setJudgeDeadInterval(100000);
               baseTaskType.setTaskParameter("AREA=BJ,YEAR>30");
               baseTaskType.setTaskItems(ScheduleTaskType
                      .splitTaskItem("0:{TYPE=A,KIND=1},1:{TYPE=A,KIND=2},2:{TYPE=A,KIND=3},3:{TYPE=A,KIND=4},"
                             "4:{TYPE=A,KIND=5},5:{TYPE=A,KIND=6},6:{TYPE=A,KIND=7},7:{TYPE=A,KIND=8},"
                             "8:{TYPE=A,KIND=9},9:{TYPE=A,KIND=10}"));
               baseTaskType.setFetchDataNumber(500);
               baseTaskType.setThreadNumber(5);
               scheduleManagerFactory.getScheduleDataManager().createBaseTaskType(baseTaskType);
               logger.info("创建调度任务成功:" + baseTaskType.toString());
        
               // 创建任务的调度策略
               String taskName = baseTaskTypeName;
               String strategyName = taskName + "-Strategy";
               try {
                   scheduleManagerFactory.getScheduleStrategyManager().deleteMachineStrategy(strategyName, true);
               catch (Exception e) {
                  e.printStackTrace();
               }
               ScheduleStrategy strategy = new ScheduleStrategy();
               strategy.setStrategyName(strategyName);
               strategy.setKind(ScheduleStrategy.Kind.Schedule);
               strategy.setTaskName(taskName);
               strategy.setTaskParameter("china");
        
               strategy.setNumOfSingleServer(1);
               strategy.setAssignNum(10);
               strategy.setIPList("127.0.0.1".split(","));
               scheduleManagerFactory.getScheduleStrategyManager().createScheduleStrategy(strategy);
      
               logger.info("创建调度策略成功:" + strategy.toString());
      
               logger.info("---------------task start------------------");
           catch(Exception e) {
               logger.error("出现异常", e);
           }
        }
      
        public static void main(String[] args) throws Exception {
           TaskCenter taskCenter = new TaskCenter();
           taskCenter.startTask();
        }
    }

     

    19.Spring配置文件方式

    (1)增加类AbstractBaseScheduleTask:

     

    package com.jf.tbsdemo;
      
    import com.taobao.pamirs.schedule.IScheduleTaskDealSingle;
    import com.taobao.pamirs.schedule.strategy.ScheduleStrategy;
    import com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType;
      
    public abstract class AbstractBaseScheduleTask<T> implements IScheduleTaskDealSingle<T> {
        /**
         * 调度任务的配置
         */
        private ScheduleTaskType scheduleTaskType;
        /**
         * 调度策略的配置
         */
        private ScheduleStrategy scheduleStrategy;
      
        public ScheduleTaskType getScheduleTaskType() {
            return scheduleTaskType;
        }
      
        public void setScheduleTaskType(ScheduleTaskType scheduleTaskType) {
            this.scheduleTaskType = scheduleTaskType;
        }
      
        public ScheduleStrategy getScheduleStrategy() {
            return scheduleStrategy;
        }
      
        public void setScheduleStrategy(ScheduleStrategy scheduleStrategy) {
            this.scheduleStrategy = scheduleStrategy;
        }
    }

    (2)修改IScheduleTaskDealSingleTest:

    类声明改为:

    public class IScheduleTaskDealSingleTest extends AbstractBaseScheduleTask<TaskModel> {

     

    (3)在applicationContext.xml中对声明IScheduleTaskDealSingleTest的Bean并注入参数,内容为:

     

     

    <?xml version="1.0" encoding="UTF-8"?>
        xsi:schemaLocation="http://www.springframework.org/schema/beans 
                            http://www.springframework.org/schema/beans/spring-beans-4.0.xsd 
                            http://www.springframework.org/schema/context 
                            http://www.springframework.org/schema/context/spring-context-4.0.xsd">
      
        <context:component-scan base-package="com.jf" />
        <!-- 引入配置文件 -->
        <bean id="propertyConfigurer"
           class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
           <property name="locations">
               <list>
                  <value>classpath:tbschedule.properties</value>
               </list>
           </property>
        </bean>
      
        <!--tbschedule管理器初始化(配置zookeeper,注册调度任务和调度策略)-->
    <bean id="systemTBScheduleManagerFactory" class="com.jf.tbsdemo.SystemTBScheduleManagerFactory">
       <property name="zkConfig">
                <map>
                    <entry key="zkConnectString" value="${schedule.zookeeper.address}" />
                  <entry key="rootPath" value="${schedule.root.catalog}" />
                  <entry key="zkSessionTimeout" value="${schedule.timeout}" />
                  <entry key="userName" value="${schedule.username}" />
                  <entry key="password" value="${schedule.password}" />
                  <entry key="isCheckParentPath" value="true" />
                </map>
            </property>
        </bean>
        
    <bean name="scheduleTaskType" class="com.taobao.pamirs.schedule.taskmanager.ScheduleTaskType">
    <!-- 心跳频率(毫秒) -->
            <property name="heartBeatRate" value="5000" />
            <!-- 假定服务死亡间隔(毫秒) -->
       <property name="judgeDeadInterval" value="60000" />
        <!-- 处理模式 -->
            <property name="processorType" value="SLEEP" />
            <!-- 线程数 -->
            <property name="threadNumber" value="5" />
    <!--允许执行的开始时间-->
            <property name="permitRunStartTime" value="" />
            <!--允许执行的结束时间-->
            <property name="permitRunEndTime" value=""  />
    <!--当没有数据的时候,休眠的时间-->
    <property name="sleepTimeNoData" value="3000"  />
    <!--在每次数据处理完后休眠的时间-->
    <property name="sleepTimeInterval" value="1000"  />
    <!--每次获取数据的数量-->
    <property name="fetchDataNumber" value="10"  />
    <!--任务项数组-->
            <property name="taskItems">
                <list>
                    <value>0:{TYPE=A,KIND=1}</value>
                    <value>1:{TYPE=B,KIND=2}</value>
                    <value>2:{TYPE=C,KIND=3}</value>
                </list>
    </property>
        </bean>
        <bean name="scheduleStrategy" class="com.taobao.pamirs.schedule.strategy.ScheduleStrategy">
            <!--最大线程组数量-->
            <property name="assignNum" value="9" />
            <!--单个机器(JVM)的线程组数量-->
            <property name="numOfSingleServer" value="3" />
            <!--策略运行的机器(JVM)IP-->
            <property name="IPList">
                <list>
                    <value>127.0.0.1</value>
                </list>
            </property>
    </bean>
    <!--任务simpleTask-->
        <bean id="simpleTask" class="com.jf.tbsdemo.IScheduleTaskDealSingleTest" >
            <property name="scheduleTaskType" ref="scheduleTaskType" />
            <property name="scheduleStrategy" ref="scheduleStrategy" />
        </bean>
    </beans>

     

     

    (4)打开控制台,删除所有已有的任务、调度策略,再启动TaskCenter,刷新页面则可看到当前出现了正在运行的任务和调度策略。

     

    (5)也可以在控制台中修改策略,但重启TaskCenter之后会恢复Spring中的配置信息。

     

    20.数据分片方法

    为了避免TBSchedule管理的多线程重复处理数据,需要采用分片,实现方法如下:

    (1)在selectTasks()方法中实现分片获取待处理数据

    (2) selectTasks()方法的taskItemList参数为当前线程分配到的可处理任务分片信息,全部任务分片信息由配置文件中的taskItem定义,每项任务信息为TaskItemDefine类型,其中taskItemId标明了分片ID(即0,1,2),parameter为自定义参数(即{TYPE=A,KIND=1},{TYPE=B,KIND=2},{TYPE=C,KIND=3})。

    (3)根据上面算出的分片ID来取得相应的待处理任务,例如selectTasks()方法从数据库中获取待处理的交易请求记录,可以将记录的主键或者其他字段HashCode值的余数作为分片ID,在selectTasks()方法中只获取与taskItemList中指定分片ID相同的任务,避免不同线程重复获取同一任务。

    (4)在系统运行过程中,线程数量会有所变化,因此要在每个selectTasks()方法执行开始先获取taskItemList。

    (5) 每次执行selectTasks()方法取得记录条数不要超过eachFetchDataNum

    (6)典型的分片代码实现:

     

    /**
     * 根据条件,查询当前调度服务器可处理的任务
     * @param taskParameter 任务的自定义参数
     * @param ownSign 当前环境名称
     * @param taskItemNum 当前任务类型的任务队列数量
     * @param taskItemList 当前调度服务器,分配到的可处理队列
     * @param eachFetchDataNum 每次获取数据的数量
     * @return
     * @throws Exception
     */
    public List<Date> selectTasks(String taskParameter, String ownSign, int taskItemNum, List<TaskItemDefine> taskItemList, int eachFetchDataNum) throws Exception {
        List<Date> dateList = new ArrayList<>();
      
        List<Long> taskIdList = new ArrayList<>();
        for(TaskItemDefine t : taskItemList){ //确定当前任务处理器需处理的任务项id
            taskIdList.add(Long.valueOf(t.getTaskItemId()));
        }
      
    for(int i=0;i<eachFetchDataNum;i++){ // 添加最多指定数量的待处理数据
        Date date = new Date(); //生成待处理数据
            Long remainder = date.getTime() % taskItemNum ;
            if(taskIdList.contains(remainder)){  //根据数据取模,判断当前待处理数据,是否应由当前任务处理器处理
                dateList.add(date);
            }
            TimeUnit.SECONDS.sleep(1);
        }
        return dateList;  //返回当前任务处理器需要处理的数据
    }

     

     

    21.参数说明

    (1)zookeeper参数

    zkConnectString:zookeeper注册中心地址

    rootPath:定时任务根目录,任意指定,调度控制台配置时对应

    zkSessionTimeout:超时时间

    userName:账户,任意指定,调度控制台配置时对应

    password:密码,任意指定,调度控制台配置时对应

    isCheckParentPath:设置为true会检查上级目录是否已经被用作TBSchedule调度,如果是则启动任务失败。

    (2)任务参数:

    heartBeatRate:心跳频率(毫秒)

    judgeDeadInterval:假定服务死亡间隔(毫秒)

    sleepTimeNoData: 当没有数据的时候,休眠的时间

    sleepTimeInterval:在每次数据处理完后休眠的时间

    processorType:处理模式,可为SLEEP或NOTSLEEP。

    permitRunStartTime:执行开始时间如果为空则不定时,直接执行。

    permitRunEndTime:执行结束时间,与执行开始时间之间的时间才可以执行任务。

    taskItems: 任务项数组,例如:0:{TYPE=A,KIND=1},1:{TYPE=B,KIND=2},2:{TYPE=C,KIND=3}

    在调度过程中,某线程分得了获取数据的任务,假设获取第1项任务,则在selectTasks()方法的taskItemList参数中包含第1项任务的信息,TaskItemDefine类型,包含:taskItemId、parameter成员变量,分别为:1、{TYPE=B,KIND=2}。可根据该信息取得相应的数据。

    fetchDataNumber:selectTasks()方法每次获取数据的数量

    executeNumber:每次执行数量,即execute()方法每次取得的任务数量,只在bean实现IScheduleTaskDealMulti才生效。

    threadNumber:每个线程组中的线程数

    maxTaskItemsOfOneThreadGroup:每一组线程能分配的最大任务数量,避免在随着机器的减少把正常的服务器压死,0或者空表示不限制

    taskParameter:任务的自定义参数,可作为selectTasks中的参数传入。

    (3)调度策略参数:

    strategyName:策略名称,必须填写,不能有中文和特殊字符。

    kind:任务类型,Schedule,Java,Bean 大小写敏感。

    taskName:要配置调度策略的任务名称,与这一任务配置的名称要一致。

    taskParameter:任务参数,逗号分隔的Key-Value。对任务类型为Java、Bean的有效,对任务类型为Schedule的无效,需要通过任务管理来配置。

    assignNum:最大线程组数量,是所有机器(JVM)总共运行的线程组的最大数量。

    numOfSingleServer单个机器(JVM)的线程组数量,如果是0,则表示无限制。

    IPList:策略运行的机器(JVM)IP列表,127.0.0.1或者localhost会在所有机器上运行。

     

    四、注意事项

    1. 如果分配给某线程的任务还未执行完,重启该线程所属进程后,这些任务将会丢失,因此要自行实现幂等,且不要直接kill进程,而是发消息通知各线程执行完毕后安全退出。

    当在控制台点击停止任务的按钮时。会将任务池中未处理的任务清除,而停止前的在处理的任务将继续执行。

    1. 如果要设置任务间隔一定时间运行一次,假设为10秒,可以将permitRunEndTime、permitRunStartTime设置为空,将sleepTimeNoData、sleepTimeInterval均设置为10000,这样每个线程运行完毕后不管有没有任务均休眠10秒。

    也可以只设置permitRunStartTime,将permitRunEndTime设置为空或者-1。

    1. 一般来说没有任务时线程休眠时间间隔较大,而有任务时休眠时间间隔要较小,因此sleepTimeNoData一般都大于sleepTimeInterval。
    2. 使用同一个zookeeper的不同项目如果使用同一个zookeeper实例时,所使用的zookeeper根目录不能有父子关系,即使是同一项目的不同实例(例如测试环境、开发环境、准生产环境各部署一套实例)也要使用不具有父子关系的不同根目录。
    3. 任务中配置的每次获取数据量(fetchDataNumber)要大于10倍的线程数(threadNumber),即:

    fetchDataNumber >= threadNumber * 最少循环次数10,否则TBSchedule日志会提示:参数设置不合理,系统性能不佳。

    1. 假定服务死亡间隔judgeDeadInterval至少要大于心跳频率heartBeatRate的5倍。
    2. 任务配置出错时,在控制台会对该任务加红色高亮底色标识。
    3. 当线程组运行出现故障未及时取数时,在控制台会对该线程组加红色高亮底色标识。
    4. 当运行过程中增加节点或修改配置,日志中可能会出现提示Zookeeper节点不存在的NullPointerException异常,不用理会。

    10.理论上单台机器最大线程数为:

    线程数threadNumber*单个机器的线程组数量numOfSingleServer,而numOfSingleServer并不是上限,仅有1台机器时,该机器的线程组数量能达到assignNum。

    11.TBSchedule给各机器以线程组为单位进行分配,所有机器的线程组总数不会超过最大线程组数量assignNum。

    12.一般来说在selectTasks()中获取任务,然后在execute()方法中处理,在SLEEP处理模式下,最后一个活动线程才会去获取任务,因此不会出现重复执行任务的情况。但如果在selectTasks()或execute()中再新建线程或线程池来处理任务,会出现新建线程未处理完成,但TBSchedule框架认为已处理结束从而进行下一次获取任务的操作,可能会重复取出正在处理的任务,因此应尽量避免新建线程和线程池。

    13.在selectTasks()中获取到任务后或者在execute()中处理完任务后应更改状态,防止下次再次取到,造成重复处理。

    14.在SLEEP处理模式下,配置的分片数量应合理,分片较多则同一线程组分配过多分片,对不同分片分别查询获取任务则效率会降低,而分片较少则不利于扩展机器。

    15.在SLEEP处理模式下,同一时间只会有一个线程执行selectTasks(),其他线程均处于休眠状态,因此不宜在selectTasks()中进行过多操作而让其他线程等待时间过长,处理工作应尽量在execute()中进行。或者采用NOTSLEEP模式,让多个线程可以同时运行selectTasks()获取不同分片的任务。

    NOTSLEEP模式需要实现getComparator(),防止从任务池中取出的某项任务正在被本进程中的其他线程处理。原理是在取任务前先取得正在运行的任务放入maybeRepeatTaskList中,取得任务放入任务池后,再与maybeRepeatTaskList中的每项任务对比。同时取任务时加锁保证只有一个线程在取任务。

    只有在NotSleep模式下getComparator()才有效,在Sleep模式下无效。

    执行getComparator()时会遍历正在处理的任务池。

    16.复杂任务可以拆分成多项子任务,并配置不同的策略,为操作最复杂的子任务分配较多线程,从而提高总体的处理效率。

    如果不进行拆分,则会有单个线程处理时间较长,并发的线程数较少,处理时间长短不一, 且任务分配不均匀等问题。例如任务为:从FTP中取得不同大小的文件进行解析,将每行数据写入分库中。

    如果在selectTasks中取得的每个任务对应一个文件,在execute()中处理任务时(解析文件并入库),效率会非常低。可对解析文件的任务做改造:

    改造方案1:在execute()中解析文件后入库时采用线程池处理。但这样仍不能解决任务分配不匀的问题,且引入线程池会增加线程数量。尤其是会造成框架错误判断任务已结束,导致重复处理,因此本方案不合理。

    改造方案2:将任务拆分为两个子任务,文件解析和入分库。

    子任务1:在原execute()中对文件解析后不直接入分库,而是取1000条合成1条记录存入本地单库的中间表,解析文件耗时较短且记录数较少可以较快完成,且时间不均可以忽略。

    子任务2:对中间表记录按照自增主键ID分片,selectTasks()中取得记录,然后拆分成原始单条记录返回,在execute()中对单条记录进行入库处理。

     

    改造方案2的线程数较少,且任务分配会比较均匀,同时避免了单线程处理一个大任务等待时间过长的问题。

    17.Zookeeper存储的数据:机器、策略定义、任务定义、任务分片(包含当前属于哪个机器处理)

    18.在zookeeper中每台机器均可保存不同的线程数等配置,说明不同机器可以使用不同的线程数等配置,但不建议使用不同配置。

    19.在多任务模式下,executeNumber不要设置的太大,较小一些可以减少等待最后一个活跃线程的时间,并且如果fetchDataNumber<线程数*executeNumber,则会有线程无法分得任务。任务分配在本进程中进行,并不会请求zookeeper,因此设置的较小一些效率更高。

    20.当需要重启应用时,要在控制台上先把机器全部停止,等待线程组消失,否则直接重启应用时会出现新的机器实例,旧的机器实例未能释放分片,导致新的机器获取不到任务分片无法执行,控制台上会显示新、旧线程组均为红色。

    21.使用同一zookeeper目录的多台机器中,先启动的机器一般为leader,负责分片的分配。

    22.控制台显示某线程组红色异常,长时间未取数时,可能是取任务的selectTasks()运行异常,或者每次取的任务数量过大,导致长时间未会处理完,可以适当调小eachFetchDataNum。

    也有可能是因为在SLEEP模式下任务处理时间过长。

    23.分片按线程组进行分配,同一机器中有多个线程组时,该机器分得多个分片,也会均匀分配给线程组,每个线程组各自独立取任务调度,不会同时取任务。

    24.当加入新机器时,会请求获得分片。框架10秒扫描一次,如果发现机器数量有变化,且占用分片较多的机器完成任务则会自动重新分配分片。

    25.如果每次从数据库里取待处理记录生成任务时,如果总记录数较多,即使取到的有效记录数较少,则扫描整张表花费时间较长,除了建立必要的索引,也应该减少无数据时扫描频次,即降低sleepTimeInterval,也可在selectTasks()中在取到记录后检查数量,如果较少则sleep一段时间再返回任务,也应加大sleepTimeNoData。

    26.如果任务处理结束后还要合并结果再进入下一轮处理,则最慢的机器会减慢整体速度,因此要尽量保证任务分片均匀分给不同机器,分片数量要能被机器数量整除,也能被最大线程组数量assignNum整除,这样每台机器处理的任务数量大致相同。

    27.在Zookeeper连接配置中保存时提示:

    错误信息:Zookeeper connecting ......localhost:2181

    同时无法进入其他页面,可能由于采用不同的用户名密码配置过同一目录造zookeeper数据异常,可以在zookeeper中手动删除目录数据,或者更换新目录后重启应用。

    在zookeeper中删除目录方法:

    [root@192.168.1.5]$ ./zkClient.sh

    [zk: localhost:2181(CONNECTED) 0] addauth digest admin:password

    [zk: localhost:2181(CONNECTED) 1] rmr /app-schedule/demo

     

    在控制台无法修改目录的账户密码,可在zookeeper客户端中删除目录后重建目录及账户密码。

     

    28.每位用户登录控制台后打开的配置信息均保存在bin目录下的pamirsScheduleConfig.properties,因此在同一Tomcat下操作不同的TBSchedule目录时会冲突,已修改TBSchedule的代码解决了这一问题。

    29.selectTasks()方法从数据库中取得记录时,可以在select语句中对某字段进行mod取余,这样只获取本线程组所分配的分片。一般有多个分库时,同时也会采用mycat,主键ID无法采用自增,常用雪花算法来生成不重复的ID,但对这种ID取模一般不容易均匀,因此可增加创建时间戳字段来用于取模,一般各机器取得的任务数较为均匀。

    30.如果使用zookeeper集群,则在tbschedule.properties中配置schedule.zookeeper.address时,格式如下:

    IP1:Port1,IP2:Port2,IP3:Port3

    31.TBSchedule无法实现任务均衡的转移,即当一台机器处理任务较多,其他机器较闲时,不会转到其他机器。

    32.如果使用数据库连接池,则单个机器中的线程数量不要比连接池数量大太多,或者不高于,以防出现线程获取不到数据库连接的情况出现。

    33.Sleep模式在实现逻辑上相对简单清晰,但存在一个大任务处理时间长,导致其它线程不工作的情况。
    在NotSleep模式下,减少了线程休眠的时间,避免大任务阻塞的情况,但为了避免数据被重复处理,增加了CPU在数据比较上的开销。
    同时要求业务接口实现对象的比较接口。
    如果对任务处理不允许停顿的情况下建议用NotSleep模式,其它情况建议用Sleep模式。
    34.主机编号最小的为Leader,如果是Leader第一次启动则会清除所有垃圾数据。
    35.如果任务是轮询类型,可将permitRunStartTime、permitRunEndTime均设置为空,将一直运行,可设置sleepTimeNoData、sleepTimeInterval来sleep。

    如果要设置在一定时间做内轮询,则可以同时设置permitRunStartTime、permitRunEndTime,在这一时间段内会执行selectTasks()及execute()。

    在到达结束时间时,会将任务池清空,并设置停止运行标志,此时将无法再启动新的线程运行execute(),因此如果selectTasks()运行时间略长于permitRunEndTime-permitRunStartTime,则execute()可能会永远都无法被执行到。

    例如:permitRunStartTime设置为:0/10 * * * * ?

        permitRunEndTime设置为:5/10 * * * * ?

    而selectTasks()执行时间为6秒,则在第6秒时execute()没有机会被执行。

    因此对于轮询任务,最好将permitRunStartTime、permitRunEndTime均设置为空.

    将permitRunEndTime设置为-1与为空作用一致。
    36.如果任务是定时任务,则可以只设置permitRunStartTime,而将permitRunEndTime设置为空或-1,这样在selectTasks()取得任务为空时会sleep(),直到下一个开始时间时才会执行。

    例如:permitRunStartTime设置为:0/10 * * * * ?

        permitRunEndTime设置为:-1

    则在每10秒的第0秒开始执行selectTasks()取任务,如果取到任务则会交给其他线程执行execute(),如果未取到则会sleep(),直到下一个开始时间时才会执行。

    如果只希望同一时间仅有一个线程处理任务,则可以只设置一个分片,并采用SLEEP模式,numOfSingleServer、assignNum均设置为1。

    37.每个心跳周期都会向zookeeper更新心跳信息,如果超过judgeDeadInterval(假定服务死亡间隔)未更新过,则清除zookeeper上的任务信息及Server信息。每个心跳周期也会重新分配分片。
    也会清除zookeeper中分配给已消失机器上的任务信息。
    38.如果有定时任务执行出现故障,或者因重启错过了执行时间,如果要在下一次时间点前再次执行,则可以在控制台上临时增加任务类型、策略,来临时定时执行一次,月日也加上防止忘记删除任务导致多次重复执行。执行完成后再删除该任务类型、策略。
    39.有时应用启动后日志显示正常,但不执行任务,有可能是zookeeper中数据出现错误,可删除该目录,重启应用即可。
    40.在控制台上点击机器的停止按钮时,会将zookeeper中该机器的运行状态设置为false,并清除本机器的任务池中未被处理的任务。在每台机器进程中每2秒刷新一次运行状态,当检测到false,则在任务执行完毕后不再取任务处理。
    41.SystemTBScheduleManagerFactory也可取消,改用@Bean注解,例如:
    ScheduleJobConfiguration.java:

     

    package com.jfbank.schedule.monitor.alarm.tbs;
      
    import java.util.HashMap;
    import java.util.Map;
      
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
      
    import com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;
      
    @Configuration 
    public class ScheduleJobConfiguration{ 
      
        @Bean(initMethod = "init"
        public TBScheduleManagerFactory tbScheduleManagerFactory( 
                @Value("${schedule.zookeeper.address}")String zkConnectString,  
                @Value("${schedule.root.catalog}")String rootPath, 
                @Value("${schedule.timeout}")String zkSessionTimeout, 
                @Value("${schedule.username}")String userName, 
                @Value("${schedule.password}")String password, 
                @Value("${schedule.isCheckParentPath}")String isCheckParentPath) { 
            TBScheduleManagerFactory tbScheduleManagerFactory = new TBScheduleManagerFactory(); 
            Map<String, String> zkConfig = new HashMap<String, String>(); 
            zkConfig.put("zkConnectString", zkConnectString); 
            zkConfig.put("rootPath", rootPath); 
            zkConfig.put("zkSessionTimeout", zkSessionTimeout); 
            zkConfig.put("userName", userName); 
            zkConfig.put("password", password); 
            zkConfig.put("isCheckParentPath", isCheckParentPath); 
            tbScheduleManagerFactory.setZkConfig(zkConfig); 
            return tbScheduleManagerFactory; 
        }
    }
  • 相关阅读:
    记录 vue 中使用 SVG 渐变填充遇到过的坑
    关于map some filter every等遍历的一些临时记忆
    Blob文件处理
    电子签名 VUE加canvas实现 移动端和PC实现
    js导出excell表
    video
    移植QT5.6到嵌入式开发板(史上最详细的QT移植教程)
    Ubuntu16.04打开Qt显示/home/user/.config/QtProject/qtcreator/qtversion.xml : Permission denied
    哨兵2号影像数据获取以及处理流程
    Sentinel-2 哨兵二号数据下载及处理教程
  • 原文地址:https://www.cnblogs.com/zzpblogs/p/10823747.html
Copyright © 2011-2022 走看看