zoukankan      html  css  js  c++  java
  • java实现任务调度

    最近的一个小项目是做一个简单的数据仓库,需要将其他数据库的数据抽取出来,并通过而出抽取成页面需要的数据,以空间换时间的方式,让后端报表查询更快。
    因为在抽取的过程中,有一定的先后顺序,需要做一个任务调度器,某一优先级的会先执行,然后会进入下一个优先级的队列任务中。
    先定义了一个Map的集合,key是优先级,value是任务的集合,某一个优先级内的任务是并发执行的,而不同优先级是串行执行的,前一个优先级执行完之后,后面的才会执行。

    ConcurrentHashMap<Integer/* 优先级. */, List<BaseTask>/* 任务集合. */> tasks = new ConcurrentHashMap<>();
    

    这个调度管理有一个演进的过程,我先说第一个,这个是比较好理解的。
    第一个版本:
    首先对tasks集合中的key进行一个排序,我定义的是数字越小就有限执行,则进行遍历key值,并取出某个优先级的任务队列,执行任务队列的任务。任务的执行交给线程池去执行,在遍历内部,需要不断的检查这个队列中的任务是否都执行了,没有则一直等待否则进入到下个队列,任务执行的时候可能会抛出异常,但是不管任务是否异常,都将任务状态设置已执行。
    下面是其核心代码:

    public void run() {
        //对key值进行排序
        Enumeration<Integer> keys = tasks.keys();
        List<Integer> prioritys = new ArrayList<>();
        while (keys.hasMoreElements()) {
          prioritys.add(keys.nextElement());
        }
        Collections.sort(prioritys);//升序
        //对key进行遍历,执行某个某个优先级的任务队列
        for (Integer priority : prioritys) {
          List<BaseTask> taskList = tasks.get(priority);
          if (taskList.isEmpty()) {
            continue;
          }
          logger.info("execute priority {} task ", taskList.get(0).priority);
          for (BaseTask task : taskList) {
            executor.execute(() -> {
              try {
                task.doTask();
              } catch (Exception e) {
                e.printStackTrace();
              }
            });//线程中执行任务
          }
          while (true) {//等待所有线程都执行完成之后执行下一个任务队列
            boolean finish = true;
            for (BaseTask t : taskList) {
              if (!t.finish) {
                finish = false;
              }
            }
            if (finish) {//当前任务都执行完毕
              break;
            }
            Misc.sleep(1000);//Thread.sleep(1000)
          }
          Misc.sleep(1000);
        }
      }
    

    关键代码很好理解,在任务执行之前,需要对所有任务都初始化,初始化的时候给出每个任务的优先级和任务名称,任务抽象类如下:

    public abstract class BaseTask {
      public String taskName;//任务名称
      public Integer priority; //优先级
      public boolean finish; //任务完成?
      /**
       * 执行的任务
       */
      public abstract void doTask(Date date) throws Exception;
    

    第一个版本的思路很简单。
    第二个版本稍微有一点点复杂。这里主要介绍该版本的内容,后续将代码的链接附上。
    程序是由SpringBoot搭建起来的,定时器是Spring内置的轻量级的Quartz,使用Aop方式拦截异常,使用注解的方式在任务初始化时设置任务的初始变量。使用EventBus解耦程序,其中程序简单实现邮件发送功能(该功能还需要自己配置参数),以上这些至少需要简单的了解一下。
    程序的思路:在整个队列执行过程中会有多个管道,某个队列上的管道任务执行完成,可以直接进行到下一个队列中执行,也设置了等待某一个队列上的所有任务都执行完成才执行当前任务。在某个队列任务中会标识某些任务是一队的,其他的为另一队,当这一队任务执行完成,就可以到下一个队列中去,不需要等待另一队。
    这里会先初始化每个队列的每个队的条件,这个条件就是每个队的任务数,执行完成减1,当为0时,就进入下一个队列中。
    分四个步骤进行完成:
    1.bean的初始化
    2.条件的设置
    3.任务的执行
    4.任务异常和任务执行完成之后通知检查是否执行下一个队列的任务

    1.bean的初始化

    1.创建注解类

    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Documented
    public @interface TaskAnnotation {
    
      int priority() default 0;//优先级
      String taskName() default "";//任务名称
      TaskQueueEnum[] queueName() default {};//队列名称
    }
    

    2.实现BeanPostProcessor,该接口是中有两个方法postProcessBeforeInitialization和postProcessAfterInitialization,分别是bean初始化之前和bean初始化之后做的事情。

    	Annotation[] annotations = bean.getClass().getAnnotations();//获取类上的注解
    	if (ArrayUtils.isEmpty(annotations)) {//注解为空时直接返回(不能返回空,否则bean不会被加载)
            return bean;
     	}
     	for (Annotation annotation : annotations) {
            if (annotation.annotationType().equals(TaskAnnotation.class)) {
              TaskAnnotation taskAnnotation = (TaskAnnotation) annotation;//强转
               try {
                Field[] fields = target.getClass().getFields();//需要通过反射将值进行修改,下面的操作仅仅是对象的引用
                if (!ArrayUtils.isEmpty(fields)) {
                  for (Field f : fields) {
                    f.setAccessible(true);
                    if (f.getName().equals("priority")) {
                      f.set(target, taskAnnotation.priority());
                    }
                }
              }
         }
      }
    

    上面需要注意的一点是需要通过反射的机制给bean设置值,不能直接调用bean的方式set值,否则bean的值是空的。
    上面的代码通过实现BeanPostProcessor后置处理器,处理任务上的注解,完成对任务的初始化的。

    2.条件的初始化

    创建条件类,提供初始化的方法。

    public abstract class BaseTask {
    
      public int nextPriority;//子级节点的优先级
    
      public String taskName;//任务名称
    
      public Integer priority; //优先级
    
      public String queueName;//队列名称
    
      public boolean finish; //任务完成?
    
      public boolean allExecute;
    
      /**
       * 执行的任务
       */
      public abstract void doTask(Date date) throws Exception;
    
    	//任务完成之后,通过eventBus发送通知,是否需要执行下一个队列
      public void notifyExecuteTaskMsg(EventBus eventBus, Date date) {
        EventNotifyExecuteTaskMsg msg = new EventNotifyExecuteTaskMsg();
        msg.setDate(date);
        msg.setNextPriority(nextPriority);
        msg.setQueueName(queueName);
        msg.setPriority(priority);
        msg.setTaskName(taskName);
        eventBus.post(msg);
      }
    }
    
    public class TaskExecuteCondition {
    
      private ConcurrentHashMap<String, AtomicInteger> executeMap = new ConcurrentHashMap<>();
    
      /**
       * 初始化,每个队列进行分组,每个组的任务数量放入map集合中.
       */
      public void init(ConcurrentHashMap<Integer, List<BaseTask>> tasks) {
        Enumeration<Integer> keys = tasks.keys();
        List<Integer> prioritys = new ArrayList<>();
        while (keys.hasMoreElements()) {
          prioritys.add(keys.nextElement());
        }
        Collections.sort(prioritys);//升序
        for (Integer priority : prioritys) {
          List<BaseTask> list = tasks.get(priority);
          if (list.isEmpty()) {
            continue;
          }
          //对每个队列进行分组
          Map<String, List<BaseTask>> collect = list.stream()
              .collect(Collectors.groupingBy(x -> x.queueName, Collectors.toList()));
          for (Entry<String, List<BaseTask>> entry : collect.entrySet()) {
            for (BaseTask task : entry.getValue()) {
              addCondition(task.priority, task.queueName);
            }
          }
        }
      }
    
      /**
       * 执行任务完成,条件减1
       */
      public boolean executeTask(Integer priority, String queueName) {
        String name = this.getQueue(priority, queueName);
        AtomicInteger count = executeMap.get(name);
        int sum = count.decrementAndGet();
        if (sum == 0) {
          return true;
        }
        return false;
      }
    
      /**
       * 对个某个队列的条件
       */
      public int getCondition(Integer priority, String queueName) {
        String name = this.getQueue(priority, queueName);
        return executeMap.get(name).get();
      }
    
      private void addCondition(Integer priority, String queueName) {
        String name = this.getQueue(priority, queueName);
        AtomicInteger count = executeMap.get(name);
        if (count == null) {
          count = new AtomicInteger(0);
          executeMap.put(name, count);
        }
        count.incrementAndGet();
      }
    
      private void addCondition(Integer priority, String queueName, int sum) {
        String name = this.getQueue(priority, queueName);
        AtomicInteger count = executeMap.get(name);
        if (count == null) {
          count = new AtomicInteger(sum);
          executeMap.put(name, count);
        } else {
          count.set(sum);
        }
      }
    
    
      private String getQueue(Integer priority, String queueName) {
        return priority + queueName;
      }
    
      /**
       * 清除队列
       */
      public void clear() {
        this.executeMap.clear();
      }
    }
    

    3.任务的执行

    任务执行类提供run方法,执行第一个队列,并提供获取下一个队列优先级方法,执行某个队列某个组的方法。

    public class ScheduleTask {
      private static final Logger logger = LoggerFactory.getLogger(ScheduleTask.class);
      
      public ConcurrentHashMap<Integer/* 优先级. */, List<BaseTask>/* 任务集合. */> tasks = new ConcurrentHashMap<>();
    
      @Autowired
      private ThreadPoolTaskExecutor executor;//线程池
    	//任务会先执行第一队列的任务.
      public void run(Date date) {
        Enumeration<Integer> keys = tasks.keys();
        List<Integer> prioritys = new ArrayList<>();
        while (keys.hasMoreElements()) {
          prioritys.add(keys.nextElement());
        }
        Collections.sort(prioritys);//升序
        Integer priority = prioritys.get(0);
        executeTask(priority, date);//执行第一行的任务.
      }
    	//获取下一个队列的优先级
      public Integer nextPriority(Integer priority) {
        Enumeration<Integer> keys = tasks.keys();
        List<Integer> prioritys = new ArrayList<>();
        while (keys.hasMoreElements()) {
          prioritys.add(keys.nextElement());
        }
        Collections.sort(prioritys);//升序
        for (Integer pri : prioritys) {
          if (priority < pri) {
            return pri;
          }
        }
        return null;//没有下一个队列
      }
    
      public void executeTask(Integer priority) {
        List<BaseTask> list = tasks.get(priority);
        if (list.isEmpty()) {
          return;
        }
        for (BaseTask task : list) {
          execute(task);
        }
      }
     //执行某个队列的某个组
      public void executeTask(Integer priority, String queueName) {
        List<BaseTask> list = this.tasks.get(priority);
        list = list.stream().filter(task -> queueName.equals(task.queueName))
            .collect(Collectors.toList());
        if (list.isEmpty()) {
          return;
        }
        for (BaseTask task : list) {
          execute(task);
        }
      }
    
      public void execute(BaseTask task) {
        executor.execute(() -> {
          try {
            task.doTask(date);//
          } catch (Exception e) {//异常处理已经Aop拦截处理
          }
        });//线程中执行任务
      }
    
      /**
       * 增加任务
       */
      public void addTask(BaseTask task) {
        List<BaseTask> baseTasks = tasks.get(task.priority);
        if (baseTasks == null) {
          baseTasks = new ArrayList<>();
          List<BaseTask> putIfAbsent = tasks.putIfAbsent(task.priority, baseTasks);
          if (putIfAbsent != null) {
            baseTasks = putIfAbsent;
          }
        }
        baseTasks.add(task);
      }
    
      /**
       * 将任务结束标识重新设置
       */
      public void finishTask() {
        tasks.forEach((key, value) -> {
          for (BaseTask task : value) {
            task.finish = false;
          }
        });
      }
    }
    

    4.任务异常和任务执行完成之后通知检查是否执行下一个队列的任务

    public class EventNotifyExecuteTaskListener {
      private static final Logger logger = LoggerFactory .getLogger(EventNotifyExecuteTaskListener.class);
      @Autowired
      private ScheduleTask scheduleTask;
    
      @Autowired
      private TaskExecuteCondition condition;
    
      @Subscribe
      public void executeTask(EventNotifyExecuteTaskMsg msg) {
      //当前队列的某组内容是否都执行完成
        boolean success = condition.executeTask(msg.getPriority(), msg.getQueueName());
        if (success) {
          Integer nextPriority = scheduleTask.nextPriority(msg.getPriority());
          if (nextPriority != null) {
            scheduleTask.executeTask(nextPriority, msg.getQueueName(), msg.getDate());//执行下一个队列
          } else {//执行完成,重置任务标识
            scheduleTask.finishTask();
            logger.info("CoreTask end!");
          }
        }
      }
    }
    

    整个思路介绍到这里,那么接下来是整个项目中出现的一些问题
    1.BeanPostProcessor与Aop一起使用时,postProcessAfterInitialization调用之后获取的bean分为不同的了,一个是jdk原生实体对象,一种是Aop注解下的类会被cglib代理,生成带有后缀的对象,如果通过这个对象时反射获取类的注解,字段和方法,就获取不到,在代码中,需要将其转化一下,将cgLib代理之后的类转化为不带后缀的对象。
    2.postProcessAfterInitialization的参数bean不能直接设置值,就是如下:

     TaskAnnotation taskAnnotation = (TaskAnnotation) annotation;//强转
     BaseTask baseTask = (BaseTask) bean;//强转
     baseTask.priority = taskAnnotation.priority();
    

    在使用对象时,其中对象的字段时为空的,并需要通过反射的方式去设置字段的值。
    上面仅仅只是个人的想法,如果有更好的方式,或者有某些地方可以进行改进的,我们可以共同探讨一下。

    链接地址:https://github.com/wangice/task-scheduler
    程序中使用了一个公共包:https://github.com/wangice/misc

  • 相关阅读:
    1.BMap(百度地图)第二次加载显示不全
    SpringMVC的拦截器
    装饰者模式
    java产生随机数
    VS 常用快捷键
    给包含compid列且值为null ,表的行数据赋值--
    遍历数据库,删除包含指定列的表的行数据-
    DataTable select根据条件取值
    临时表汇总金额
    Redirect url 路径简单介绍
  • 原文地址:https://www.cnblogs.com/skyice/p/9691736.html
Copyright © 2011-2022 走看看