zoukankan      html  css  js  c++  java
  • 【Distributed】分布式任务调度平台

    一、概述

    什么是定时任务

    • 定时任务调度(开发) 相当于在某个时间段,进行一些任务操作。定时(在某个时间段进行操作假设2点)任务(需求) 调度(触发)
    • 案例:定时任务调度案例 ,我每天早上需要知道昨天一天的用户活跃量,写一个定时Job,每天早上9点钟,查询昨天UI天用户活月量,在以邮件方式发送到我的邮箱。
    • 定时任务应用场景:数据同步、交易信息、清楚用户信息、定期发送报表数据、活动推送
    • 引出问题:分布式任务调度平台分布式领域中服务器集群的话,如何保证定时Job幂等性

    二、Java实现定时任务方式

    2.1 Thread

    public class Demo01 {
        static long count = 0;
        public static void main(String[] args) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    while (true) {
                        try {
                            Thread.sleep(1000);
                            count++;
                            System.out.println(count);
                        } catch (Exception e) {
                            // TODO: handle exception
                        }
                    }
                }
            };
            Thread thread = new Thread(runnable);
            thread.start();
        }
    }
    

    2.2 TimerTask

    /**
     * 使用TimerTask类实现定时任务
    */
    public class Demo02 {
        static long count = 0;
        public static void main(String[] args) {
            TimerTask timerTask = new TimerTask() {
                @Override
                public void run() {
                count++;
                System.out.println(count);
            }
        };
        Timer timer = new Timer();
        // 天数
        long delay = 0;
        // 秒数
        long period = 1000;
        timer.scheduleAtFixedRate(timerTask, delay, period);
    }
    
    }
    
    

    2.3 ScheduledExecutorService

    使用ScheduledExecutorService是从Java
    JavaSE5的java.util.concurrent里,做为并发工具类被引进的,这是最理想的定时任务实现方式。

    public class Demo003 {
        public static void main(String[] args) {
            Runnable runnable = new Runnable() {
                public void run() {
                    // task to run goes here
                    System.out.println("Hello !!");
                }
            };
            ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
            // 第二个参数为首次执行的延时时间,第三个参数为定时执行的间隔时间
            service.scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
        }
    }
    

    2.4 Quartz

    引入maven依赖

    <dependencies>
        <!-- quartz -->
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz</artifactId>
            <version>2.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.quartz-scheduler</groupId>
            <artifactId>quartz-jobs</artifactId>
            <version>2.2.1</version>
        </dependency>
    </dependencies>
    

    任务调度类

    public class MyJob implements Job {
        public void execute(JobExecutionContext context) throws JobExecutionException {
            System.out.println("quartz MyJob date:" + new Date().getTime());
        }
    }
    

    启动类

     //1.创建Scheduler的工厂
      SchedulerFactory sf = new StdSchedulerFactory();
      //2.从工厂中获取调度器实例
      Scheduler scheduler = sf.getScheduler();
    
      //3.创建JobDetail
      JobDetail jb = JobBuilder.newJob(MyJob.class)
              .withDescription("this is a ram job") //job的描述
              .withIdentity("ramJob", "ramGroup") //job 的name和group
              .build();
    
      //任务运行的时间,SimpleSchedle类型触发器有效
      long time=  System.currentTimeMillis() + 3*1000L; //3秒后启动任务
      Date statTime = new Date(time);
    
      //4.创建Trigger
          //使用SimpleScheduleBuilder或者CronScheduleBuilder
      Trigger t = TriggerBuilder.newTrigger()
                  .withDescription("")
                  .withIdentity("ramTrigger", "ramTriggerGroup")
                  //.withSchedule(SimpleScheduleBuilder.simpleSchedule())
                  .startAt(statTime)  //默认当前时间启动
                  .withSchedule(CronScheduleBuilder.cronSchedule("0/2 * * * * ?")) //两秒执行一次
                  .build();
    
      //5.注册任务和定时器
      scheduler.scheduleJob(jb, t);
    
      //6.启动 调度器
      scheduler.start();
    

    Quartz表达式

    http://cron.qqe2.com/
    

    三、分布式情况下定时任务会出现哪些问题?

    分布式集群的情况下,怎么保证定时任务不被重复执行

    1. 定时任务和业务代码存放在同一个jvm (小项目)
    2. 大型互联网公司定时任务代码执行与业务执行代码服务器都是分开,都是独立的jvm。
    3. 定时任务服务器是否需要考虑高并发?同一时间点执行多个任务间隔场景是不需要,同时场景可能会发生高并发
    4. 如果在高并发的情况下,定时Job宕机之后应该如何处理? (单台节点) 使用心跳检测监控自动重启、补偿机制(每个任务打一个小标记) 定时任务在执行代码的时候中间突然报错了,使用日志记录错误,跳过继续执行,在使用定时JOb扫描日志错误记录,进行补偿信息。定时]ob在执行的时候,导致整个Job异常结束掉,发送邮件通知给运维人员

    传统定时任务与分布式定时任务区别
    传统定时任务特征:单一(没有集群)

    四、分布式定时任务解决方案

    1. 使用zookeeper实现分布式锁 缺点(需要创建临时节点、和事件通知不易于扩展)
    2. 使用配置文件做一个开关 缺点发布后,需要重启,设置一个开启任务的开关,将其中一台服务器中设置为true,代表开启任务,其他的服务器设置为false,此种方式如果设置为true的服务器宕机了,任务就失效了。(不推荐)
    3. 数据库唯一约束,缺点效率低 (不推荐)
    4. 使用分布式任务调度平台 XXLJOB、Elastric-Job、TBSchedule

    五、XXLJOB介绍

    5.1 分布式任务调度平台能够帮我们实现那些事情

    1. 支持Job集群(前提保证幂等性问题) Job负载均衡轮训机制
    2. 支持Job补偿如果Job执行失败的话,会自动实现重试机制,如果重启多次还是失败的话,发送邮件通知给运维人员。
    3. 支持Job日志记录
    4. 动态配置定时规则传统定时JOb触发规则都是写死,采用动态配置Job规则

    5.2 XXLJOB GitHub

    XXLJOB项目及文档

    XX-JOB
    XX-JOB

    5.3 原理

    • XXL-Job原理:执行器、任务管理
      • 执行器表达意思:定时Job实际执行的服务地址
      • 任务管表达意思:配置定时任务规则、路由策略、允许模式、等。

    步骤

    • ① 部署: xxl-job-admin 作为注册中心
    • ② 创建执行器(具体调度地址) 可以支持集群
    • ③ 配置文件需要填写xxl-job注册中心地址
    • ④ 每个具体执行job服务器需要创建一个netty连接端口号
    • ⑤ 需要执行job的任务类,集成IJobHandler抽象类注册到job容器中
    • ⑥ Execute方法中编写具体job任务

    5.4 SpringBoot整合XXLJob

    配置文件信息

    application.properties

    # web port
    server.port=8081
        
    # log config
    logging.config=classpath:logback.xml
        
    ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
        
    ### xxl-job executor address
    xxl.job.executor.appname=text-job
    xxl.job.executor.ip=
    xxl.job.executor.port=9999
        
    ### xxl-job, access token
    xxl.job.accessToken=
        
    ### xxl-job log path
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
    ### xxl-job log retention days
    xxl.job.executor.logretentiondays=-1
    

    配置XxlJobConfig

    @Configuration
    @ComponentScan(basePackages = "com.xxl.job.executor.service.jobhandler")
    public class XxlJobConfig {
        private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);
    
    @Value("${xxl.job.admin.addresses}")
    private String adminAddresses;
    
    @Value("${xxl.job.executor.appname}")
    private String appName;
    
    @Value("${xxl.job.executor.ip}")
    private String ip;
    
    @Value("${xxl.job.executor.port}")
    private int port;
    
    @Value("${xxl.job.accessToken}")
    private String accessToken;
    
    @Value("${xxl.job.executor.logpath}")
    private String logPath;
    
    @Value("${xxl.job.executor.logretentiondays}")
    private int logRetentionDays;
    
    @Bean(initMethod = "start", destroyMethod = "destroy")
    public XxlJobExecutor xxlJobExecutor() {
        logger.info(">>>>>>>>>>> xxl-job config init.");
        XxlJobExecutor xxlJobExecutor = new XxlJobExecutor();
        xxlJobExecutor.setAdminAddresses(adminAddresses);
        xxlJobExecutor.setAppName(appName);
        xxlJobExecutor.setIp(ip);
        xxlJobExecutor.setPort(port);
        xxlJobExecutor.setAccessToken(accessToken);
        xxlJobExecutor.setLogPath(logPath);
        xxlJobExecutor.setLogRetentionDays(logRetentionDays);
        return xxlJobExecutor;
    }
    }
    

    创建handler接口

    @JobHandler("demoJobHandler")
    @Component
    public class DemoHandler extends IJobHandler {
        @Value("${server.port}")
        private String serverPort;
        
        @Override
        public ReturnT<String> execute(String param) throws Exception {
            System.out.println("######端口号:serverPort" + serverPort + "###定时Job开始执行啦!!!!######");
            return SUCCESS;
        }
    }
    

    5.5 调度中心集群

      如果xx-job admin 平台挂掉的话,会导致任务无法执行,所以调度中心也需要部署集群,调度中心支持集群部署,提升调度系统容灾和可用性。调度中心集群部署时,几点要求和建议:

    1. DB配置保持一致;
    2. 登陆账号配置保持一致;
    3. 群机器时钟保持一致(单机集群忽视);
    4. 建议:推荐通过nginx为调度中心集群做负载均衡,分配域名。调度中心访问、执行器回调配置、调用API服务等操作均通过该域名进行。

    xxl-job集群原理
    xxl-job集群原理

    Nginx配置信息

    upstream  backServer{
    server 127.0.0.1:8080 weight=1;
    server 127.0.0.1:8081 weight=1;
    }
    server {
        listen       80;
        server_name  127.0.0.1 ;
        #charset koi8-r;
        #access_log  logs/host.access.log  main;
        location / {
            proxy_pass   http://backServer;
            index  index.html index.htm;
        }
        error_page   500 502 503 504  /50x.html;
        location = /50x.html {
            root   html;
        }
    }
    
  • 相关阅读:
    poj 3159 Candies
    强连通分量——Tarjan算法
    nyoj 次方求模
    nyoj 快速查找素数
    nyoj 光棍节的快乐
    拓扑排序
    快速幂取模
    nyoj 最大素因子
    素数打表
    nyoj 数的长度
  • 原文地址:https://www.cnblogs.com/haoworld/p/distributed-fen-bu-shi-ren-wu-diao-du-ping-tai.html
Copyright © 2011-2022 走看看