zoukankan      html  css  js  c++  java
  • TaskUtil多线程与定时任务

    package com.taoban.util;
    /**
     * 执行单次任务或定时任务工具类(用于减少new Thread()和new Timer()的使用)
     */
    public class TaskUtil {
        private static Log log = LogFactory.getLog(TaskUtil.class);
        private static ExecutorService cachedExecutor = null;
        private static ScheduledExecutorService scheduledExecutor = null;
        private static Map<Runnable, Future<?>> keepRunningTasks = null;
        private static Map<Future<?>, Callback> callbackdTasks = null;
        static {
            cachedExecutor = Executors.newCachedThreadPool(new TaskUtilThreadFactory("cached"));
            scheduledExecutor = Executors.newScheduledThreadPool(5, new TaskUtilThreadFactory("scheduled"));
            Runtime.getRuntime().addShutdownHook(new Thread() {//线程池自动退出
                @Override
                public void run() {
                    cachedExecutor.shutdown();
                    scheduledExecutor.shutdown();
                    log.info("TaskUtil executors shutdown.");
                }
            });
        }
        /**
         * 立即执行任务
         */
        public static Future<?> submit(Runnable task) {
            return cachedExecutor.submit(task);
        }
        /**
         * 自动保持任务持续运行,每分钟监视一次
         */
        public static Future<?> submitKeepRunning(Runnable task){
            Future<?> future = submit(task);
            checkInitCachedTasks();
            synchronized (keepRunningTasks) {
                keepRunningTasks.put(task, future);
            }
            return future;
        }
        /**
         * 延迟执行任务,例如延迟5秒:schedule(task,5,TimeUnit.SECONDS)
         */
        public static void schedule(Runnable task, long delay, TimeUnit unit) {
            scheduledExecutor.schedule(task, delay, unit);
        }
        /**
         * 定时执行任务一次,比如下午两点:scheduleAt(task, DateUtils.setHours(new Date(), 13))
         */
        public static void scheduleAt(Runnable task, Date time) {
            long mills = time.getTime() - System.currentTimeMillis();
            scheduledExecutor.schedule(task, mills>0 ? mills : 3, TimeUnit.MILLISECONDS);
        }
        /**
         * 定时重复执行任务,比如延迟5秒,每10分钟执行一次:scheduleAtFixRate(task, 5, TimeUnit.MINUTES.toSeconds(10), TimeUnit.SECONDS)
         */
        public static void scheduleAtFixtRate(Runnable task, long initialDelay, long delay, TimeUnit unit) {
            scheduledExecutor.scheduleWithFixedDelay(task, initialDelay, delay, unit);
        }
        /**
         * 定时重复执行任务,比如下午两点开始,每小时执行一次:scheduleAtFixRate(task, DateUtils.setHours(new Date(), 13), 1, TimeUnit.HOURS)
         */
        public static void scheduleAtFixtRate(Runnable task, Date time, long delay, TimeUnit unit) {
            long mills = time.getTime() - System.currentTimeMillis();
            scheduledExecutor.scheduleWithFixedDelay(task, mills>0 ? mills : 3, unit.toMillis(delay), TimeUnit.MILLISECONDS);
        }
        /**
         * 提交带返回值的任务,支持后续处理(调用者手动处理)
         */
        public static <T> Future<T> submit(Callable<T> task) {
            return cachedExecutor.submit(task);
        }
        /**
         * 提交带返回值的任务,支持后续处理(自动调用Callback接口)
         */
        public static <T> Future<T> submit(Callable<T> task, Callback callback) {
            Future<T> future = submit(task);
            checkInitCachedTasks();
            if(callback != null) {
                synchronized (callbackdTasks) {
                    callbackdTasks.put(future, callback);
                }
            }
            return future;
        }
        /**
         * 提交任务,等待返回值(阻塞调用者)
         */
        public static <T> T wait(Callable<T> task) {
            Future<T> future = cachedExecutor.submit(task);
            try {
                return future.get();
            } catch (Exception e) {
                log.warn(e);
                return null;
            }
        }
        private static void checkInitCachedTasks() {
            if(keepRunningTasks != nullreturn;
            
            keepRunningTasks = new HashMap<Runnable, Future<?>>();
            callbackdTasks = new HashMap<Future<?>, Callback>();
            scheduleAtFixtRate(new CachedTasksMonitor(), 1, 1, TimeUnit.MINUTES);
        }
        /**
         * 监视需要保持运行的任务
         */
        static class CachedTasksMonitor implements Runnable {
            @Override
            public void run() {
                if(keepRunningTasks.size() > 0) {
                    synchronized (keepRunningTasks) {
                        Map<Runnable, Future<?>> tempTasks = null;
                        for(Runnable task : keepRunningTasks.keySet()) {
                            Future<?> future = keepRunningTasks.get(task);
                            if(future.isDone()) {
                                future = submit(task);//恢复运行异常结束任务
                                if(tempTasks == null) tempTasks = new HashMap<Runnable, Future<?>>();
                                tempTasks.put(task, future);
                            }
                        }
                        if(tempTasks != null && tempTasks.size() > 0) keepRunningTasks.putAll(tempTasks);
                    }
                }
                
                if(callbackdTasks.size() > 0) {
                    synchronized (callbackdTasks) {
                        List<Future<?>> callbackedFutures = null;
                        for(Future<?> future : callbackdTasks.keySet()) {
                            final Callback callback = callbackdTasks.get(future);
                            if(future.isDone()) {
                                try{
                                    final Object result = future.get(5, TimeUnit.SECONDS);
                                    submit(new Runnable() {
                                        @Override
                                        public void run() {//callback可能耗时所以作为独立运行任务,而本监视器需尽快完成工作
                                            callback.handle(result);
                                        }
                                    });
                                    if(callbackedFutures == null) callbackedFutures = new LinkedList<Future<?>>();
                                    callbackedFutures.add(future);
                                }catch (Exception e) {
                                    log.warn("TaskUtil callbackedTasks warn: ", e);
                                }
                            }
                        }
                        
                        if(callbackedFutures != null && callbackedFutures.size() > 0) {
                            for(Future<?> future : callbackedFutures) {
                                callbackdTasks.remove(future);
                            }
                        }
                    }
                }
            }
        }
        
        /**
         * 自定义线程名称Task-idx-name-idx2
         */
        static class TaskUtilThreadFactory implements ThreadFactory {
            private final static AtomicInteger taskutilThreadNumber = new AtomicInteger(1);
            private final AtomicInteger threadNumber = new AtomicInteger(1);
            private final String threadNamePrefix;
            TaskUtilThreadFactory(String threadNamePrefix){
                this.threadNamePrefix = threadNamePrefix;
            }
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, String.format("TaskUtil-%d-%s-%d"taskutilThreadNumber.getAndIncrement(), this.threadNamePrefixthreadNumber.getAndIncrement()));
                t.setDaemon(false);
                t.setPriority(Thread.NORM_PRIORITY);
                return t;
            }
        }
        
        /**
         * 等待结果回调接口
         */
        public static interface Callback {
            void handle(Object result);
        }
    }




  • 相关阅读:
    C++为什么不可以把一个数组直接赋值给另一个数组
    Eigen 矩阵库学习笔记
    HTTP请求报文和HTTP响应报文
    剔除三个(包括三个以上)的子串
    c语言实现:4和7幸运数字的题
    oracle顺序控制语句goto、null和分页过程中输入输出存储、java程序的调用过程
    oracle的控制语句if和循环语句loop while for
    oracle函数、包、变量的定义和使用、重点”结构体和数组”
    oracle pl/sql简介、块、过程
    oracle角色
  • 原文地址:https://www.cnblogs.com/xingqi/p/723763586ab36aeee9b982351c00579d.html
Copyright © 2011-2022 走看看