zoukankan      html  css  js  c++  java
  • 使用ThreadPoolTaskScheduler动态修改调度时间

    SchedulingConfigurer 接口只能统一修改所有任务的调度时间;如果要对每个 job 分开控制,有多少个 job 就要写多少个实现,比较麻烦。

    配置线程池ThreadPoolTaskScheduler

    /**
     * Spring configuration that declares the scheduler bean used to run jobs.
     */
    @Configuration
    public class JobConfig {
        /** Number of threads in the scheduler pool. */
        private static final int POOL_SIZE = 20;
        /** Value passed to {@code setAwaitTerminationSeconds}. */
        private static final int AWAIT_TERMINATION_SECONDS = 300;
        /** Prefix given to the names of the scheduler's threads. */
        private static final String THREAD_NAME_PREFIX = "taskExecutor-";

        /**
         * Creates the {@link ThreadPoolTaskScheduler} registered under the bean name
         * {@code taskExecutor}.
         *
         * @return the configured scheduler
         */
        @Bean("taskExecutor")
        public ThreadPoolTaskScheduler taskExecutor() {
            final ThreadPoolTaskScheduler scheduler = new ThreadPoolTaskScheduler();
            // Shutdown settings: wait for running tasks, with the configured await-termination value.
            scheduler.setAwaitTerminationSeconds(AWAIT_TERMINATION_SECONDS);
            scheduler.setWaitForTasksToCompleteOnShutdown(true);
            // Pool sizing and thread naming.
            scheduler.setThreadNamePrefix(THREAD_NAME_PREFIX);
            scheduler.setPoolSize(POOL_SIZE);
            return scheduler;
        }
    }
    

    封装实现

    @Component
    @Slf4j
    public class Jobs {
        /**
         * key josId
         * value ScheduledFuture object
         */
        private final ConcurrentHashMap<Integer, ScheduledFuture> jobsMap = new ConcurrentHashMap<>(8);
    
        private final ApplicationContext applicationContext;
        private final ThreadPoolTaskScheduler threadPoolTaskScheduler;
        private final EcoScheduleJobMapper scheduleJobMapper;
    
        public Jobs(ApplicationContext applicationContext, EcoScheduleJobMapper scheduleJobMapper, ThreadPoolTaskScheduler threadPoolTaskScheduler) {
            this.applicationContext = applicationContext;
            this.scheduleJobMapper = scheduleJobMapper;
            this.threadPoolTaskScheduler = threadPoolTaskScheduler;
            init();
        }
    
        public Map<Integer, ScheduledFuture> getJobsMap() {
            return jobsMap;
        }
    
        public void start(Integer jobId) {
            if (jobsMap.containsKey(jobId)) {
                log.debug("job:{} is running", jobId);
                // running...
                return;
            }
    
            executeJob(selectJobsDetail(jobId));
        }
    
        public void stop(Integer jobId) {
            ScheduledFuture scheduledFuture = jobsMap.get(jobId);
            if (scheduledFuture != null) {
                scheduledFuture.cancel(true);
                log.debug("job cancel:{}", scheduledFuture);
            } else
                log.debug("jobId:{} is not running ", jobId);
        }
    
        private EcoScheduleJob selectJobsDetail(Integer jobId) {
            return scheduleJobMapper.selectByPrimaryKey(jobId);
        }
    
        // 修改corn配置,先取消再重新启动
        public void reset(Integer jobId) {
            ScheduledFuture scheduledFuture = jobsMap.get(jobId);
            EcoScheduleJob scheduleJob = selectJobsDetail(jobId);
            if (Objects.nonNull(scheduledFuture)) {
                scheduledFuture.cancel(true);
                log.debug("job:{} cancel success", scheduleJob);
            }
            if (Objects.nonNull(scheduleJob)) {
                executeJob(scheduleJob);
                log.debug("job:{} start success", scheduleJob);
            }
        }
    
        // 启动初始化启动所有job
        private void init() {
            // 从数据库获取任务信息
            List<EcoScheduleJob> jobs = scheduleJobMapper.selectJobs();
            if (CollectionUtils.isNotEmpty(jobs)) {
                jobs.forEach(this::executeJob);
            }
        }
    
        private void executeJob(EcoScheduleJob job) {
            Runnable runnable = null;
            try {
                Class<?> clazz = Class.forName(job.getClassName());
                Constructor<?>[] declaredConstructors = clazz.getDeclaredConstructors();
                if (!Arrays.isNullOrEmpty(declaredConstructors)) {
                    // 只取一个构造器
                    if (declaredConstructors.length > 1) {
                        log.warn("{} 存在多个构造器,只能有一个", clazz);
                        return;
                    }
                    Constructor<?> constructor = declaredConstructors[0];
                    Class<?>[] parameterTypes = constructor.getParameterTypes();
                    if (parameterTypes.length == 0) {
                        log.warn("{} 无参构造器", clazz);
                        return;
                    }
                    Class<?> parameterType = parameterTypes[0];
                    Object bean = applicationContext.getBean(parameterType);
                    runnable = (Runnable) constructor.newInstance(bean);
                }
            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException | InvocationTargetException e) {
                log.error("任务调度失败:{}", e);
            }
            if (Objects.nonNull(runnable)) {
                ScheduledFuture<?> schedule = threadPoolTaskScheduler.schedule(runnable, new CronTrigger(job.getCron()));
                jobsMap.put(job.getJobId(), schedule);
                log.debug("job:{} is running...", job);
            } else {
                log.error("任务调度失败 job Class is null");
            }
        }
    
    }
    

    pojo

    /**
     * Entity mapped to the {@code eco_schedule_job} table; one row describes one schedulable job.
     */
    @Table(name = "`eco_schedule_job`")
    @Data
    public class EcoScheduleJob implements Serializable {
        /**
         * Primary key.
         */
        @Id
        @Column(name = "`job_id`")
        private Integer jobId;
    
        /**
         * Name of the job class (Jobs loads it with Class.forName).
         */
        @Column(name = "`class_name`")
        private String className;
    
        /**
         * Cron expression.
         */
        @Column(name = "`cron`")
        private String cron;
    
        /**
         * Status: 1 = disabled, 0 = enabled.
         */
        @Column(name = "`status`")
        private String status;
    }
    

    controller

    /**
     * REST endpoints under {@code /job} for inspecting and controlling scheduled jobs.
     * Every handler delegates directly to {@link ScheduleJobService}.
     */
    @RequestMapping("/job")
    @RestController
    public class ScheduleJobController {
    
        // NOTE(review): @Reference is presumably a remote-service (e.g. Dubbo) injection annotation — confirm against the imports.
        @Reference
        private ScheduleJobService scheduleJobService;
    
        /**
         * GET /job/status.do
         *
         * @return whatever {@code scheduleJobService.status()} returns
         */
        @GetMapping("/status.do")
        public Object status() {
            return scheduleJobService.status();
        }
    
        /**
         * GET /job/start.do — asks the service to start the given job.
         *
         * @param jobId id of the job to start (required request parameter)
         * @return {@code Response.OK()}
         */
        @GetMapping("/start.do")
        public Object start(@RequestParam Integer jobId) {
            scheduleJobService.start(jobId);
            return Response.OK();
        }
    
        /**
         * GET /job/stop.do — asks the service to stop the given job.
         *
         * @param jobId id of the job to stop (required request parameter)
         * @return {@code Response.OK()}
         */
        @GetMapping("/stop.do")
        public Object stop(@RequestParam Integer jobId) {
            scheduleJobService.stop(jobId);
            return Response.OK();
        }
    
        /**
         * GET /job/update.do — asks the service to change the cron expression of the given job.
         *
         * @param jobId id of the job to update (required request parameter)
         * @param cron  new cron expression (required request parameter)
         * @return {@code Response.OK()}
         */
        @GetMapping("/update.do")
        public Object update(@RequestParam Integer jobId, @RequestParam String cron) {
            scheduleJobService.update(jobId, cron);
            return Response.OK();
        }
    }
    

    service

    /**
     * {@link ScheduleJobService} implementation that persists cron changes through
     * {@link EcoScheduleJobMapper} and delegates scheduling to {@link Jobs}.
     */
    @Service
    public class ScheduleJobServiceImpl implements ScheduleJobService {

        private final EcoScheduleJobMapper ecoScheduleJobMapper;

        private final Jobs jobs;

        /**
         * @param ecoScheduleJobMapper mapper used to read and update job definitions
         * @param jobs                 registry of scheduled jobs
         */
        public ScheduleJobServiceImpl(EcoScheduleJobMapper ecoScheduleJobMapper , Jobs jobs) {
            this.ecoScheduleJobMapper = ecoScheduleJobMapper;
            this.jobs = jobs;
        }

        /**
         * Stores the new cron expression for the job, then reschedules it.
         *
         * @param jobId id of the job to update
         * @param cron  new cron expression
         */
        @Override
        public void update(Integer jobId, String cron) {
            // Only jobId and cron are set; the selective update is given no other values to write.
            EcoScheduleJob scheduleJob = new EcoScheduleJob();
            scheduleJob.setJobId(jobId);
            scheduleJob.setCron(cron);
            ecoScheduleJobMapper.updateByPrimaryKeySelective(scheduleJob);

            jobs.reset(jobId);
        }

        /**
         * Stops the given job.
         *
         * @param jobId id of the job to stop
         */
        @Override
        public void stop(Integer jobId) {
            jobs.stop(jobId);
        }

        /**
         * Starts the given job.
         *
         * @param jobId id of the job to start
         */
        @Override
        public void start(Integer jobId) {
            jobs.start(jobId);
        }

        /**
         * Returns the definitions of all jobs currently present in the scheduler registry.
         *
         * @return the matching job definitions; an empty list when no job is registered
         */
        @Override
        public List<EcoScheduleJob> status() {
            Set<Integer> integers = jobs.getJobsMap().keySet();
            if (integers.isEmpty()) {
                // Joining an empty set yields "", which would be sent to the mapper as an empty id list.
                return java.util.Collections.emptyList();
            }
            String join = StringUtils.join(integers, ",");
            return ecoScheduleJobMapper.selectByIds(join);
        }
    }
    
  • 相关阅读:
    some requirement checks failed
    FTP下载文件时拒绝登陆申请怎么办?
    Linux查看与设定别名
    如何编写shell脚本
    Linux shell是什么
    Linux命令大全之查看登陆用户信息
    Linux命令大全之挂载命令
    论第二次作业之输入输出格式怎么合格(才疏学浅说的不对轻点喷我)
    文件词数统计
    软件工程作业--第一周
  • 原文地址:https://www.cnblogs.com/wilwei/p/10242618.html
Copyright © 2011-2022 走看看