zoukankan      html  css  js  c++  java
  • thread_ThreadPoolExecutor

    目录

      1.基础知识

      2.简单应用

      3.异常机制

      4.丰富的扩展

    一.基础知识

      构造函数。 

    public ThreadPoolExecutor(   
    int corePoolSize,    指的是保留的线程池大小
    int maximumPoolSize,    指的是线程池的最大大小
    long keepAliveTime,    指的是空闲线程结束的超时时间
    TimeUnit unit,   
    BlockingQueue<Runnable> workQueue)  表示存放任务的队列  

      工作过程:

    1 、线程池刚创建时,里面没有一个线程。任务队列是作为参数传进来的。不过,就算队列里面有任务,线程池也不会马上执行它们。
    2 、当调用 execute() 方法添加一个任务时,线程池会做如下判断:
      a. 如果正在运行的线程数量小于 corePoolSize,那么马上创建线程运行这个任务;
      b. 如果正在运行的线程数量大于或等于 corePoolSize,那么将这个任务放入队列。
      c. 如果这时候队列满了,而且正在运行的线程数量小于 maximumPoolSize,那么还是要创建线程运行这个任务;
      d. 如果队列满了,而且正在运行的线程数量大于或等于 maximumPoolSize,那么线程池会抛出异常,告诉调用者“我不能再接受任务了”。
    3 、当一个线程完成任务时,它会从队列中取下一个任务来执行。
    4 、当一个线程无事可做,超过一定的时间(keepAliveTime)时,线程池会判断,如果当前运行的线程数大于 corePoolSize,那么这个线程就被停掉。所以线程池的所有任务完成后,它最终会收缩到 corePoolSize 的大小。

    二.简单应用  

        public void threadPool1Test() throws InterruptedException {
           BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();  
           ThreadPoolExecutor exe = new ThreadPoolExecutor(2, 6, 1, TimeUnit.DAYS, queue);  
            try {
                int threadNum = 0;
                for (int i = 0; i < 10; i++) {
                    threadNum++;
                    final int currentThreadNum = threadNum;
                    exe.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                System.out.println("子线程[" + currentThreadNum + "]开启");
                                Thread.sleep(1000 * 10);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            } finally {
                                System.out.println("子线程[" + currentThreadNum + "]结束");
                            }
                        }
                    });
                }
                
                //不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
                System.out.println("已经开启所有的子线程");
                exe.shutdown();
                System.out.println("shutdown():启动一次顺序关闭,执行以前提交的任务,但不接受新任务。");
                
                //判断线程池所有任务是否执行完毕
                while (true) {
                    if (exe.isTerminated()) {
                        System.out.println("所有的子线程都结束了!");
                        break;
                    }
                    Thread.sleep(1000);
                    System.out.println("线程池中线程数目:"+exe.getPoolSize()+
                                       ",队列中等待执行的任务数目:"+ exe.getQueue().size()+
                                       ",已执行玩别的任务数目:"+exe.getCompletedTaskCount());    
                    System.out.println("线程队列大小为-->"+queue.size());  
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("主线程结束");
            }
        }

    1、BlockingQueue 只是一个接口,常用的实现类有 LinkedBlockingQueue 和 ArrayBlockingQueue。用 LinkedBlockingQueue 的好处在于没有大小限制。这样的话,因为队列不会满,所以 execute() 不会抛出异常,而线程池中运行的线程数也永远不会超过 corePoolSize 个,keepAliveTime 参数也就没有意义了。
    2、shutdown() 方法不会阻塞。调用 shutdown() 方法之后,主线程就马上结束了,而线程池会继续运行直到所有任务执行完才会停止。如果不调用 shutdown() 方法,那么线程池会一直保持下去,以便随时添加新的任务。 

    二.异常处理

         线程超出了线程池的总容量(线程队列大小+最大线程数)

        @Test
        public void threadPool2Test() throws InterruptedException {
            ThreadPoolExecutor executor = new ThreadPoolExecutor(2, 6, 1, TimeUnit.MILLISECONDS,
                    new ArrayBlockingQueue<Runnable>(4));
            //executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
    
            executor.setRejectedExecutionHandler(
              new RejectedExecutionHandler() {
                public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                    System.out.println(String.format("Task %d rejected.", r.hashCode()));
                    //System.out.println("DemoTask Rejected : " + ((DemoThread) r).getName());
                    System.out.println("Waiting for a second !!"); 
                    try {
                        Thread.sleep(1000);
                         executor.execute(r);
                         //executor.getQueue().put(r);
                    } catch (InterruptedException e) {
                    }
                }
              });
            ///////
            for (int i = 0; i < 11; i++) {
                final int taskNum = i;
                Runnable myTask = new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("正在执行task " + taskNum);
    
                        try {
                            Thread.currentThread().sleep(100 * (new Random()).nextInt(8));
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("task " + taskNum + "执行完毕");
                    }
                };
                executor.execute(myTask);
            }
            // 不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
            executor.shutdown();
            while (true) {
                if (executor.isTerminated()) {
                    System.out.println("所有的子线程都结束了!");
                    break;
                }
                Thread.sleep(1000);
                System.out.println("线程池中线程数目:" + executor.getPoolSize() + ",队列中等待执行的任务数目:" + executor.getQueue().size()
                        + ",已执行玩别的任务数目:" + executor.getCompletedTaskCount());
            }
        }

     

       ThreadPoolExecutor 提供 4 个现有的策略,分别是:

    ThreadPoolExecutor.AbortPolicy:表示拒绝任务并抛出异常

    ThreadPoolExecutor.DiscardPolicy:表示拒绝任务但不做任何动作
    ThreadPoolExecutor.CallerRunsPolicy:表示拒绝任务,并在调用者的线程中直接执行该任务
    ThreadPoolExecutor.DiscardOldestPolicy:表示先丢弃任务队列中的第一个任务,然后把这个任务加进队列。
    使用CallerRunsPolicy,会将所有线程都执行到,也不可避免的会使用主线程来加载一个线程任务。一次同时创建maximumPoolSize个线程,加上主线程,就是一次执行maximumPoolSize+1个线程任务,等执行完后才会,再执行maximunPoolSize+1个线程任务,当然创建的maximumPoolSize个线程也会复用。
    显然这种策略保证了每一个线程任务都会执行,但牺牲了性能。而,DiscardPolicy和DiscardOldestPolicy都会抛弃容纳不了的线程任务,保证了性能。

     a. ThreadPoolExecutor可以设置一个“拒绝策略”,这是指当一个task被拒绝添加到线程池中时,采取的处理措施,
      executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
     b.task非常重要,“等得起”,但是“丢不起”
        System.out.println("Waiting for a second !!");
       try {
          Thread.sleep(1000);
           executor.execute(r);
       } catch (InterruptedException e) {
       }
     c.利用RejectedExecutionHandler来阻塞submit()?
        首先 submit()方法是调用了workQueue的offer()方法来塞入task,而offer()方法是非阻塞的,当workQueue已经满的时候,offer()方法会立即返回false,并不会阻塞在那里等待workQueue有空出位置,所以要让submit()阻塞,关键在于改变向workQueue添加task的行为,
      if (!executor.isShutdown()) {
          try {
              executor.getQueue().put(r);
           } catch (InterruptedException e) {
           }
       }
       调用了getQueue()方法,得到了workQueue,再调用其put()方法,将task放到workQueue中,而这个put()方法是阻塞的
       当workQueue满时,submit()一个task会导致调用我们自定义的RejectedExecutionHandler,而我们自定义的RejectedExecutionHandler会保证该task继续被尝试   用阻塞式的put()到workQueue中。

    d.重写了offer()方法的BlockingQueue

                  由于submit()是调用workQueue的offer()方法来添加task的,而offer()是非阻塞的,所以,如果我们自己实现一个BlockingQueue,其offer()方法是阻塞的

    public class LimitedQueue<E> extends LinkedBlockingQueue<E> {
      public LimitedQueue(int maxSize) {
        super(maxSize);
      }
      @Override
      public boolean offer(E e) {
        // turn offer() and add() into a blocking calls (unless interrupted)
        try {
          put(e);
          return true;
        } catch (InterruptedException ie) {
          Thread.currentThread().interrupt();
        }
        return false;
      }
    }

      四.丰富的可扩展性

       1.扩展生成器,如线程的创建前beforeExecute,创建后afterExecute,退出terminated;   
                             线程的创建策略, 通过重构ThreadFactory 重构线程名,设置守护线程等;

                             线程的拒绝策略,日志,拒绝,重试等

                             线程运行抛出异常机制    

    class ExtThreadPoolExecutor extends ThreadPoolExecutor {
    
        public ExtThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
            super.setThreadFactory(new ExtThreadFactory());
            super.setRejectedExecutionHandler(new ExtRejectedExecutionHandler());
        }
    
        @Override
        protected void beforeExecute(Thread t, Runnable r) {
            super.beforeExecute(t, r);
            System.out.println("Perform beforeExecute() ");
        }
        @Override
        protected void terminated() {
            
            System.out.println("线程池退出 ");
        }
        @Override
        protected void afterExecute(Runnable r, Throwable t) {
            super.afterExecute(r, t);
            if (t != null) {
                System.out.println("Perform exception handler logic");
            }
            System.out.println("Perform afterExecute() ");
        }
        /////////////////////////////////
    //    @Override
    //    public void execute(Runnable task) {
    //        super.execute(wrap(task,clientTrace(),Thread.currentThread().getName())); 
    //    }
    //    @Override
    //    public Future<?> submit(Runnable task) {
    //        return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName())); 
    //    }
        private Exception clientTrace(){
            return new Exception("client stack trace");
        }
        private Runnable wrap(final Runnable task ,final Exception clientStack,String clientThreadName ){
            return new Runnable(){
                @Override
                public void run() {
                    try {
                        task.run();
                    } catch (Exception e) {
                        clientStack.printStackTrace();
                        throw e;
                    }
                    task.run();
                    
                }
            };
            
        }
        ///////////////////////////////////////////
        private class ExtThreadFactory implements ThreadFactory {
            private AtomicInteger count = new AtomicInteger(0);
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                String threadName = ExtThreadPoolExecutor.class.getSimpleName() + count.addAndGet(1);
                System.out.println("create "+threadName);
                //t.setDaemon(true);
                t.setName(threadName);
                return t;
            }
        }
        
        //
        private class ExtRejectedExecutionHandler implements RejectedExecutionHandler {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("DemoTask Rejected : " + ((DemoThread) r).getName());
                System.out.println("Waiting for a second !!");
    
                if (!executor.isShutdown()) {
                    try {
                        Thread.sleep(1000);
                        // executor.execute(r);
                         executor.getQueue().put(r);
                        // 当task被拒绝添加到线程池中时,ThreadPoolExecutor会采用“丢弃”策略来对待这个任务,即这个task被丢弃了。
                    //    System.out.println("Lets add another time : " + ((DemoThread) r).getName());
                    } catch (InterruptedException e) {
                    }
                }
            }
        }
    }

       2.扩展生成器

    class DemoThread implements Runnable {
        private String name = null;
    
        public DemoThread(String name) {
            this.name = name;
        }
    
        public String getName() {
            return this.name;
        }
    
        @Override
        public void run() {
            try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Executing : taskname: " + name +", Thread id: " +Thread.currentThread().getId());
        }
    }
        public static void main(String[] args) {
    
            BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(50);
    
            ExtThreadPoolExecutor pool = new ExtThreadPoolExecutor(10, 20, 5000, TimeUnit.MILLISECONDS,
                    blockingQueue);
    
     
            for (int i = 1; i < 100; i++) {
                System.out.println("提交第" + i + "个任务!");
                pool.execute(new DemoThread(Integer.valueOf(i).toString() ) );
                pool.submit(new DemoThread(Integer.valueOf(i).toString() ));
            }
    
            // 2.销毁----此处不能销毁,因为任务没有提交执行完,如果销毁线程池,任务也就无法执行了
            // exec.destory();
    
            try {
                Thread.sleep(10000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }

         



  • 相关阅读:
    hdu 4027 Can you answer these queries?
    hdu 4041 Eliminate Witches!
    hdu 4036 Rolling Hongshu
    pku 2828 Buy Tickets
    hdu 4016 Magic Bitwise And Operation
    pku2886 Who Gets the Most Candies?(线段树+反素数打表)
    hdu 4039 The Social Network
    hdu 4023 Game
    苹果官方指南:Cocoa框架(2)(非原创)
    cocos2d 中 CCNode and CCAction
  • 原文地址:https://www.cnblogs.com/dengzy/p/5811643.html
Copyright © 2011-2022 走看看