zoukankan      html  css  js  c++  java
  • java使用默认线程池踩过的坑(三)

                                  云智慧(北京)科技有限公司  陈鑫
    

    重新启动线程池
    TaskManager
    public class TaskManager implements Runnable {
    …..
    public TaskManager (Setrunners) {
    super();
    this.runners = runners;
    executeTasks(runners);
    }

    private voidexecuteTasks(Set<FileTask> runners) {
        for (FileTask task : runners){
           pool.execute(task);
           System.out.println(task.getClass().getSimpleName() + " has beenstarted");
        }
    }
    
    @Override
    public void run() {
        while(!Thread.currentThread().isInterrupted()) {
            try {
               long current = System.currentTimeMillis();
               for (FileTask wrapper : runners) {
                   if (wrapper.getLastExecTime() != 0 && current -wrapper.getLastExecTime() > wrapper.getInterval() * 5 * 1000) {   // 開始忘了乘以1000
                       wrapper.interrupt();
                       if (wrapper.getFiles() != null){
                           for (File file : wrapper.getFiles()){
                               file.delete();
                           }
                       }
                       System.out.println("Going to shutdown thethread pool");
                       List<Runnable> shutdownNow = pool.shutdownNow();    // 不等当前pool里的任务运行完。直接关闭线程池
                       for (Runnable run : shutdownNow) {
                           System.out.println(run + " goingto be shutdown");
                       }
                      while (pool.awaitTermination(1, TimeUnit.SECONDS)) { 
                           System.out.println("The threadpool has been shutdown " + new Date());
                           executeTasks(runners);//又一次运行
                           Thread.sleep(200);
                       }
                   }
                }
            } catch(Exception e1) {
               e1.printStackTrace();
            }
            try {
               Thread.sleep(500);
            } catch(InterruptedException e) {
            }
        }
    }
    public static void main(String[] args) {
        Set<FileTask> tasks =new HashSet<FileTask>();
    
        FileTask task = newFileTask();
        task.setInterval(1);
        task.setName("task-1");
        tasks.add(task);
    
    
        FileTask task1 = newFileTask();
        task1.setInterval(2);
       task.setName("task-2");
        tasks.add(task1);
    
        TaskManager  codeManager = new TaskManager (tasks);
        newThread(codeManager).start();
    }
    

    }

    成功。把整个的ThreadPoolExector里所有的worker所有停止,之后再向其队列里又一次增加要运行的两个task(注意这里并没有清空,仅仅是停止而已)。这样做尽管能够及时处理task,可是一个非常致命的缺点在于。假设不能明白的知道ThreadPoolExecutor要运行的task。就没有办法又一次运行这些任务。


    定制线程池
    好吧!停止钻研别人的东西!

    我们全然能够自己写一个自己的ThreadPoolExecutor。仅仅要把worker暴露出来就能够了。这里是不是回忆起前面的start问题来了,没错。我们即便能够直接针对Thread进行interrupt, 可是不能再次start它了。那么clone一个相同的Thread行不行呢?
    Thread
    @Override
    protectedObject clone() throws CloneNotSupportedException{
    throw newCloneNotSupportedException();
    }
    答案显而易见。线程是不支持clone 的。

    我们须要又一次new 一个Thread来又一次运行。事实上我们仅仅须要将原来的Worker里的Runnable换成我们自己的task,然后将訪问权限适当放开就能够了。

    还有,就是让我们的CustomThreadPoolExecutor继承Thread,由于它须要定时监控自己的所有的worker里Thread的运行状态。


    CustomThreadPoolExecutor
    public class CustomThreadPoolExecutor extendsThreadPoolExecutor implements Runnable {
    public voidexecute(Testask command) {
    ….//将运行接口改为接收我们的业务类
    }


    private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
    {

    Testask firstTask; //将Runnable改为我们的业务类,方便查看状态

    Worker(Testask firstTask) {
    …//相同将初始化參数改为我们的业务类
    }

    }
    public staticvoid main(String[] args) {
    CustomThreadPoolExecutor pool = new CustomThreadPoolExecutor(0, Integer.MAX_VALUE,
    60L, TimeUnit.SECONDS,
    newSynchronousQueue());

        Testasktask = new Testask();
       task.setInterval(1);
       pool.execute(task);
    
        Testasktask1 = new Testask();
       task1.setInterval(2);
       pool.execute(task1);
    
        newThread(pool).start();
    }
    
    @Override
    public voidrun() {
        while(!Thread.currentThread().isInterrupted()) {
            try {
               long current = System.currentTimeMillis();
               Set<Testask> toReExecute = new HashSet<Testask>();
               System.out.println("	 number is " + number);
                for(Worker wrapper : workers) {
                    Testask tt = wrapper.firstTask;
                   if (tt != null) {
                       if (current - tt.getLastExecTime() > tt.getInterval() * 5 * 1000) {
    

    wrapper.interruptIfStarted();
    remove(tt);
    if (tt.getFiles() != null) {
    for (File file: tt.getFiles()) {
    file.delete();
    }
    }
    System.out.println(“THread is timeout : ” + tt + ” “+ new Date());
    toReExecute.add(tt);
    }
    }
    }
    if(toReExecute.size() > 0) {
    mainLock.lock();
    try {
    for (Testask tt :toReExecute) {
    execute(tt); // execute this task again
    }
    } finally {
    mainLock.unlock();
    }
    }
    } catch(Exception e1) {
    System.out.println(“Error happens when we trying to interrupt andrestart a code task “);
    }
    try {
    Thread.sleep(500);
    } catch(InterruptedException e) {
    }
    }
    }
    }
    Testask
    class Testask implements Runnable {
    …..

    @Override
    public voidrun() {
        while(!Thread.currentThread().isInterrupted()) {
           lastExecTime = System.currentTimeMillis();
           System.out.println(Thread.currentThread().getName() + " is running-> " + new Date());
            try {
               CustomThreadPoolExecutor.number++;
               Thread.sleep(getInterval() * 6 * 1000);
               System.out.println(Thread.currentThread().getName() + " aftersleep");
            } catch(InterruptedException e) {
    

    Thread.currentThread().interrupt();
    System.out.println(“InterruptedException happens”);
    }
    }
    System.out.println(“Going to die”);
    }
    }
    终于方案
    综上,最稳妥的就是使用JDK自带的ThreadPoolExecutor,假设须要对池里的task进行随意时间的控制,能够考虑全面更新,全方面,360度无死角的定制自己的线程池当然是最好的方案。可是一定要注意对于共享对象的处理,适当的处理好并发訪问共享对象的方法。
    鉴于我们的场景。由于时间紧,并且须要了解的task并不多。临时选用所有又一次更新的策略。

    上线后。抽时间把自己定制的ThreadPoolExecutor搞定,然后更新上去!

  • 相关阅读:
    (转)创建Windows服务(Windows Services)N种方式总结
    无法加载协定为“ServiceReference1.xxxxxx”的终结点配置部分,因为找到了该协定的多个终结点配置。请按名称指示首选的终结点配置部分。
    《App架构实践指南》
    Awesome Projects (汇聚全球所有🐮项目,你值得拥有)
    【公告】个人站点及系列文章
    Android+TensorFlow+CNN+MNIST 手写数字识别实现
    TensorFlow基础
    UiAutomator2.0升级填坑记
    那些年,从博客到出书的博主
    Appuim源码剖析(Bootstrap)
  • 原文地址:https://www.cnblogs.com/liguangsunls/p/7253372.html
Copyright © 2011-2022 走看看