zoukankan      html  css  js  c++  java
  • 使用ExecutorService实现线程池

    ExecutorService是java提供的用于管理线程池的类。

    线程池的作用:

      - 控制线程数量

      - 重用线程

      当一个程序中创建了许多线程,并在任务结束后销毁,会给系统带来过度消耗资源,以及过度切换线程的危险,从而可能导致系统崩溃。为此我们应使用线程池来解决这个问题。

    线程池的概念:

      首先创建一些线程,它们的集合称为线程池,当服务器受到一个客户请求后,就从线程池中取出一个空闲的线程为之服务,服务完后不关闭该线程,而是将该线程还回到线程池中。

      在线程池编程模式下,任务是提交给整个线程池,而不是交给某个线程,线程池拿到任务就在内部找空闲的线程,再把任务交给内部的空闲线程,一个线程只能执行一个任务,但可以向线程池提交多个任务。

    线程池的主要实现方法:

    1、 Executors.newFixedThreadPool(int nThreads);

    说明:创建固定大小(nThreads, 大小不能超过int的最大值)的线程池

    // 线程数量  int nThreads = 20;

    // 创建executor 服务  : ExecutorService executor = Executors.newFixedThreadPool(nThreads) ;

    重载后的版本,需要传入实现了ThreadFactory接口的对象。

                  ExecutorService executor = Executors. newFixedThreadPool(nThreads, threadFactory);

     

    说明:创建固定大小(nThreads, 大小不能超过int的最大值) 的线程池,缓冲任务的队列为LinkedBlockingQueue,大小为整型的最大数,当使用此线程池时,在同执行的任务数量超过传入的线程池大小值后,将会放入LinkedBlockingQueue,在LinkedBlockingQueue中的任务需要等待线程空闲后再执行,如果放入LinkedBlockingQueue中的任务超过整型的最大数时,抛出RejectedExecutionException。

    2、Executors.newSingleThreadExecutor():创建大小为1的固定线程池。

    ExecutorService executor = Executors.newSingleThreadExecutor();

    重载后的版本,需要多传入实现了ThreadFactory接口的对象。

    ExecutorService executor = Executors. newSingleThreadScheduledExecutor(ThreadFactory threadFactory) 

    说明:创建大小为1的固定线程池,执行任务的线程只有一个,其它的(任务)task都放在LinkedBlockingQueue中排队等待执行。

    3、Executors.newCachedThreadPool();创建corePoolSize为0,最大线程数为整型的最大数,线程  keepAliveTime为1分钟,缓存任务的队列为SynchronousQueue的线程池。

    ExecutorService executor = Executors.newCachedThreadPool();

    当然也可以以下面的方式创建,重载后的版本,需要多传入实现了ThreadFactory接口的对象。

    ExecutorService executor = Executors.newCachedThreadPool(ThreadFactory threadFactory) ;

    说明:使用时,放入线程池的task任务会复用线程或启动新线程来执行,注意事项:启动的线程数如果超过整型最大值后会抛出RejectedExecutionException异常,启动后的线程存活时间为一分钟

    4、Executors.newScheduledThreadPool(int corePoolSize):创建corePoolSize大小的线程池。

    // 线程数量 int corePoolSize= 20;

    // 创建executor 服务 : ExecutorService executor = Executors.newScheduledThreadPool(corePoolSize) ;

      

    重载后的版本,需要多传入实现了ThreadFactory接口的对象。

    ExecutorService executor = Executors.newScheduledThreadPool(corePoolSize, threadFactory) ;

    说明:线程keepAliveTime为0,缓存任务的队列为DelayedWorkQueue,注意不要超过整型的最大值。

    这种线程池有些不同,它可以实现定时器执行任务的功能,下面对第四种线程池进行代码演示:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    
    class Temp extends Thread {
        public void run() {
            System.out.println("run");
        }
    }
    
    public class ScheduledJob {
        
        public static void main(String args[]) throws Exception {
        
            Temp command = new Temp();
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
            //command代表执行的任务,5代表延迟5秒后开始执行,1代表每隔1秒执行一次,TimeUnit.SECONDS代表时间单位是秒
            ScheduledFuture<?> scheduleTask = scheduler.scheduleWithFixedDelay(command, 5, 1, TimeUnit.SECONDS);
        
        }
    }

      下面以一段代码演示一般情况线程池的使用: 

    public static void main(String[] args) {
         ExecutorService threadpoo1 = Executors.newFixedThreadPool(2);
         for(int i=0;i<5;i++){
            Runnable runn=new Runnable() {
                 public void run() {
                    Thread t=Thread.currentThread();
                        try {
                             System.out.println(t+":正在运行");
                             Thread.sleep(5000);
                             System.out.println(t+"运行结束");
                    } catch (Exception e) {
                         System.out.println("线程被中断了");
                    }
                 }
           };
        threadpoo1.execute(runn);
        System.out.println("指派了一个任务交给线程池");
        threadpoo1.shutdown();
        System.out.println("停止线程池了!");
    }
            
               

     

    自定义线程池

     

    corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

    maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

    keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

    unit:参数keepAliveTime的时间单位

    workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    ArrayBlockingQueue;

    LinkedBlockingQueue;

    SynchronousQueue;

    threadFactory:线程工厂,主要用来创建线程;

    handler:表示当拒绝处理任务时的策略,默认有以下四种取值:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

     

    下面对自定义线程池进行代码演示:

    MyTask实例类

    package com.bjsxt.height.concurrent018;
    
    public class MyTask implements Runnable {
    
        private int taskId;
        private String taskName;
        
        public MyTask(int taskId, String taskName){
            this.taskId = taskId;
            this.taskName = taskName;
        }
        
        public int getTaskId() {
            return taskId;
        }
    
        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    
        public String getTaskName() {
            return taskName;
        }
    
        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("run taskId =" + this.taskId);
                Thread.sleep(5*1000);
                //System.out.println("end taskId =" + this.taskId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        
        }
        
        public String toString(){
            return Integer.toString(this.taskId);
        }
    
    }

    package
    com.bjsxt.height.concurrent018; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; public class UseThreadPoolExecutor1 { public static void main(String[] args) { /** * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程, * 若大于corePoolSize,则会将任务加入队列, * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程, * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。 * */ ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, //coreSize 2, //MaxSize 60, //60 TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3) //指定一种队列 (有界队列) //new LinkedBlockingQueue<Runnable>() , new MyRejected() //, new DiscardOldestPolicy() ); MyTask mt1 = new MyTask(1, "任务1"); MyTask mt2 = new MyTask(2, "任务2"); MyTask mt3 = new MyTask(3, "任务3"); MyTask mt4 = new MyTask(4, "任务4"); MyTask mt5 = new MyTask(5, "任务5"); MyTask mt6 = new MyTask(6, "任务6"); pool.execute(mt1); pool.execute(mt2); pool.execute(mt3); pool.execute(mt4); pool.execute(mt5); pool.execute(mt6); pool.shutdown(); } }

    package com.bjsxt.height.concurrent018;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class UseThreadPoolExecutor2 implements Runnable{
    
        private static AtomicInteger count = new AtomicInteger(0);
        
        @Override
        public void run() {
            try {
                int temp = count.incrementAndGet();
                System.out.println("任务" + temp);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        public static void main(String[] args) throws Exception{
            //System.out.println(Runtime.getRuntime().availableProcessors());
            BlockingQueue<Runnable> queue = 
                    //new LinkedBlockingQueue<Runnable>();
                    new ArrayBlockingQueue<Runnable>(10);
            ExecutorService executor  = new ThreadPoolExecutor(
                        5,         //core
                        10,     //max
                        120L,     //2fenzhong
                        TimeUnit.SECONDS,
                        queue);
            
            for(int i = 0 ; i < 20; i++){
                executor.execute(new UseThreadPoolExecutor2());
            }
            Thread.sleep(1000);
            System.out.println("queue size:" + queue.size());        //10
            Thread.sleep(2000);
        }
    
    
    }

     

    自定义拒绝策略演示

    package com.bjsxt.height.concurrent018;
    
    import java.net.HttpURLConnection;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    public class MyRejected implements RejectedExecutionHandler{
    
        
        public MyRejected(){
        }
        
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("自定义处理..");
            System.out.println("当前被拒绝任务为:" + r.toString());
            
    
        }
    
    }
  • 相关阅读:
    韩式英语
    Daily dictation 听课笔记
    words with same pronunciation
    you will need to restart eclipse for the changes to take effect. would you like to restart now?
    glottal stop(britain fountain mountain)
    education 的发音
    第一次用Matlab 的lamada语句
    SVN的switch命令
    String的split
    SVN模型仓库中的资源从一个地方移动到另一个地方的办法(很久才解决)
  • 原文地址:https://www.cnblogs.com/lm970585581/p/6931640.html
Copyright © 2011-2022 走看看