zoukankan      html  css  js  c++  java
  • elastic-job动态添加定时任务

    在elastic-job的使用过程中,我们会遇到动态添加定时任务的时候,但是官网上面并没有对这块内容进行说明。按照我的理解以及官网上面elastic-job的框架图,ej的定时任务其实是存储在zookeeper的一个个节点上面,所以通过给zookeeper添加对应的节点即可完成定时任务的添加动作。

    下面上代码:

    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import com.dangdang.ddframe.job.api.ShardingContext;
    import com.dangdang.ddframe.job.api.simple.SimpleJob;
    import com.dangdang.ddframe.job.config.JobCoreConfiguration;
    import com.dangdang.ddframe.job.config.simple.SimpleJobConfiguration;
    import com.dangdang.ddframe.job.exception.JobSystemException;
    import com.dangdang.ddframe.job.lite.api.JobScheduler;
    import com.dangdang.ddframe.job.lite.config.LiteJobConfiguration;
    import com.dangdang.ddframe.job.reg.zookeeper.ZookeeperRegistryCenter;
    
    import org.springframework.context.ApplicationContext;
    import org.springframework.context.support.ClassPathXmlApplicationContext;
    
    public class DynamicAddJob implements SimpleJob{
    	private static final String CRON_DATE_FORMAT = "ss mm HH dd MM ? yyyy";
    
    	/***
    	 * @param date 时间
    	 * @return cron类型的日期
    	 */
    	public static String getCron(final Date date) {
    		SimpleDateFormat sdf = new SimpleDateFormat(CRON_DATE_FORMAT);
    		String formatTimeStr = "";
    		if (date != null) {
    			formatTimeStr = sdf.format(date);
    		}
    		return formatTimeStr;
    	}
    
    
    	public static void main(String[] args){
    		ApplicationContext context = new ClassPathXmlApplicationContext("classpath:spring/applicationContext-job.xml");
    		ZookeeperRegistryCenter zookeeperRegistryCenter = context.getBean(ZookeeperRegistryCenter.class);
    		long now = System.currentTimeMillis();
    		for (int i = 0; i < 100; i++) {
    			String cron = getCron(new Date(now + (i + 1) * 50000));
    			JobCoreConfiguration coreConfig = JobCoreConfiguration.newBuilder("dynamicDemoJob-" + i, cron, 2).build();
    			SimpleJobConfiguration simpleJobConfig = new SimpleJobConfiguration(coreConfig, DynamicAddJob.class.getCanonicalName());
    			JobScheduler jobScheduler = new JobScheduler(zookeeperRegistryCenter, LiteJobConfiguration.newBuilder(simpleJobConfig).build());
    			try {
    				jobScheduler.init();
    			}catch (JobSystemException e){
    				e.printStackTrace();
    			}
    		}
    	}
    
    	@Override
    	public void execute(ShardingContext shardingContext) {
    		switch (shardingContext.getShardingItem()){
    			case 0:
    				System.out.println("doing sharding 0...job name is "+shardingContext.getJobName());
    				// do something by sharding item 0
    				break;
    			case 1:
    				System.out.println("doing sharding 1...job name is "+shardingContext.getJobName());
    				// do something by sharding item 1
    				break;
    		}
    	}
    }
    

    这里用到比较重要的一个类是JobScheduler,这是lite-core里面一个比较核心的类,这个类其实就是我们的job,他的构造方法包含以下参数:

    • CoordinatorRegistryCenter regCenter:注册中心,这里是zookeeper
    • LiteJobConfiguration liteJobConfig:定时任务的配置信息

    这里可以看一下LiteJobConfiguration这个类,采用了设计模式中的建造者模式进行构建。可能看着会比较摸不着头脑,里面的Builder跟平时的不太一样,这里我们需要知道的是ej的源码采用了lombok这个代码简化的工具,只需要通过注解的形式就能将我们平时所需要的get/set和构造器的内容在编译时创建出来,不需要在代码中体现,能够大大简化我们的代码。

    另外还遇到一个坑。这段代码不能重复使用,第一次跑的时候没问题,过段时间再次跑这个代码时,会在init()处报错,原因是我们新建的job根本不能被fire,我跟了进去。发现,job的cron表达式表示的时间还是以前的时间,这就奇怪了,明明我这边配置了一个新的时间。通过debug,进入init方法中,发现他会更新job信息,而更新时,会去zk上面load配置信息,而zk的znode节点是老的节点,上面存储的配置信息也是老的,所以这块的cron表达式也是旧的时间,根本不会被执行,下面贴出源码,供大家参考。

    init()源码:

        /**
         * 初始化作业.
         */
        public void init() {
            LiteJobConfiguration liteJobConfigFromRegCenter = schedulerFacade.updateJobConfiguration(liteJobConfig);
            JobRegistry.getInstance().setCurrentShardingTotalCount(liteJobConfigFromRegCenter.getJobName(), liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getShardingTotalCount());
            JobScheduleController jobScheduleController = new JobScheduleController(createScheduler(), createJobDetail(liteJobConfigFromRegCenter.getTypeConfig().getJobClass()), liteJobConfigFromRegCenter.getJobName());
            JobRegistry.getInstance().registerJob(liteJobConfigFromRegCenter.getJobName(), jobScheduleController, regCenter);
            schedulerFacade.registerStartUpInfo(!liteJobConfigFromRegCenter.isDisabled());
            jobScheduleController.scheduleJob(liteJobConfigFromRegCenter.getTypeConfig().getCoreConfig().getCron());
        }
    

    updateJobConfiguration()的源码如下:

        /**
         * 更新作业配置.
         *
         * @param liteJobConfig 作业配置
         * @return 更新后的作业配置
         */
        public LiteJobConfiguration updateJobConfiguration(final LiteJobConfiguration liteJobConfig) {
            configService.persist(liteJobConfig);
            return configService.load(false);
        }
    

    load()源码如下:

        /**
         * 读取作业配置.
         * 
         * @param fromCache 是否从缓存中读取
         * @return 作业配置
         */
        public LiteJobConfiguration load(final boolean fromCache) {
            String result;
            if (fromCache) {
                result = jobNodeStorage.getJobNodeData(ConfigurationNode.ROOT);
                if (null == result) {
                    result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
                }
            } else {
                result = jobNodeStorage.getJobNodeDataDirectly(ConfigurationNode.ROOT);
            }
            return LiteJobConfigurationGsonFactory.fromJson(result);
        }
    

    可以发现这块load有两种,一种是从缓存(这里的缓存使用Map来实现的TreeCache)中获取getJobNodeData,一种是从注册中心也就是zookeeper中获取getJobNodeDataDirectly。load的时候,根据的是zk的路径,其实也就是任务的jobName,所以我们要尽量避免任务名称的重复。

  • 相关阅读:
    pycharm 使用pip3更新插件已经更新时报错
    剑指Offer系列之题11~题15
    剑指Offer系列之题1~题5
    个人hexo博客(静态,无后台)搭建
    设计模式之单例模式
    Hibernate实现limit语句效果
    Springboot项目中 前端展示本地图片
    eclipse报错:problems during content assist
    python中open与with open的区别
    修改Jenkins目录
  • 原文地址:https://www.cnblogs.com/f-zhao/p/6768842.html
Copyright © 2011-2022 走看看