zoukankan      html  css  js  c++  java
  • java使用默认线程池踩过的坑(三)

    云智慧(北京)科技有限公司  陈鑫

     

    重启线程池

     TaskManager

    public class TaskManager implements Runnable {

        …..

        public TaskManager (Set<FileTask>runners) {

            super();

            this.runners = runners;

            executeTasks(runners);

        }

     

        private voidexecuteTasks(Set<FileTask> runners) {

            for (FileTask task : runners){

               pool.execute(task);

               System.out.println(task.getClass().getSimpleName() + " has beenstarted");

            }

        }

     

        @Override

        public void run() {

            while(!Thread.currentThread().isInterrupted()) {

                try {

                   long current = System.currentTimeMillis();

                   for (FileTask wrapper : runners) {

                       if (wrapper.getLastExecTime() != 0 && current -wrapper.getLastExecTime() > wrapper.getInterval() * 5 * 1000) {   // 开始忘了乘以1000

                           wrapper.interrupt();

                           if (wrapper.getFiles() != null){

                               for (File file : wrapper.getFiles()){

                                   file.delete();

                               }

                           }

                           System.out.println("Going to shutdown thethread pool");

                           List<Runnable> shutdownNow = pool.shutdownNow();    // 不等当前pool里的任务执行完,直接关闭线程池

                           for (Runnable run : shutdownNow) {

                               System.out.println(run + " goingto be shutdown");

                           }

                          while (pool.awaitTermination(1, TimeUnit.SECONDS)) { 

                               System.out.println("The threadpool has been shutdown " + new Date());

                               executeTasks(runners);//重新执行

                               Thread.sleep(200);

                           }

                       }

                    }

                } catch(Exception e1) {

                   e1.printStackTrace();

                }

                try {

                   Thread.sleep(500);

                } catch(InterruptedException e) {

                }

            }

        }

        public static void main(String[] args) {

            Set<FileTask> tasks =new HashSet<FileTask>();

           

            FileTask task = newFileTask();

            task.setInterval(1);

            task.setName("task-1");

            tasks.add(task);

           

           

            FileTask task1 = newFileTask();

            task1.setInterval(2);

           task.setName("task-2");

            tasks.add(task1);

           

            TaskManager  codeManager = new TaskManager (tasks);

            newThread(codeManager).start();

        }

       

    }

     

    成功!把整个的ThreadPoolExector里所有的worker全部停止,之后再向其队列里重新加入要执行的两个task(注意这里并没有清空,只是停止而已)。这样做虽然能够及时处理task,但是一个很致命的缺点在于,如果不能明确的知道ThreadPoolExecutor要执行的task,就没有办法重新执行这些任务。

    定制线程池

    好吧!停止钻研别人的东西!我们完全可以自己写一个自己的ThreadPoolExecutor,只要把worker暴露出来就可以了。这里是不是回想起前面的start问题来了,没错,我们即便能够直接针对Thread进行interrupt, 但是不能再次start它了。那么clone一个同样的Thread行不行呢?

     Thread

    @Override

        protectedObject clone() throws CloneNotSupportedException{

            throw newCloneNotSupportedException();

    }

    答案显而易见,线程是不支持clone 的。我们需要重新new 一个Thread来重新运行。其实我们只需要将原来的Worker里的Runnable换成我们自己的task,然后将访问权限适当放开就可以了。还有,就是让我们的CustomThreadPoolExecutor继承Thread,因为它需要定时监控自己的所有的worker里Thread的运行状态。

      CustomThreadPoolExecutor

    public class CustomThreadPoolExecutor extendsThreadPoolExecutor implements Runnable {

             public voidexecute(Testask command) {

    ….//将执行接口改为接收我们的业务类

    }

             …

             …

             private final class Worker

            extends AbstractQueuedSynchronizer

            implements Runnable

        {

            …

    Testask firstTask; //将Runnable改为我们的业务类,方便查看状态

                       …

                       Worker(Testask firstTask) {

                …//同样将初始化参数改为我们的业务类

            }

     

    }

        public staticvoid main(String[] args) {

           CustomThreadPoolExecutor pool = new CustomThreadPoolExecutor(0, Integer.MAX_VALUE,

                   60L, TimeUnit.SECONDS,

                    newSynchronousQueue<Runnable>());

     

            Testasktask = new Testask();

           task.setInterval(1);

           pool.execute(task);

     

            Testasktask1 = new Testask();

           task1.setInterval(2);

           pool.execute(task1);

     

            newThread(pool).start();

        }

     

        @Override

        public voidrun() {

            while(!Thread.currentThread().isInterrupted()) {

                try {

                   long current = System.currentTimeMillis();

                   Set<Testask> toReExecute = new HashSet<Testask>();

                   System.out.println(" number is " + number);

                    for(Worker wrapper : workers) {

                        Testask tt = wrapper.firstTask;

                       if (tt != null) {

                           if (current - tt.getLastExecTime() > tt.getInterval() * 5 * 1000) {

    wrapper.interruptIfStarted();

                                remove(tt);

                               if (tt.getFiles() != null) {

                                    for (File file: tt.getFiles()) {

                                       file.delete();

                                    }

                                }

                               System.out.println("THread is timeout : " + tt + " "+ new Date());

                               toReExecute.add(tt);

                           }

                       }

                    }

     if(toReExecute.size() > 0) {

                        mainLock.lock();

                        try {

                            for (Testask tt :toReExecute) {

                                execute(tt);    // execute this task again

                            }

                        } finally {

                           mainLock.unlock();

                        }

                    }

                } catch(Exception e1) {

                   System.out.println("Error happens when we trying to interrupt andrestart a code task ");

                }

                try {

                    Thread.sleep(500);

                } catch(InterruptedException e) {

                }

            }

        }

    }

     Testask

    class Testask implements Runnable {

        …..

     

        @Override

        public voidrun() {

            while(!Thread.currentThread().isInterrupted()) {

               lastExecTime = System.currentTimeMillis();

               System.out.println(Thread.currentThread().getName() + " is running-> " + new Date());

                try {

                   CustomThreadPoolExecutor.number++;

                   Thread.sleep(getInterval() * 6 * 1000);

                   System.out.println(Thread.currentThread().getName() + " aftersleep");

                } catch(InterruptedException e) {

    Thread.currentThread().interrupt();

                   System.out.println("InterruptedException happens");

                }

            }

           System.out.println("Going to die");

        }

    }

    最终方案

    综上,最稳妥的就是使用JDK自带的ThreadPoolExecutor,如果需要对池里的task进行任意时间的控制,可以考虑全面更新,全方面,360度无死角的定制自己的线程池当然是最好的方案,但是一定要注意对于共享对象的处理,适当的处理好并发访问共享对象的方法。

    鉴于我们的场景,由于时间紧,而且需要了解的task并不多,暂时选用全部重新更新的策略。上线后,抽时间把自己定制的ThreadPoolExecutor搞定,然后更新上去!

  • 相关阅读:
    WCF Server Console
    Restart IIS With Powershell
    RestartService (recursively)
    Copy Files
    Stopping and Starting Dependent Services
    多线程同步控制 ManualResetEvent AutoResetEvent MSDN
    DTD 简介
    Using Powershell to Copy Files to Remote Computers
    Starting and Stopping Services (IIS 6.0)
    java中的NAN和INFINITY
  • 原文地址:https://www.cnblogs.com/amy26/p/4629624.html
Copyright © 2011-2022 走看看