zoukankan      html  css  js  c++  java
  • java 线程之executors线程池

    一、线程池的作用

      平时的业务中,如果要使用多线程,那么我们会在业务开始前创建线程,业务结束后,销毁线程。但是对于业务来说,线程的创建和销毁是与业务本身无关的,只关心线程所执行的任务。因此希望把尽可能多的cpu用在执行任务上面,而不是用在与业务无关的线程创建和销毁上面。而线程池则解决了这个问题。

      线程池的作用:线程池作用就是限制系统中执行线程的数量。根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果,从而避免平凡的创建和销毁线程带来的系统开销也有效的规避了因为创建的线程过多而耗尽系统资源导致服务器宕机。使用Runtime.getRuntime().availableProcessors();设置线程数量。

    二、 java并发包提供的线程池 Executors类

      A、newFixedThreadPool  用来创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待

               ExecutorService fixedThreadPool =Executors.newFixedThreadPool(1);

     public static ExecutorService newFixedThreadPool(int nThreads) {
            //参数详解 第一个参数核心线程数,线程池在实例化的时候初始化时线程数
            //第二个:该线程池最大线程数
            //第三个:线程空闲时间(0L表示没有空闲时间即没有使用就会被回收)
            //第四个:空闲时间单位
            //第五个:LinkedBlockingQueue 无界队列 ,将没有线程处理的任务加入该队列中
            return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
        }

      B、newSingleThreadExecutor 用来创建一个单线程化的线程池,它只用唯一的工作线程来执行任务,一次只支持一个,所有任务按照指定的顺序执行

        ExecutorService fixedThreadPool = Executors.newSingleThreadExecutor();

    public static ExecutorService newSingleThreadExecutor() {
            //参数详解 第一个参数核心线程数,线程池在实例化的时候初始化时线程数
            //第二个:该线程池最大线程数
            //第三个:线程空闲时间(0L表示没有空闲时间即没有使用就会被回收)
            //第四个:空闲时间单位
            //第五个:LinkedBlockingQueue 无解队列 ,将没有线程处理的任务加入该队列中
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));
        }

      C、newCachedThreadPool  用来创建一个可缓存线程池,该线程池没有长度限制,对于新的任务,如果有空闲的线程,则使用空闲的线程执行,如果没有,则新建一个线程来执行任务。如果线程池长度超过处理需要,可灵活回收空闲线程。

    ExecutorService fixedThreadPool = Executors.newCachedThreadPool(); 

     public static ExecutorService newCachedThreadPool() {
            //参数详解 第一个参数核心线程数,线程池在实例化的时候初始化时线程数
            //第二个:Integer.MAX_VALUE 不限制该线程池的线程数
            //第三个:线程空闲时间(60L 表示线程空闲60秒之后被回收)
            //第四个:空闲时间单位 SECONDS 秒
            //第五个:SynchronousQueue 无容量队列 ,将任务直接提交给线程处理自身不存储任务
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
        }

      D、newScheduledThreadPool 用来创建一个定长线程池,并且支持定时和周期性的执行任务

        ScheduledExecutorService executorsScheduled=Executors.newScheduledThreadPool(2);

    public ScheduledThreadPoolExecutor(int corePoolSize) {
            //参数详解 第一个参数核心线程数,线程池在实例化的时候初始化时线程数
            //第二个:Integer.MAX_VALUE 不限制该线程池的线程数
            //第三个:线程空闲时间(0 表示线程空闲0秒之后被回收)
            //第四个:空闲时间单位 SECONDS 秒
            //第五个:DelayedWorkQueue 延时队列 
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,new DelayedWorkQueue());
        }

    使用 newScheduledThreadPool实现定时器

    package com.jalja.org.thread.executors;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    public class NewScheduledThreadPoolTest {
        public static void main(String[] args) {
            Runnable runnable=new ScheduledThread();
            //实现定时器
            ScheduledExecutorService executorsScheduled=Executors.newScheduledThreadPool(2);
            //runnable 需要执行的任务  1:初始化时间(初始化延迟1秒后执行)  3:轮询时间(每隔3秒执行)
            //TimeUnit.SECONDS:时间单位
            ScheduledFuture<?> scheduledFuture= executorsScheduled.scheduleWithFixedDelay(runnable, 1,3, TimeUnit.SECONDS);
            System.out.println("scheduledFuture:"+scheduledFuture);
        }
    } 
    class  ScheduledThread implements Runnable{
        public void run() {
            System.out.println(Thread.currentThread().getName() +"=>开始");
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println(Thread.currentThread().getName() +"=>结束");
        }
    }

     线程池中提交任务的两种方式:

    execute()方法:该方法是 ExecutorService 接口的父类(接口)方法,该接口只有这一个方法。

    public interface Executor {
        void execute(Runnable command);
    }

    submit()方法:该方法是ExecutorService 接口的方法。

    public interface ExecutorService extends Executor {
      ...
      <T> Future<T> submit(Callable<T> task);
    
      <T> Future<T> submit(Runnable task, T result);
    
      Future<?> submit(Runnable task);
      ...
    }

     从上面的源码以及讲解可以总结execute()和submit()方法的区别:

      1. 接收的参数不一样;

      2. submit()有返回值,而execute()没有;

    三、自定义线程池

      在Java线程池中的newCachedThreadPool,newFixedThreadPool,newSingleThreadExecutor,newScheduledThreadPool这四个线程池在底层都是调用了ThreadPoolExecutor()这个构造方法。若Executors这个类无法满足我们的需求的时候,可以自己创建自定义的线程池。
    ThreadPoolExecutor类的定义如下

                    public ThreadPoolExecutor(int corePoolSize,//核心线程数--线程池初始化创建的线程数量  
                       int maximumPoolSize,//最大线程数,线程池中能创建的最大线程数  
                       long keepAliveTime,//线程存活时间  
                       TimeUnit unit,//线程存货时间单位  
                       BlockingQueue<Runnable> workQueue,//一个阻塞队列  
                       ThreadFactory threadFactory//拒绝策略  
                     ) {……}  

    自定义线程池使用有界队列(ArrayBlockingQueue 、LinkedBlockingQueue )

      若有新的任务需要执行,如果线程池实际线程数小于corePoolSize核心线程数的时候,则优先创建线程。若大于corePoolSize时,则会将多余的线程存放在队列中,若队列已满,且最请求线程小于maximumPoolSize的情况下,则自定义的线程池会创建新的线程,若队列已满,且最请求线程大于maximumPoolSize的情况下,则执行拒绝策略,或其他自定义方式。

    package com.jalja.org.thread.executors;
    
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    public class ExecutorsTest {
        public static void main(String[] args) {
            ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(2));
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.execute(new ExecutorsTest().new ThreadTest());
            test.shutdown();
        }
        class ThreadTest implements Runnable{
            public void run() {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } 
            }
        }
    }

    结果:

    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task com.jalja.org.thread.executors.ExecutorsTest$ThreadTest@70dea4e rejected from java.util.concurrent.ThreadPoolExecutor@5c647e05[Running, pool size = 2, active threads = 2, queued tasks = 2, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at com.jalja.org.thread.executors.ExecutorsTest.main(ExecutorsTest.java:17)
    pool-1-thread-1
    pool-1-thread-2
    pool-1-thread-1
    pool-1-thread-2

    看结果可知有一个任务是没有执行直接抛出异常的。队列已满,且最请求线程大于maximumPoolSize的情况下,则执行拒绝策略,这里使用的是——AbortPolicy:直接抛出异常,系统正常工作(默认的策略)。

    自定义线程池使用无界队列:

      对于无界队列除非系统资源耗尽,否则无界队列不存在任务入队失败的情况,若系统的线程数小于corePoolSize时,则新建线程执行corePoolSize,当达到corePoolSize后,则把多余的任务放入队列中等待执行若任务的创建和处理的速速差异很大,无界队列会保持快速增长,直到耗尽系统内存为之,对于无界队列的线程池maximumPoolSize并无真实用处。

    四、拒绝策略

    JDK提供策略:

    1.AbortPolicy:直接抛出异常,系统正常工作。(默认的策略)

    package com.jalja.org.thread.executors;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    
    public class ExecutorsTest {
        public static void main(String[] args) {
            BlockingQueue<Runnable> f=new LinkedBlockingQueue<Runnable>(2);
            ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,f);
            try {
                test.execute(new ExecutorsTest().new ThreadTest());
                test.execute(new ExecutorsTest().new ThreadTest());
                test.execute(new ExecutorsTest().new ThreadTest());
                test.execute(new ExecutorsTest().new ThreadTest());
                test.execute(new ExecutorsTest().new ThreadTest());
            } catch (RejectedExecutionException e) {
                e.printStackTrace();
                System.out.println("超过有界队列的数据记录日志");
            }
            test.shutdown();
        }
        class ThreadTest implements Runnable{
            public void run() {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } 
            }
        }
    }

    2.CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中执行,运行当前被丢弃的任务。
    3.DiscardOrderstPolicy:丢弃最老的请求,尝试再次提交当前任务。
    4.丢弃无法处理的任务,不给于任何处理。

    自定义策略:需要实现RejectedExecutionHandler接口

    package com.jalja.org.thread.executors;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.RejectedExecutionException;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    
    public class ExecutorsTest {
        public static void main(String[] args) {
            BlockingQueue<Runnable> f=new LinkedBlockingQueue<Runnable>(2);
            ThreadPoolExecutor test=new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,f, new MyRejected());
                test.execute(new ExecutorsTest().new ThreadTest());
                test.execute(new ExecutorsTest().new ThreadTest());
                test.execute(new ExecutorsTest().new ThreadTest());
                test.execute(new ExecutorsTest().new ThreadTest());
                test.execute(new ExecutorsTest().new ThreadTest());
                test.shutdown();
        }
        class ThreadTest implements Runnable{
            public void run() {
                System.out.println(Thread.currentThread().getName());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } 
            }
        }
    }
    class MyRejected implements RejectedExecutionHandler{
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("执行异常的任务加入日志");
        }
    }
  • 相关阅读:
    Go基础数据类型
    在foreach中使用distinct查找不重复记录
    DataTable,List去重复记录的方法(转载)
    ArcEngine的IMap接口(转载)
    根据Excel表格建立Shp文件(开发详解及源代码)(转载)
    axmapcontrol和mapcontrol有什么区别呢(转发)
    DataSet多表查询操作(转载)
    c#调用DLL(转载)
    wall 系列技术贴
    ArcEngine的IFeaturLayer接口(转载)
  • 原文地址:https://www.cnblogs.com/jalja/p/7243422.html
Copyright © 2011-2022 走看看