zoukankan      html  css  js  c++  java
  • java使用默认线程池踩过的坑(三)

    云智慧(北京)科技有限公司  陈鑫

     

    重启线程池

     TaskManager

    public class TaskManager implements Runnable {

        …..

        public TaskManager (Set<FileTask>runners) {

            super();

            this.runners = runners;

            executeTasks(runners);

        }

     

        private voidexecuteTasks(Set<FileTask> runners) {

            for (FileTask task : runners){

               pool.execute(task);

               System.out.println(task.getClass().getSimpleName() + " has beenstarted");

            }

        }

     

        @Override

        public void run() {

            while(!Thread.currentThread().isInterrupted()) {

                try {

                   long current = System.currentTimeMillis();

                   for (FileTask wrapper : runners) {

                       if (wrapper.getLastExecTime() != 0 && current -wrapper.getLastExecTime() > wrapper.getInterval() * 5 * 1000) {   // 开始忘了乘以1000

                           wrapper.interrupt();

                           if (wrapper.getFiles() != null){

                               for (File file : wrapper.getFiles()){

                                   file.delete();

                               }

                           }

                           System.out.println("Going to shutdown thethread pool");

                           List<Runnable> shutdownNow = pool.shutdownNow();    // 不等当前pool里的任务执行完,直接关闭线程池

                           for (Runnable run : shutdownNow) {

                               System.out.println(run + " goingto be shutdown");

                           }

                          while (pool.awaitTermination(1, TimeUnit.SECONDS)) { 

                               System.out.println("The threadpool has been shutdown " + new Date());

                               executeTasks(runners);//重新执行

                               Thread.sleep(200);

                           }

                       }

                    }

                } catch(Exception e1) {

                   e1.printStackTrace();

                }

                try {

                   Thread.sleep(500);

                } catch(InterruptedException e) {

                }

            }

        }

        public static void main(String[] args) {

            Set<FileTask> tasks =new HashSet<FileTask>();

           

            FileTask task = newFileTask();

            task.setInterval(1);

            task.setName("task-1");

            tasks.add(task);

           

           

            FileTask task1 = newFileTask();

            task1.setInterval(2);

           task.setName("task-2");

            tasks.add(task1);

           

            TaskManager  codeManager = new TaskManager (tasks);

            newThread(codeManager).start();

        }

       

    }

     

    成功!把整个的ThreadPoolExector里所有的worker全部停止,之后再向其队列里重新加入要执行的两个task(注意这里并没有清空,只是停止而已)。这样做虽然能够及时处理task,但是一个很致命的缺点在于,如果不能明确的知道ThreadPoolExecutor要执行的task,就没有办法重新执行这些任务。

    定制线程池

    好吧!停止钻研别人的东西!我们完全可以自己写一个自己的ThreadPoolExecutor,只要把worker暴露出来就可以了。这里是不是回想起前面的start问题来了,没错,我们即便能够直接针对Thread进行interrupt, 但是不能再次start它了。那么clone一个同样的Thread行不行呢?

     Thread

    @Override

        protectedObject clone() throws CloneNotSupportedException{

            throw newCloneNotSupportedException();

    }

    答案显而易见,线程是不支持clone 的。我们需要重新new 一个Thread来重新运行。其实我们只需要将原来的Worker里的Runnable换成我们自己的task,然后将访问权限适当放开就可以了。还有,就是让我们的CustomThreadPoolExecutor继承Thread,因为它需要定时监控自己的所有的worker里Thread的运行状态。

      CustomThreadPoolExecutor

    public class CustomThreadPoolExecutor extendsThreadPoolExecutor implements Runnable {

             public voidexecute(Testask command) {

    ….//将执行接口改为接收我们的业务类

    }

             …

             …

             private final class Worker

            extends AbstractQueuedSynchronizer

            implements Runnable

        {

            …

    Testask firstTask; //将Runnable改为我们的业务类,方便查看状态

                       …

                       Worker(Testask firstTask) {

                …//同样将初始化参数改为我们的业务类

            }

     

    }

        public staticvoid main(String[] args) {

           CustomThreadPoolExecutor pool = new CustomThreadPoolExecutor(0, Integer.MAX_VALUE,

                   60L, TimeUnit.SECONDS,

                    newSynchronousQueue<Runnable>());

     

            Testasktask = new Testask();

           task.setInterval(1);

           pool.execute(task);

     

            Testasktask1 = new Testask();

           task1.setInterval(2);

           pool.execute(task1);

     

            newThread(pool).start();

        }

     

        @Override

        public voidrun() {

            while(!Thread.currentThread().isInterrupted()) {

                try {

                   long current = System.currentTimeMillis();

                   Set<Testask> toReExecute = new HashSet<Testask>();

                   System.out.println(" number is " + number);

                    for(Worker wrapper : workers) {

                        Testask tt = wrapper.firstTask;

                       if (tt != null) {

                           if (current - tt.getLastExecTime() > tt.getInterval() * 5 * 1000) {

    wrapper.interruptIfStarted();

                                remove(tt);

                               if (tt.getFiles() != null) {

                                    for (File file: tt.getFiles()) {

                                       file.delete();

                                    }

                                }

                               System.out.println("THread is timeout : " + tt + " "+ new Date());

                               toReExecute.add(tt);

                           }

                       }

                    }

     if(toReExecute.size() > 0) {

                        mainLock.lock();

                        try {

                            for (Testask tt :toReExecute) {

                                execute(tt);    // execute this task again

                            }

                        } finally {

                           mainLock.unlock();

                        }

                    }

                } catch(Exception e1) {

                   System.out.println("Error happens when we trying to interrupt andrestart a code task ");

                }

                try {

                    Thread.sleep(500);

                } catch(InterruptedException e) {

                }

            }

        }

    }

     Testask

    class Testask implements Runnable {

        …..

     

        @Override

        public voidrun() {

            while(!Thread.currentThread().isInterrupted()) {

               lastExecTime = System.currentTimeMillis();

               System.out.println(Thread.currentThread().getName() + " is running-> " + new Date());

                try {

                   CustomThreadPoolExecutor.number++;

                   Thread.sleep(getInterval() * 6 * 1000);

                   System.out.println(Thread.currentThread().getName() + " aftersleep");

                } catch(InterruptedException e) {

    Thread.currentThread().interrupt();

                   System.out.println("InterruptedException happens");

                }

            }

           System.out.println("Going to die");

        }

    }

    最终方案

    综上,最稳妥的就是使用JDK自带的ThreadPoolExecutor,如果需要对池里的task进行任意时间的控制,可以考虑全面更新,全方面,360度无死角的定制自己的线程池当然是最好的方案,但是一定要注意对于共享对象的处理,适当的处理好并发访问共享对象的方法。

    鉴于我们的场景,由于时间紧,而且需要了解的task并不多,暂时选用全部重新更新的策略。上线后,抽时间把自己定制的ThreadPoolExecutor搞定,然后更新上去!

  • 相关阅读:
    hive 之only supports newline ' ' right now. Error encountered near token ''报错
    四、第三方图标库
    三、工具模块封装(二):封装mock模块
    三、工具模块封装(一):封装axios模块
    二、前端项目案例
    一、搭建前端开发环境(Vue+element)
    注册中心(Consul)
    系统服务监控(Spring Boot Admin)
    JWT
    Spring Security(四)
  • 原文地址:https://www.cnblogs.com/amy26/p/4629624.html
Copyright © 2011-2022 走看看