zoukankan      html  css  js  c++  java
  • 关于动态定时任务的解决方案。

    之前做定时任务都是用 Scheduled注解来实现,如果需要动态的配置,则不能满足这种需求。查询资料知道这种可以用时间轮算法来实现。大概就是模拟时间表盘来做任务。

     具体java实现:

      1 package com.education.task.provider;
      2 
      3 import org.slf4j.Logger;
      4 import org.slf4j.LoggerFactory;
      5 
      6 import java.util.HashSet;
      7 import java.util.Map;
      8 import java.util.Set;
      9 import java.util.concurrent.ConcurrentHashMap;
     10 import java.util.concurrent.ExecutorService;
     11 import java.util.concurrent.TimeUnit;
     12 import java.util.concurrent.atomic.AtomicBoolean;
     13 import java.util.concurrent.atomic.AtomicInteger;
     14 import java.util.concurrent.locks.Condition;
     15 import java.util.concurrent.locks.Lock;
     16 import java.util.concurrent.locks.ReentrantLock;
     17 
     18 /**
     19  * @author pengbenlei
     20  * @company leenleda
     21  * @date 2020/12/21 9:53
     22  * @description 时间轮算法调度任务
     23  */
     24 public class RingBufferWheel {
     25 
     26     private Logger logger = LoggerFactory.getLogger(RingBufferWheel.class);
     27     /**
     28      * default ring buffer size
     29      */
     30     private static final int STATIC_RING_SIZE = 60;
     31 
     32     private Object[] ringBuffer;
     33 
     34     private int bufferSize;
     35 
     36     /**
     37      * business thread pool
     38      */
     39     private ExecutorService executorService;
     40 
     41     private volatile int size = 0;
     42 
     43     /***
     44      * task stop sign
     45      */
     46     private volatile boolean stop = false;
     47 
     48     /**
     49      * task start sign
     50      */
     51     private volatile AtomicBoolean start = new AtomicBoolean(false);
     52 
     53     /**
     54      * total tick times
     55      */
     56     private AtomicInteger tick = new AtomicInteger();
     57 
     58     private Lock lock = new ReentrantLock();
     59     private Condition condition = lock.newCondition();
     60 
     61     private AtomicInteger taskId = new AtomicInteger();
     62     private Map<Integer, BusinessTask> taskMap = new ConcurrentHashMap<>(16);
     63 
     64     /**
     65      * Create a new delay task ring buffer by default size
     66      *
     67      * @param executorService the business thread pool
     68      */
     69     public RingBufferWheel(ExecutorService executorService) {
     70         this.executorService = executorService;
     71         this.bufferSize = STATIC_RING_SIZE;
     72         this.ringBuffer = new Object[bufferSize];
     73     }
     74 
     75 
     76     /**
     77      * Create a new delay task ring buffer by custom buffer size
     78      *
     79      * @param executorService the business thread pool
     80      * @param bufferSize      custom buffer size
     81      */
     82     public RingBufferWheel(ExecutorService executorService, int bufferSize) {
     83         this(executorService);
     84 
     85         if (!powerOf2(bufferSize)) {
     86             throw new RuntimeException("bufferSize=[" + bufferSize + "] must be a power of 2");
     87         }
     88         this.bufferSize = bufferSize;
     89         this.ringBuffer = new Object[bufferSize];
     90     }
     91 
     92     /**
     93      * Add a task into the ring buffer(thread safe)
     94      *
     95      * @param task business task extends {@link BusinessTask}
     96      */
     97     public int addTask(BusinessTask task) {
     98         int key = task.getKey();
     99         int id;
    100 
    101         try {
    102             lock.lock();
    103             int index = mod(key, bufferSize);
    104             task.setIndex(index);
    105             Set<BusinessTask> tasks = get(index);
    106 
    107             int cycleNum = cycleNum(key, bufferSize);
    108             if (tasks != null) {
    109                 task.setCycleNum(cycleNum);
    110                 tasks.add(task);
    111             } else {
    112                 task.setIndex(index);
    113                 task.setCycleNum(cycleNum);
    114                 Set<BusinessTask> sets = new HashSet<>();
    115                 sets.add(task);
    116                 put(key, sets);
    117             }
    118             id = taskId.incrementAndGet();
    119             task.setTaskId(id);
    120             taskMap.put(id, task);
    121             size++;
    122         } finally {
    123             lock.unlock();
    124         }
    125 
    126         start();
    127 
    128         return id;
    129     }
    130 
    131 
    132     /**
    133      * Cancel task by taskId
    134      *
    135      * @param id unique id through {@link #addTask(BusinessTask)}
    136      * @return
    137      */
    138     public boolean cancel(int id) {
    139 
    140         boolean flag = false;
    141         Set<BusinessTask> tempTask = new HashSet<>();
    142 
    143         try {
    144             lock.lock();
    145             BusinessTask task = taskMap.get(id);
    146             if (task == null) {
    147                 return false;
    148             }
    149 
    150             Set<BusinessTask> tasks = get(task.getIndex());
    151             for (BusinessTask tk : tasks) {
    152                 if (tk.getKey() == task.getKey() && tk.getCycleNum() == task.getCycleNum()) {
    153                     size--;
    154                     flag = true;
    155                     taskMap.remove(id);
    156                 } else {
    157                     tempTask.add(tk);
    158                 }
    159 
    160             }
    161             //update origin data
    162             ringBuffer[task.getIndex()] = tempTask;
    163         } finally {
    164             lock.unlock();
    165         }
    166 
    167         return flag;
    168     }
    169 
    170     /**
    171      * Thread safe
    172      *
    173      * @return the size of ring buffer
    174      */
    175     public int taskSize() {
    176         return size;
    177     }
    178 
    179     /**
    180      * Same with method {@link #taskSize}
    181      *
    182      * @return
    183      */
    184     public int taskMapSize() {
    185         return taskMap.size();
    186     }
    187 
    188     /**
    189      * Start background thread to consumer wheel timer, it will always run until you call method {@link #stop}
    190      */
    191     public void start() {
    192         if (!start.get()) {
    193 
    194             if (start.compareAndSet(start.get(), true)) {
    195                 logger.info("Delay task is starting");
    196                 Thread job = new Thread(new TriggerJob());
    197                 job.setName("consumer RingBuffer thread");
    198                 job.start();
    199                 start.set(true);
    200             }
    201 
    202         }
    203     }
    204 
    205     /**
    206      * Stop consumer ring buffer thread
    207      *
    208      * @param force True will force close consumer thread and discard all pending tasks
    209      *              otherwise the consumer thread waits for all tasks to completes before closing.
    210      */
    211     public void stop(boolean force) {
    212         if (force) {
    213             logger.info("Delay task is forced stop");
    214             stop = true;
    215             executorService.shutdownNow();
    216         } else {
    217             logger.info("Delay task is stopping");
    218             if (taskSize() > 0) {
    219                 try {
    220                     lock.lock();
    221                     condition.await();
    222                     stop = true;
    223                 } catch (InterruptedException e) {
    224                     logger.error("InterruptedException", e);
    225                 } finally {
    226                     lock.unlock();
    227                 }
    228             }
    229             executorService.shutdown();
    230         }
    231 
    232 
    233     }
    234 
    235 
    236     private Set<BusinessTask> get(int index) {
    237         return (Set<BusinessTask>) ringBuffer[index];
    238     }
    239 
    240     private void put(int key, Set<BusinessTask> tasks) {
    241         int index = mod(key, bufferSize);
    242         ringBuffer[index] = tasks;
    243     }
    244 
    245     /**
    246      * Remove and get task list.
    247      *
    248      * @param key
    249      * @return task list
    250      */
    251     private Set<BusinessTask> remove(int key) {
    252         Set<BusinessTask> tempTask = new HashSet<>();
    253         Set<BusinessTask> result = new HashSet<>();
    254 
    255         Set<BusinessTask> tasks = (Set<BusinessTask>) ringBuffer[key];
    256         if (tasks == null) {
    257             return result;
    258         }
    259 
    260         for (BusinessTask task : tasks) {
    261             if (task.getCycleNum() == 0) {
    262                 result.add(task);
    263 
    264                 size2Notify();
    265             } else {
    266                 // decrement 1 cycle number and update origin data
    267                 task.setCycleNum(task.getCycleNum() - 1);
    268                 tempTask.add(task);
    269             }
    270             // remove task, and free the memory.
    271             taskMap.remove(task.getTaskId());
    272         }
    273 
    274         //update origin data
    275         ringBuffer[key] = tempTask;
    276 
    277         return result;
    278     }
    279 
    280     private void size2Notify() {
    281         try {
    282             lock.lock();
    283             size--;
    284             if (size == 0) {
    285                 condition.signal();
    286             }
    287         } finally {
    288             lock.unlock();
    289         }
    290     }
    291 
    292     private boolean powerOf2(int target) {
    293         if (target < 0) {
    294             return false;
    295         }
    296         int value = target & (target - 1);
    297         if (value != 0) {
    298             return false;
    299         }
    300 
    301         return true;
    302     }
    303 
    304     private int mod(int target, int mod) {
    305         // equals target % mod
    306         target = target + tick.get();
    307         return target & (mod - 1);
    308     }
    309 
    310     private int cycleNum(int target, int mod) {
    311         //equals target/mod
    312         return target >> Integer.bitCount(mod - 1);
    313     }
    314 
    315 
    316 
    317     private class TriggerJob implements Runnable {
    318 
    319         @Override
    320         public void run() {
    321             int index = 0;
    322             while (!stop) {
    323                 try {
    324                     System.out.println(index);
    325                     Set<BusinessTask> tasks = remove(index);
    326                     for (BusinessTask task : tasks) {
    327                         executorService.submit(task);
    328                     }
    329 
    330                     if (++index > bufferSize - 1) {
    331                         index = 0;
    332                     }
    333 
    334                     //Total tick number of records
    335                     tick.incrementAndGet();
    336                     TimeUnit.SECONDS.sleep(1);
    337 
    338                 } catch (Exception e) {
    339                     logger.error("Exception", e);
    340                 }
    341 
    342             }
    343 
    344             logger.info("Delay task has stopped");
    345         }
    346     }
    347 }
    RingBufferWheel
     1 package com.education.task.provider;
     2 
     3 import lombok.Getter;
     4 import lombok.Setter;
     5 
     6 /**
     7  * @author pengbenlei
     8  * @company leenleda
     9  * @date 2020/12/21 10:49
    10  * @description
    11  */
    12 @Getter
    13 @Setter
    14 public abstract class BusinessTask extends Thread {
    15 
    16     /**
    17      * 所在位置
    18      */
    19     private int index;
    20 
    21     /**
    22      * cycleNum 轮盘上的圈数
    23      */
    24     private int cycleNum;
    25 
    26     /**
    27      * 轮盘的刻度
    28      */
    29     private int key;
    30 
    31     /**
    32      * The unique ID of the task
    33      */
    34     private int taskId;
    35 
    36     @Override
    37     public void run() {
    38     }
    39 
    40 }
    BusinessTask

    测试调用:

    1     public static class Job extends BusinessTask{
    2         @Override
    3         public void run() {
    4             System.out.println("12346");
    5         }
    6     }
    构造任务
    1 SpringApplication.run(TaskProviderApplication.class, args);
    2         RingBufferWheel ringBufferWheel = new RingBufferWheel( Executors.newFixedThreadPool(2));
    3         for (int i = 0; i < 2; i++) {
    4             BusinessTask job = new Job();
    5             job.setKey(10);
    6             job.setCycleNum(i);
    7             ringBufferWheel.addTask(job);
    8         }
    调用代码

    后续,后来听朋友说有个开源项目 xxl-job : https://github.com/xuxueli/xxl-job,真香。功能比较全。

  • 相关阅读:
    「BZOJ 1000」A+B Problem
    「HNOI 2008」越狱
    蓝桥杯 拼音字母
    蓝桥杯 抽签
    蓝桥杯 快速排序
    [蓝桥杯] 最大比例
    [蓝桥杯] 交换瓶子
    [蓝桥杯] 四平方和
    [蓝桥杯] 剪邮票
    [蓝桥杯] 方格填数
  • 原文地址:https://www.cnblogs.com/rolayblog/p/14167061.html
Copyright © 2011-2022 走看看