  • Dapeng Task Scheduling

    1. Principle

        Reflection is used to find the service classes annotated as tasks; Quartz is then invoked to schedule and run them.
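
        The discovery step can be sketched with plain JDK reflection. This is only an illustration under stated assumptions: the two annotations below are hypothetical stand-ins declared locally so the example compiles on its own, and the class and method names (TaskDiscoverySketch, DemoTaskService, discover) are invented for the example. The real annotations ship with the framework, and Quartz is deliberately left out.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

public class TaskDiscoverySketch {

    // Hypothetical stand-in for the framework's class-level marker annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ScheduledTask {}

    // Hypothetical stand-in for the framework's method-level cron annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface ScheduledTaskCron {
        String cron();
    }

    // Demo service: one scheduled method and one ordinary method.
    @ScheduledTask
    public static class DemoTaskService {
        @ScheduledTaskCron(cron = "0 0 0 * * ?")
        public void handle() {}

        public void notScheduled() {}
    }

    /**
     * Returns methodName -> cron expression for every annotated public method,
     * or an empty map when the class itself is not marked as a task.
     */
    public static Map<String, String> discover(Class<?> type) {
        Map<String, String> found = new TreeMap<>();
        if (!type.isAnnotationPresent(ScheduledTask.class)) {
            return found;
        }
        for (Method method : type.getMethods()) {
            ScheduledTaskCron cron = method.getAnnotation(ScheduledTaskCron.class);
            if (cron != null) {
                found.put(method.getName(), cron.cron());
            }
        }
        return found;
    }

    public static void main(String[] args) {
        System.out.println(discover(DemoTaskService.class));
        System.out.println(discover(String.class));
    }
}
```

        Both annotations need RUNTIME retention; otherwise reflection cannot see them and nothing would be discovered.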
    

    2. Usage

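        Note: in real use, define a Thrift file and generate the interface or trait from it.
        To run a task on only one instance, call
        MasterHelper.isMaster(String serviceName, String versionName)
        to check whether the current instance is the master.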
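
        A minimal sketch of the "run on one instance only" guard follows. To stay self-contained it does not call the framework: the BooleanSupplier is a hypothetical stand-in for the master check named above, and MasterOnlyTaskSketch is an invented class name. In a real service the supplier would presumably delegate to that helper with the service name and version.

```java
import java.util.function.BooleanSupplier;

public class MasterOnlyTaskSketch {

    // Hypothetical stand-in for the framework's master check.
    private final BooleanSupplier isMaster;
    private int runs = 0;

    public MasterOnlyTaskSketch(BooleanSupplier isMaster) {
        this.isMaster = isMaster;
    }

    /** Task body: returns true if the work ran, false if this instance skipped it. */
    public boolean handle() {
        if (!isMaster.getAsBoolean()) {
            return false; // not the master, so leave the work to another instance
        }
        runs++; // placeholder for the real work
        return true;
    }

    public int runs() {
        return runs;
    }

    public static void main(String[] args) {
        MasterOnlyTaskSketch master = new MasterOnlyTaskSketch(() -> true);
        MasterOnlyTaskSketch standby = new MasterOnlyTaskSketch(() -> false);
        System.out.println("master ran: " + master.handle());
        System.out.println("standby ran: " + standby.handle());
    }
}
```

        The trigger still fires on every instance; the check inside the task method decides whether that instance does anything.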
    
        
        /**
         * Trait declaration.
         **/
        trait SyncGoodsTaskService {
            def handle(): Unit
        }
        
        /**
         * Annotate the class with @ScheduledTask to mark this service as a scheduled task.
         **/
        @ScheduledTask
        class SimpleSyncGoodsTaskService extends SyncGoodsTaskService {
    
            /**
             * Define when the scheduled task runs (here: every day at 00:00:00).
             */
            @ScheduledTaskCron(cron = "0 0 0 * * ?")
            override def handle(): Unit = {
                
            }
        }
    
    
    
        interface SomeTaskService {
            void dealBill();
        }
        
        /**
         * Marks this service as a task-handling service.
         **/
        @ScheduledTask
        public class SimpleSomeTaskService implements SomeTaskService {
            
            /**
             * Define the CRON expression (here: every day at 00:30:00).
             **/
            @ScheduledTaskCron(cron = "0 30 0 * * ?")
            public void dealBill() {
                // ... download the bill
            }
        }
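
        The cron strings in the examples above use the Quartz layout: six required fields (seconds, minutes, hours, day-of-month, month, day-of-week) plus an optional seventh for the year. The sketch below only labels the fields of an expression to make that layout visible. It does not validate field values, and the class and method names are invented for the illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class CronFieldsSketch {

    // Field order of a Quartz cron expression; "year" is optional.
    private static final String[] NAMES = {
        "seconds", "minutes", "hours", "day-of-month", "month", "day-of-week", "year"
    };

    /** Splits an expression on whitespace and pairs each field with its name. */
    public static Map<String, String> label(String cron) {
        String[] parts = cron.trim().split("\\s+");
        if (parts.length < 6 || parts.length > 7) {
            throw new IllegalArgumentException(
                    "expected 6 or 7 fields, got " + parts.length);
        }
        Map<String, String> labelled = new LinkedHashMap<>();
        for (int i = 0; i < parts.length; i++) {
            labelled.put(NAMES[i], parts[i]);
        }
        return labelled;
    }

    public static void main(String[] args) {
        label("0 0 0 * * ?").forEach((name, value) ->
                System.out.println(name + " = " + value));
    }
}
```

        A field count outside 6 to 7 is rejected here; checking that each value is in range is left to Quartz itself when the trigger is built.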
    

    3. Analysis

        public class TaskSchedulePlugin implements AppListener, Plugin {
        
            
            private static final Logger LOGGER = LoggerFactory.getLogger("container.scheduled.task");
        
            private final Container container;
        
            private Scheduler scheduler = null;
        
            public TaskSchedulePlugin(Container container) {
                this.container = container;
                container.registerAppListener(this);
            }
        
            @Override
            public void appRegistered(AppEvent event) {
                LOGGER.warn(getClass().getSimpleName() + "::appRegistered, event[" + event.getSource() + "], do nothing here");
            }
        
            /**
             * Stop task scheduling when the application stops.
             **/
            @Override
            public void appUnRegistered(AppEvent event) {
                LOGGER.warn(getClass().getSimpleName() + "::appUnRegistered, event[" + event.getSource() + "]");
                stop();
            }
        
            /**
             * Register all scheduled tasks.
             **/
            @Override
            public void start() {
                LOGGER.warn("Plugin::" + getClass().getSimpleName() + "::start");
                container.getApplications().forEach(application -> {
                    List<ServiceInfo> serviceInfos = application.getServiceInfos().stream()
                            .filter(serviceInfo ->
                                    serviceInfo.ifaceClass.isAnnotationPresent(ScheduledTask.class))
                            .collect(Collectors.toList());
                    serviceInfos.forEach(serviceInfo -> runTask(serviceInfo));
                });
            }
        
            @Override
            public void stop() {
                LOGGER.warn("Plugin::TaskSchedulePlugin stop");
                try {
                    if (scheduler != null) {
                        // Shut down unless the scheduler has already been shut down.
                        if (!scheduler.isShutdown()) {
                            LOGGER.info(" start to shutdown scheduler: " + scheduler.getSchedulerName());
                            scheduler.shutdown();
                        }
                    }
                } catch (SchedulerException e) {
                    LOGGER.error(" Failed to shutdown scheduler: " + e.getMessage(), e);
                }
            }
        
            public void runTask(ServiceInfo serviceInfo) {
                Class<?> ifaceClass = serviceInfo.ifaceClass;
        
                Map<ProcessorKey, SoaServiceDefinition<?>> processorMap = ContainerFactory.getContainer().getServiceProcessors();
        
                List<Method> taskMethods = Arrays.stream(ifaceClass.getMethods()).filter(method -> method.isAnnotationPresent(ScheduledTaskCron.class))
                        .collect(Collectors.toList());
        
                SoaServiceDefinition soaServiceDefinition = processorMap.get(new ProcessorKey(serviceInfo.serviceName,
                        serviceInfo.version));
        
                if (soaServiceDefinition == null) {
                    LOGGER.error(" SoaServiceDefinition Not found....serviceName: {}, version: {} ", serviceInfo.serviceName, serviceInfo.version);
                    return;
                }
        
                taskMethods.forEach(method -> {
                    String methodName = method.getName();
        
                    ScheduledTaskCron cron = method.getAnnotation(ScheduledTaskCron.class);
                    String cronStr = cron.cron();
        
                    //new quartz job
                    JobDataMap jobDataMap = new JobDataMap();
                    jobDataMap.put("function", soaServiceDefinition.functions.get(methodName));
                    jobDataMap.put("iface", soaServiceDefinition.iface);
                    jobDataMap.put("serviceName", serviceInfo.serviceName);
                    jobDataMap.put("versionName", serviceInfo.version);
                    JobDetail job = JobBuilder.newJob(ScheduledJob.class)
                            .withIdentity(ifaceClass.getName() + ":" + methodName)
                            .setJobData(jobDataMap)
                            .build();
        
                    CronTriggerImpl trigger = new CronTriggerImpl();
                    trigger.setName(job.getKey().getName());
                    trigger.setJobKey(job.getKey());
                    try {
                        trigger.setCronExpression(cronStr);
                    } catch (ParseException e) {
                        LOGGER.error("Failed to parse cron for scheduled task ({}:{})", ifaceClass.getName(), methodName);
                        LOGGER.error(e.getMessage(), e);
                        return;
                    }
        
                    if (scheduler == null) {
                        try {
                            scheduler = StdSchedulerFactory.getDefaultScheduler();
                            scheduler.start();
                        } catch (SchedulerException e) {
                            LOGGER.error("ScheduledTaskContainer failed to start");
                            LOGGER.error(e.getMessage(), e);
                            return;
                        }
                    }
                    try {
                        scheduler.scheduleJob(job, trigger);
                    } catch (SchedulerException e) {
                        LOGGER.error(" Failed to scheduleJob....job: " + job.getKey().getName() + ", reason:" + e.getMessage(),
                                e);
                        return;
                    }
                    LOGGER.info("Scheduled task ({}:{}) added successfully", ifaceClass.getName(), methodName);
                });
            }
        }
    
  • Original article: https://www.cnblogs.com/hhbk/p/9546647.html