zoukankan      html  css  js  c++  java
  • 使用ExecutorService实现线程池

    ExecutorService是java提供的用于管理线程池的类。

    线程池的作用:

      - 控制线程数量

      - 重用线程

      当一个程序中创建了许多线程,并在任务结束后销毁,会给系统带来过度消耗资源,以及过度切换线程的危险,从而可能导致系统崩溃。为此我们应使用线程池来解决这个问题。

    线程池的概念:

      首先创建一些线程,它们的集合称为线程池,当服务器受到一个客户请求后,就从线程池中取出一个空闲的线程为之服务,服务完后不关闭该线程,而是将该线程还回到线程池中。

      在线程池编程模式下,任务是提交给整个线程池,而不是交给某个线程,线程池拿到任务就在内部找空闲的线程,再把任务交给内部的空闲线程,一个线程只能执行一个任务,但可以向线程池提交多个任务。

    线程池的主要实现方法:

    1、 Executors.newFixedThreadPool(int nThreads);

    说明:创建固定大小(nThreads, 大小不能超过int的最大值)的线程池

    // 线程数量  int nThreads = 20;

    // 创建executor 服务  : ExecutorService executor = Executors.newFixedThreadPool(nThreads) ;

    重载后的版本,需要传入实现了ThreadFactory接口的对象。

                  ExecutorService executor = Executors. newFixedThreadPool(nThreads, threadFactory);

     

    说明:创建固定大小(nThreads, 大小不能超过int的最大值) 的线程池,缓冲任务的队列为LinkedBlockingQueue,大小为整型的最大数,当使用此线程池时,在同执行的任务数量超过传入的线程池大小值后,将会放入LinkedBlockingQueue,在LinkedBlockingQueue中的任务需要等待线程空闲后再执行,如果放入LinkedBlockingQueue中的任务超过整型的最大数时,抛出RejectedExecutionException。

    2、Executors.newSingleThreadExecutor():创建大小为1的固定线程池。

    ExecutorService executor = Executors.newSingleThreadExecutor();

    重载后的版本,需要多传入实现了ThreadFactory接口的对象。

    ExecutorService executor = Executors. newSingleThreadScheduledExecutor(ThreadFactory threadFactory) 

    说明:创建大小为1的固定线程池,执行任务的线程只有一个,其它的(任务)task都放在LinkedBlockingQueue中排队等待执行。

    3、Executors.newCachedThreadPool();创建corePoolSize为0,最大线程数为整型的最大数,线程  keepAliveTime为1分钟,缓存任务的队列为SynchronousQueue的线程池。

    ExecutorService executor = Executors.newCachedThreadPool();

    当然也可以以下面的方式创建,重载后的版本,需要多传入实现了ThreadFactory接口的对象。

    ExecutorService executor = Executors.newCachedThreadPool(ThreadFactory threadFactory) ;

    说明:使用时,放入线程池的task任务会复用线程或启动新线程来执行,注意事项:启动的线程数如果超过整型最大值后会抛出RejectedExecutionException异常,启动后的线程存活时间为一分钟

    4、Executors.newScheduledThreadPool(int corePoolSize):创建corePoolSize大小的线程池。

    // 线程数量 int corePoolSize= 20;

    // 创建executor 服务 : ExecutorService executor = Executors.newScheduledThreadPool(corePoolSize) ;

      

    重载后的版本,需要多传入实现了ThreadFactory接口的对象。

    ExecutorService executor = Executors.newScheduledThreadPool(corePoolSize, threadFactory) ;

    说明:线程keepAliveTime为0,缓存任务的队列为DelayedWorkQueue,注意不要超过整型的最大值。

    这种线程池有些不同,它可以实现定时器执行任务的功能,下面对第四种线程池进行代码演示:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    
    class Temp extends Thread {
        public void run() {
            System.out.println("run");
        }
    }
    
    public class ScheduledJob {
        
        public static void main(String args[]) throws Exception {
        
            Temp command = new Temp();
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
            //command代表执行的任务,5代表延迟5秒后开始执行,1代表每隔1秒执行一次,TimeUnit.SECONDS代表时间单位是秒
            ScheduledFuture<?> scheduleTask = scheduler.scheduleWithFixedDelay(command, 5, 1, TimeUnit.SECONDS);
        
        }
    }

      下面以一段代码演示一般情况线程池的使用: 

    public static void main(String[] args) {
         ExecutorService threadpoo1 = Executors.newFixedThreadPool(2);
         for(int i=0;i<5;i++){
            Runnable runn=new Runnable() {
                 public void run() {
                    Thread t=Thread.currentThread();
                        try {
                             System.out.println(t+":正在运行");
                             Thread.sleep(5000);
                             System.out.println(t+"运行结束");
                    } catch (Exception e) {
                         System.out.println("线程被中断了");
                    }
                 }
           };
        threadpoo1.execute(runn);
        System.out.println("指派了一个任务交给线程池");
        threadpoo1.shutdown();
        System.out.println("停止线程池了!");
    }
            
               

     

    自定义线程池

     

    corePoolSize:核心池的大小,这个参数跟后面讲述的线程池的实现原理有非常大的关系。在创建了线程池后,默认情况下,线程池中并没有任何线程,而是等待有任务到来才创建线程去执行任务,除非调用了prestartAllCoreThreads()或者prestartCoreThread()方法,从这2个方法的名字就可以看出,是预创建线程的意思,即在没有任务到来之前就创建corePoolSize个线程或者一个线程。默认情况下,在创建了线程池后,线程池中的线程数为0,当有任务来之后,就会创建一个线程去执行任务,当线程池中的线程数目达到corePoolSize后,就会把到达的任务放到缓存队列当中;

    maximumPoolSize:线程池最大线程数,这个参数也是一个非常重要的参数,它表示在线程池中最多能创建多少个线程;

    keepAliveTime:表示线程没有任务执行时最多保持多久时间会终止。默认情况下,只有当线程池中的线程数大于corePoolSize时,keepAliveTime才会起作用,直到线程池中的线程数不大于corePoolSize,即当线程池中的线程数大于corePoolSize时,如果一个线程空闲的时间达到keepAliveTime,则会终止,直到线程池中的线程数不超过corePoolSize。但是如果调用了allowCoreThreadTimeOut(boolean)方法,在线程池中的线程数不大于corePoolSize时,keepAliveTime参数也会起作用,直到线程池中的线程数为0;

    unit:参数keepAliveTime的时间单位

    workQueue:一个阻塞队列,用来存储等待执行的任务,这个参数的选择也很重要,会对线程池的运行过程产生重大影响,一般来说,这里的阻塞队列有以下几种选择:

    ArrayBlockingQueue;

    LinkedBlockingQueue;

    SynchronousQueue;

    threadFactory:线程工厂,主要用来创建线程;

    handler:表示当拒绝处理任务时的策略,默认有以下四种取值:

    ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。

    ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。

    ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)

    ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

     

    下面对自定义线程池进行代码演示:

    MyTask实例类

    package com.bjsxt.height.concurrent018;
    
    public class MyTask implements Runnable {
    
        private int taskId;
        private String taskName;
        
        public MyTask(int taskId, String taskName){
            this.taskId = taskId;
            this.taskName = taskName;
        }
        
        public int getTaskId() {
            return taskId;
        }
    
        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    
        public String getTaskName() {
            return taskName;
        }
    
        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("run taskId =" + this.taskId);
                Thread.sleep(5*1000);
                //System.out.println("end taskId =" + this.taskId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        
        }
        
        public String toString(){
            return Integer.toString(this.taskId);
        }
    
    }

    package
    com.bjsxt.height.concurrent018; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy; import java.util.concurrent.TimeUnit; import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; public class UseThreadPoolExecutor1 { public static void main(String[] args) { /** * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程, * 若大于corePoolSize,则会将任务加入队列, * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程, * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。 * */ ThreadPoolExecutor pool = new ThreadPoolExecutor( 1, //coreSize 2, //MaxSize 60, //60 TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3) //指定一种队列 (有界队列) //new LinkedBlockingQueue<Runnable>() , new MyRejected() //, new DiscardOldestPolicy() ); MyTask mt1 = new MyTask(1, "任务1"); MyTask mt2 = new MyTask(2, "任务2"); MyTask mt3 = new MyTask(3, "任务3"); MyTask mt4 = new MyTask(4, "任务4"); MyTask mt5 = new MyTask(5, "任务5"); MyTask mt6 = new MyTask(6, "任务6"); pool.execute(mt1); pool.execute(mt2); pool.execute(mt3); pool.execute(mt4); pool.execute(mt5); pool.execute(mt6); pool.shutdown(); } }

    package com.bjsxt.height.concurrent018;
    
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicInteger;
    
    public class UseThreadPoolExecutor2 implements Runnable{
    
        private static AtomicInteger count = new AtomicInteger(0);
        
        @Override
        public void run() {
            try {
                int temp = count.incrementAndGet();
                System.out.println("任务" + temp);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        public static void main(String[] args) throws Exception{
            //System.out.println(Runtime.getRuntime().availableProcessors());
            BlockingQueue<Runnable> queue = 
                    //new LinkedBlockingQueue<Runnable>();
                    new ArrayBlockingQueue<Runnable>(10);
            ExecutorService executor  = new ThreadPoolExecutor(
                        5,         //core
                        10,     //max
                        120L,     //2fenzhong
                        TimeUnit.SECONDS,
                        queue);
            
            for(int i = 0 ; i < 20; i++){
                executor.execute(new UseThreadPoolExecutor2());
            }
            Thread.sleep(1000);
            System.out.println("queue size:" + queue.size());        //10
            Thread.sleep(2000);
        }
    
    
    }

     

    自定义拒绝策略演示

    package com.bjsxt.height.concurrent018;
    
    import java.net.HttpURLConnection;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    
    public class MyRejected implements RejectedExecutionHandler{
    
        
        public MyRejected(){
        }
        
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("自定义处理..");
            System.out.println("当前被拒绝任务为:" + r.toString());
            
    
        }
    
    }
  • 相关阅读:
    安卓 日常问题 工作日志20
    安卓 日常问题 工作日志19
    安卓 日常问题 工作日志18
    安卓 日常问题 工作日志17
    安卓 日常问题 工作日志16
    对json进行排序处理
    Eclipse错误: 找不到或无法加载主类或项目无法编译10种解决大法!----------------------转载
    hibernate hql语句 投影查询的三种方式
    新的开始2018/6/8
    SSM搭建
  • 原文地址:https://www.cnblogs.com/lm970585581/p/6931640.html
Copyright © 2011-2022 走看看