zoukankan      html  css  js  c++  java
  • JDK多任务执行框架(Executor框架)

    Executor的常用方法

    为了更好的控制多线程,JDK提供了一套线程框架Executor,帮助开发人员有效地进行线程控制。它们都在java.util.concurrent包中,是JDK开发包的核心。其中有一个重要的类:Executors,他扮演这线程工厂的角色,我们通过Executors可以创建特定功能的线程池。

    1. newFixedThreadPool()方法,该方法返回一个固定数量的线程池,该方法的线程数始终不变,当有一个任务提交时,如线程池中有空闲,则立即执行,如没有,则会被暂缓在一个任务队列中等待有空闲的线程去执行。
    2. newSingleThreadExecutor()方法,创建一个线程的线程池,若空闲则执行,若没有空闲线程则暂缓在任务队列中。
    3. newCachedThreadPool()方法,返回一个可以根据实际情况调整线程个数的线程池,不限制最大线程数量,若有空闲的线程则执行任务,若无任务则不创建线程。并且每一个空闲线程会在60秒后自动收回。
    4.  newScheduleThreadPool()方法,返回一个ScheduledExecutorService对象,但该线程池可以指定线程的数量。

    查看它们的源码:

        public static ExecutorService newFixedThreadPool(int nThreads) {
            return new ThreadPoolExecutor(nThreads, nThreads,
                                          0L, TimeUnit.MILLISECONDS,
                                          new LinkedBlockingQueue<Runnable>());
        }
    
        public static ExecutorService newSingleThreadExecutor() {
            return new FinalizableDelegatedExecutorService
                (new ThreadPoolExecutor(1, 1,
                                        0L, TimeUnit.MILLISECONDS,
                                        new LinkedBlockingQueue<Runnable>()));
        }
    
         public static ExecutorService newCachedThreadPool() {
            return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                          60L, TimeUnit.SECONDS,
                                          new SynchronousQueue<Runnable>());
        }
    
        public static ScheduledExecutorService newScheduledThreadPool(
                int corePoolSize, ThreadFactory threadFactory) {
            return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
        }
        public ScheduledThreadPoolExecutor(int corePoolSize,
                                           ThreadFactory threadFactory) {
            //这里super是ThreadPoolExecutor类
            super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
                  new DelayedWorkQueue(), threadFactory);
        }
        
    View Code

    从源码看以看出它们都是由ThreadPoolExecutor构造出来的。

    ThreadPoolExecutor构造方法概述

    ThreadPoolExecutor(

      int corePoolSize,//核心线程数,线程池刚初始化的时候实例化线程个数

      int maximumPoolSize,//最大线程数

      long keepLongTime,//空闲时间,过时回收

      TimeUnit unit,//时间单位

      BlockingQueue<Runable> worker,//线程暂缓处

      ThreadFactory threadFactory,

      RejectExecuteHandle handle//拒绝执行的方法

    )

    所以上面几个方法主要是根据不同的参数来执行不同的行为。其中newScheduleThreadPool方法稍复杂一点,它的方法值是ScheduledExecutorService。

    ScheduledExecutorService主要有两个方法scheduleWithFixedDelay,scheduleAtFixedRate

    1、scheduleAtFixedRate 方法,以固定的频率来执行某项计划(任务),即固定的频率来执行某项计划,它不受计划执行时间的影响。到时间就执行。 它不受计划执行时间的影响。
    2、scheduleWithFixedDealy,相对固定的延迟后,执行某项计划(任务), 即无论某个任务执行多长时间,等执行完了,我再延迟指定的时间去执行。它受计划执行时间的影响。

    class Temp extends Thread {
        public void run() {
            try {
                System.out.println( new Date().toString() + "..run start");
                Thread.sleep(3000);
                System.out.println( new Date().toString() + "...run end");
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
    public class ScheduledJob {
        
        public static void main(String args[]) throws Exception {
        
            Temp command = new Temp();
            ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
            System.out.println(new Date().toString() + "...start");
            //scheduler.scheduleWithFixedDelay(command, 5, 2, TimeUnit.SECONDS);
            scheduler.scheduleAtFixedRate(command, 5, 2, TimeUnit.SECONDS);
        }
    }
    View Code

    scheduleAtFixedRate 的执行结果:run方法延迟了5秒执行,run方法执行了3秒,已经超过了我们指定的间隔2秒,所以它在第一个run方法执行完后,立即执行了第二个run方法

    Tue Feb 06 15:48:01 CST 2018...start
    Tue Feb 06 15:48:06 CST 2018..run start
    Tue Feb 06 15:48:09 CST 2018...run end
    Tue Feb 06 15:48:09 CST 2018..run start
    Tue Feb 06 15:48:12 CST 2018...run end
    Tue Feb 06 15:48:12 CST 2018..run start

    scheduleWithFixedDealy的执行结果:run方法延迟了5秒执行,run方法执行了3秒,已经超过了我们指定的间隔2秒,但是它还是等了2秒后,再执行了第二个run方法

    Tue Feb 06 15:53:04 CST 2018...start
    Tue Feb 06 15:53:10 CST 2018..run start
    Tue Feb 06 15:53:13 CST 2018...run end
    Tue Feb 06 15:53:15 CST 2018..run start
    Tue Feb 06 15:53:18 CST 2018...run end
    Tue Feb 06 15:53:20 CST 2018..run start
    Tue Feb 06 15:53:23 CST 2018...run end

    自定义线程池

    如果Executors工厂类无法满足我们的需求,可以自己去创建线程池。在自己创建线程池时,这个构造方法对于队列是什么类型比较关键:

    使用有届队列(ArrayBlockingQueue):若有新的任务需要执行时,如果线程池实际线程数小于corePoolSize,则优先创建线程,若大于corePoolSize,则会将任务加入队列中等待执行,若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建行的线程,诺线程数大于maximumPoolSize,则执行拒绝策略。
    使用无界队列(LinkedBlockingQueue):除非系统资源耗尽,否则无界的任务队列不存在入队失败的情况。当有新任务来时,系统的线程数小于corePoolSize时,则新建线程执行任务,当达到corePoolSize后,就不会继续增加,若后续还有新的任务加入,而没有空闲的线程资源,则任务直接进入队列等待。若任务创建和处理的速度差异很大时,无界队列会保持快速增长,直到耗尽系统内存。注意:maximumPoolSize在无界队列时没有作用
    JDK拒绝策略:
    AbortPolicy:直接抛出异常组织系统正常工作 CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务
    DiscardOldestPolicy:丢弃最老的一个请求,尝试再次提交当前任务
    DiscardPolicy:丢弃无法处理的任务,不做任何处理。
    如果需要自定义拒绝策略可以实现RejectedExecutionHandler接口

    有届队列的例子

    public class UseThreadPoolExecutor1 {
        public static void main(String[] args) {
            /**
             * 在使用有界队列时,若有新的任务需要执行,如果线程池实际线程数小于corePoolSize,则优先创建线程,
             * 若大于corePoolSize,则会将任务加入队列,
             * 若队列已满,则在总线程数不大于maximumPoolSize的前提下,创建新的线程,
             * 若线程数大于maximumPoolSize,则执行拒绝策略。或其他自定义方式。
             * 
             */    
            ThreadPoolExecutor pool = new ThreadPoolExecutor(
                    1,                 //coreSize
                    2,                 //MaxSize
                    60,             //60
                    TimeUnit.SECONDS, 
                    new ArrayBlockingQueue<Runnable>(3)            //指定一种队列 (有界队列)
                    //new LinkedBlockingQueue<Runnable>()
    //                , new MyRejected()
                    //, new DiscardOldestPolicy()
                    );
            
            MyTask mt1 = new MyTask(1, "任务1");
            MyTask mt2 = new MyTask(2, "任务2");
            MyTask mt3 = new MyTask(3, "任务3");
            MyTask mt4 = new MyTask(4, "任务4");
            MyTask mt5 = new MyTask(5, "任务5");
            MyTask mt6 = new MyTask(6, "任务6");
            
            pool.execute(mt1);
            pool.execute(mt2);
            pool.execute(mt3);
            pool.execute(mt4);
            pool.execute(mt5);
            pool.execute(mt6);
            
            pool.shutdown();
            
        }
    }
    View Code

    只执行前四个任务(把任务5,6先不执行)

    run taskId =1
    run taskId =2
    run taskId =3
    run taskId =4

    它是一个一个执行的,原因是第一个线程来的时候,立即执行了(coreSize=1),当后面三个线程来的时候会被加入到queue中(new ArrayBlockingQueue<Runnable>(3))

    只执行前五个任务

    run taskId =1
    run taskId =5
    run taskId =2
    run taskId =3
    run taskId =4

    它是两个两个执行的,原因是第五个任务来的时候已经超出了queue的size(3),但是总线程数有小于MaxSize(2),所以任务5和任务1一起执行了,之后会从queue中拿两个再执行

    执行留个任务

    run taskId =1
    Exception in thread "main" run taskId =5
    java.util.concurrent.RejectedExecutionException: Task 6 rejected from java.util.concurrent.ThreadPoolExecutor@4e25154f[Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
        at com.bjsxt.height.concurrent018.UseThreadPoolExecutor1.main(UseThreadPoolExecutor1.java:46)
    run taskId =2
    run taskId =3
    run taskId =4

    任务6来的时候,queue已经满了,总线程数也满了,这是会执行拒绝策略,JDK默认的是AbortPolicy,直接抛出异常,我们也可以改,就想代码里注释掉的一样

    无界队列的例子

    public class UseThreadPoolExecutor2 implements Runnable{
    
        private static AtomicInteger count = new AtomicInteger(0);
        
        @Override
        public void run() {
            try {
                int temp = count.incrementAndGet();
                System.out.println("任务" + temp);
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        
        public static void main(String[] args) throws Exception{
            //System.out.println(Runtime.getRuntime().availableProcessors());
            BlockingQueue<Runnable> queue = 
                    new LinkedBlockingQueue<Runnable>();
            ExecutorService executor  = new ThreadPoolExecutor(
                        5,         //core
                        10,     //max
                        120L,     //2fenzhong
                        TimeUnit.SECONDS,
                        queue);
            
            for(int i = 0 ; i < 20; i++){
                executor.execute(new UseThreadPoolExecutor2());
            }
            Thread.sleep(1000);
            System.out.println("queue size:" + queue.size());        //10
            Thread.sleep(2000);
        }
    }
    View Code

    执行结果:

    任务1
    任务2
    任务3
    任务4
    任务5
    queue size:15
    任务6
    任务7
    任务9
    任务8
    任务10
    任务11
    任务12
    任务13
    任务14
    任务15
    任务16
    任务17
    任务19
    任务20
    任务18

    任务是5个5个执行的,而且把剩余的任务全部加到queue中。第二个参数没有作用,不会创建10个线程,如果是有届队列,就会起作用。

    Task的代码:

    public class MyTask implements Runnable {
    
        private int taskId;
        private String taskName;
        
        public MyTask(int taskId, String taskName){
            this.taskId = taskId;
            this.taskName = taskName;
        }
        
        public int getTaskId() {
            return taskId;
        }
    
        public void setTaskId(int taskId) {
            this.taskId = taskId;
        }
    
        public String getTaskName() {
            return taskName;
        }
    
        public void setTaskName(String taskName) {
            this.taskName = taskName;
        }
    
        @Override
        public void run() {
            try {
                System.out.println("run taskId =" + this.taskId);
                Thread.sleep(5*1000);
                //System.out.println("end taskId =" + this.taskId);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }        
        }
        
        public String toString(){
            return Integer.toString(this.taskId);
        }
    
    }
    View Code

     MyRejected的代码:

    public class MyRejected implements RejectedExecutionHandler{
    
        
        public MyRejected(){
        }
        
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            System.out.println("自定义处理..");
            System.out.println("当前被拒绝任务为:" + r.toString());
        }
    
    }
    View Code
  • 相关阅读:
    django中使用celery
    django中使用Redis
    Nginx编译和安装(超简单版)
    cookie和session
    forms组件
    反向解析(reverse())
    QuerySet对象
    models.py里的字段以及参数详解
    Q查询和F查询
    JgrId 无数据返回设置
  • 原文地址:https://www.cnblogs.com/lostyears/p/8422994.html
Copyright © 2011-2022 走看看