  • Saturation Policies in Thread Pools

    ThreadPoolExecutor lets you supply a BlockingQueue to hold tasks that are waiting to be executed.

    [Figure: BlockingQueue structure diagram]

    The methods to pay attention to are offer(E), put(E), and take().

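    By default, newFixedThreadPool and newSingleThreadExecutor use an unbounded queue (LinkedBlockingQueue). If all threads are busy, new tasks wait in the queue; if tasks arrive faster than the threads can execute them, the queue grows without limit.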

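    A safer approach is to use a bounded queue, such as an ArrayBlockingQueue or a LinkedBlockingQueue created with a capacity. (PriorityBlockingQueue is not a bounded queue: its constructor argument is only an initial capacity.) A bounded queue prevents resource exhaustion but raises a new question: what happens to new tasks once the queue is full? That is what the saturation (rejection) policy decides.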
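    Whether a queue is really bounded can be checked directly. The sketch below is illustrative only (the class QueueBoundCheck and the method rejectsWithin are made-up names, not from the original post): it keeps calling the non-blocking offer(), which returns false once a bounded queue is full, and reports whether the queue ever refuses an element.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical helper, for illustration only.
class QueueBoundCheck {

    // Returns true if the queue refuses an element within `attempts` offers.
    static boolean rejectsWithin(BlockingQueue<Integer> queue, int attempts) {
        for (int i = 0; i < attempts; i++) {
            if (!queue.offer(i)) {
                return true; // offer() returns false when a bounded queue is full
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(rejectsWithin(new ArrayBlockingQueue<>(1), 10));    // true
        System.out.println(rejectsWithin(new LinkedBlockingQueue<>(1), 10));   // true
        System.out.println(rejectsWithin(new LinkedBlockingQueue<>(), 10));    // false
        System.out.println(rejectsWithin(new PriorityBlockingQueue<>(1), 10)); // false
    }
}
```

    PriorityBlockingQueue never refuses an element here, so on its own it cannot protect a pool from overload.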

    The JDK provides several RejectedExecutionHandler implementations, each a different saturation policy: AbortPolicy, CallerRunsPolicy, DiscardPolicy, and DiscardOldestPolicy.
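    A handler can also be passed to the ThreadPoolExecutor constructor instead of being set afterwards. The sketch below (HandlerSelection, newPool, and handlerName are illustrative names, not from the original post) shows both forms and prints which handler a pool ends up with when none is given.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, for illustration only.
class HandlerSelection {

    // Builds a 1-thread pool with a 1-slot queue; a null handler means "use the JDK default".
    static ThreadPoolExecutor newPool(RejectedExecutionHandler handler) {
        if (handler == null) {
            return new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        }
        return new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1), handler);
    }

    // Simple class name of the handler currently installed on the pool.
    static String handlerName(ThreadPoolExecutor pool) {
        return pool.getRejectedExecutionHandler().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor byDefault = newPool(null);
        ThreadPoolExecutor callerRuns = newPool(new ThreadPoolExecutor.CallerRunsPolicy());
        System.out.println(handlerName(byDefault));  // AbortPolicy
        System.out.println(handlerName(callerRuns)); // CallerRunsPolicy
        byDefault.shutdown();
        callerRuns.shutdown();
    }
}
```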

    AbortPolicy: when a task submitted to the pool is rejected, this policy throws a RejectedExecutionException. It is the default policy.
    Consider the following code:
    package cn.concurrent.executor;

    import java.util.concurrent.*;

    /**
     * Created by spark on 17-9-24.
     */
    public class AbortPolicyDemo {

        public static void main(String[] args) {
            // Pool with corePoolSize 1, maximumPoolSize 1, and a blocking queue of capacity 1
            ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                    0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
            // Use AbortPolicy as the saturation policy; the caller can catch the exception it throws
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
            // Submit the tasks
            for (int i = 0; i < 5; i++) {
                MyRunnable myRunnable = new MyRunnable();
                pool.execute(myRunnable);
            }
            pool.shutdown();
        }


        static class MyRunnable implements Runnable {
            @Override
            public void run() {
                System.err.println(Thread.currentThread().getId() + ":running");
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Output:

    Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task cn.concurrent.executor.AbortPolicyDemo$MyRunnable@1d44bcfa rejected from java.util.concurrent.ThreadPoolExecutor@266474c2[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2066)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
        at cn.concurrent.executor.AbortPolicyDemo.main(AbortPolicyDemo.java:19)
    10:running
    10:running

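    The saturation policy threw a RejectedExecutionException: the first task occupied the only worker thread, the second filled the queue (note "active threads = 1, queued tasks = 1" in the message), and the third was rejected. Because the exception escapes main, pool.shutdown() is never reached.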
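    Since AbortPolicy reports saturation as an exception, the caller can catch it and still shut the pool down. The following is a minimal sketch under stated assumptions: the class AbortPolicyCatchDemo and the method countRejections are made up for illustration, and a CountDownLatch replaces Thread.sleep so that the worker stays busy for the whole loop and the result is deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class AbortPolicyCatchDemo {

    // Submits taskCount tasks to a 1-thread pool with a 1-slot queue and
    // returns how many submissions AbortPolicy rejected.
    static int countRejections(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(1), new ThreadPoolExecutor.AbortPolicy());
        // The latch keeps the single worker busy until every submission has been attempted.
        CountDownLatch release = new CountDownLatch(1);
        Runnable blockingTask = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int rejected = 0;
        try {
            for (int i = 0; i < taskCount; i++) {
                try {
                    pool.execute(blockingTask);
                } catch (RejectedExecutionException e) {
                    rejected++; // the caller decides what to do: count, log, retry later
                }
            }
        } finally {
            release.countDown();
            pool.shutdown(); // always reached, so the worker thread can exit
        }
        return rejected;
    }

    public static void main(String[] args) {
        // One task runs, one waits in the queue, the other three are rejected.
        System.out.println("rejected: " + countRejections(5)); // rejected: 3
    }
}
```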

    Now modify the code as follows:
    package cn.concurrent.executor;

    import java.util.concurrent.*;

    /**
     * Created by spark on 17-9-24.
     */
    public class AbortPolicyDemo {

        public static void main(String[] args) {
            // Pool with corePoolSize 1, maximumPoolSize 1, and a blocking queue of capacity 1
            ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1,
                    0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(1));
            // Use AbortPolicy as the saturation policy; the caller can catch the exception it throws
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
            // Submit the tasks, pausing 500 ms before each submission
            for (int i = 0; i < 5; i++) {
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                MyRunnable myRunnable = new MyRunnable();
                pool.execute(myRunnable);
            }
            pool.shutdown();
        }


        static class MyRunnable implements Runnable {
            @Override
            public void run() {
                System.err.println(Thread.currentThread().getId() + ":running");
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Output:

    10:running
    10:running
    10:running
    10:running
    10:running

    Process finished with exit code 0

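    The reason: each task takes about 300 ms, while submissions are 500 ms apart, so every task finishes before the next one arrives. The bounded queue never fills, and the program runs normally.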

    
    
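    DiscardOldestPolicy: when a task is rejected, the pool discards the oldest unprocessed task in the queue (that is, the task that would have been executed next) and then tries to submit the rejected task again. If the work queue is a priority queue, the "oldest" task is the head of the queue, which is the highest-priority task, so the two should not be used together.
    Consider the following code: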
    package cn.concurrent.executor;

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
     * Created by spark on 17-9-24.
     */
    public class DiscardOldestPolicyDemo {

        public static void main(String[] args) {

            ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(1));
            // Use DiscardOldestPolicy as the saturation policy
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());

            for (int i = 0; i < 6; i++) {
                MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
                pool.submit(myRunnable);
            }
            pool.shutdown();
        }

        static class MyRunnable implements Runnable {

            private String name;

            public MyRunnable(String name) {
                this.name = name;
            }

            @Override
            public void run() {
                System.err.println(this.name + ": is running.");
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Output:

    this is 0 task: is running.
    this is 5 task: is running.

    Process finished with exit code 0

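    As the output shows, tasks 1, 2, 3, and 4 were discarded: while task 0 was running, each newly rejected task evicted the one waiting in the queue and took its place, so only task 5, the last one submitted, was left to run.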
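    The eviction sequence can be reproduced without depending on sleep timing. This is a sketch under stated assumptions: DiscardOldestTrace and run are illustrative names, and a CountDownLatch holds the first task on the worker until all submissions are done.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class DiscardOldestTrace {

    // Submits tasks 0..taskCount-1 and returns the ids that actually ran, in run order.
    static List<Integer> run(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardOldestPolicy());
        List<Integer> executed = new CopyOnWriteArrayList<>();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            pool.execute(() -> {
                executed.add(id);
                try {
                    release.await(); // task 0 blocks here until all submissions are done
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(executed);
    }

    public static void main(String[] args) {
        System.out.println(run(6)); // [0, 5]
    }
}
```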

    
    
    DiscardPolicy: this policy silently drops the task that cannot be accepted and does nothing else.
    The code is as follows:
    package cn.concurrent.executor;

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
     * Created by spark on 17-9-24.
     */
    public class DiscardPolicy {

        public static void main(String[] args) {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(1));
            // Use DiscardPolicy as the saturation policy
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            for (int i = 0; i < 6; i++) {
                MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
                pool.submit(myRunnable);
            }
            pool.shutdown();
        }

        static class MyRunnable implements Runnable {

            private String name;

            public MyRunnable(String name) {
                this.name = name;
            }

            @Override
            public void run() {
                System.err.println(this.name + ": is running.");
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Output:

    this is 0 task: is running.
    this is 1 task: is running.

    Process finished with exit code 0

    As the output shows, tasks 2, 3, 4, and 5 were all discarded.

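    Both the core pool size and the maximum pool size of the pool are 1, which means the pool can run at most one task at a time.
    The pool's work queue is an ArrayBlockingQueue, a bounded blocking queue, with capacity 1, so at most one task can wait in it. Following the logic of execute(), the pool runs two tasks in total: the first is handed directly to a Worker and run on its thread, and the second waits in the queue. All other tasks are discarded.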
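    The flow described above (one task on the worker, one in the queue, the rest dropped) has a consequence when submit() is combined with DiscardPolicy: the Future returned for a dropped task never completes, so calling get() on it without a timeout would block forever. The sketch below illustrates this under stated assumptions: DiscardedFutureDemo and completionFlags are made-up names, and a CountDownLatch keeps the worker busy while the tasks are submitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class DiscardedFutureDemo {

    // For each submitted task, reports whether its Future completed once the pool terminated.
    static List<Boolean> completionFlags(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.DiscardPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable task = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            futures.add(pool.submit(task)); // a Future is returned even if the task is dropped
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Boolean> flags = new ArrayList<>();
        for (Future<?> f : futures) {
            flags.add(f.isDone());
        }
        return flags;
    }

    public static void main(String[] args) {
        System.out.println(completionFlags(6)); // [true, true, false, false, false, false]
    }
}
```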

    Now change the pool's queue to a PriorityBlockingQueue and see what happens:

    
    
    package cn.concurrent.executor;

    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
     * Created by spark on 17-9-24.
     */
    public class DiscardPolicy2 {

        public static void main(String[] args) {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                    new PriorityBlockingQueue<>(1));
            // Use DiscardPolicy as the saturation policy
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            for (int i = 0; i < 6; i++) {
                MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
                pool.submit(myRunnable);
            }
            pool.shutdown();
        }

        static class MyRunnable implements Runnable {

            private String name;

            public MyRunnable(String name) {
                this.name = name;
            }

            @Override
            public void run() {
                System.err.println(this.name + ": is running.");
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Output:

    Exception in thread "main" java.lang.ClassCastException: java.util.concurrent.FutureTask cannot be cast to java.lang.Comparable
        at java.util.concurrent.PriorityBlockingQueue.siftUpComparable(PriorityBlockingQueue.java:357)
        at java.util.concurrent.PriorityBlockingQueue.offer(PriorityBlockingQueue.java:489)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1371)
        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
        at cn.concurrent.executor.DiscardPolicy2.main(DiscardPolicy2.java:23)
    this is 0 task: is running.
    
    

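    The error occurs because the queued tasks have no ordering: PriorityBlockingQueue needs its elements to implement Comparable (or a Comparator to be supplied). Let's implement the Comparable interface and try again: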

    package cn.concurrent.executor;

    import java.util.concurrent.PriorityBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
     * Created by spark on 17-9-24.
     * Both the core pool size and the maximum pool size are 1, so the pool runs at most one task at a time.
     * The work queue is a PriorityBlockingQueue, so every queued task must be Comparable.
     */
    public class DiscardPolicy2 {

        public static void main(String[] args) {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                    new PriorityBlockingQueue<>(1));
            // Use DiscardPolicy as the saturation policy
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            for (int i = 0; i < 6; i++) {
                MyRunnable myRunnable = new MyRunnable("this is " + i + " task", i);
                pool.execute(myRunnable);
            }
            pool.shutdown();
        }

        static class MyRunnable implements Runnable, Comparable<MyRunnable> {

            private String name;
            private int num;

            public MyRunnable(String name, int num) {
                this.name = name;
                this.num = num;
            }

            public MyRunnable(String name) {
                this.name = name;
            }

            @Override
            public void run() {
                System.err.println(this.name + ": is running.");
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            @Override
            public int compareTo(MyRunnable o) {
                // All tasks compare as equal, so the queue imposes no meaningful order
                return 0;
            }
        }
    }

    Output:

    this is 0 task: is running.
    this is 1 task: is running.
    this is 5 task: is running.
    this is 4 task: is running.
    this is 3 task: is running.
    this is 2 task: is running.

    Process finished with exit code 0

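    execute() is used here instead of submit() because submit() wraps the task in a FutureTask, and FutureTask does not implement Comparable. Note also that all six tasks ran even though DiscardPolicy was set: PriorityBlockingQueue is unbounded (the constructor argument 1 is only its initial capacity), so the queue never fills and the saturation policy is never triggered.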
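    For the priority queue to reorder work, compareTo has to compare something. The sketch below is an illustration, not the original author's code: PriorityOrderDemo, PrioritizedTask, and runOrder are made-up names, a lower number is assumed to mean higher priority, and a CountDownLatch holds the first task on the worker so that the remaining tasks are all queued before any of them is taken.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class PriorityOrderDemo {

    // Hypothetical task type: a lower priority value runs first.
    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        private final int priority;
        private final List<Integer> log;
        private final CountDownLatch gate;

        PrioritizedTask(int priority, List<Integer> log, CountDownLatch gate) {
            this.priority = priority;
            this.log = log;
            this.gate = gate;
        }

        @Override
        public void run() {
            log.add(priority);
            try {
                gate.await(); // only the first task really waits; later ones pass straight through
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        @Override
        public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Submits one task per priority value and returns the order in which they ran.
    static List<Integer> runOrder(int... priorities) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new PriorityBlockingQueue<>());
        List<Integer> log = new CopyOnWriteArrayList<>();
        CountDownLatch gate = new CountDownLatch(1);
        for (int p : priorities) {
            pool.execute(new PrioritizedTask(p, log, gate)); // execute(), so no FutureTask wrapper
        }
        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // The first task goes straight to the worker; the rest are ordered by the queue.
        System.out.println(runOrder(5, 3, 1, 2)); // [5, 1, 2, 3]
    }
}
```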

    
    
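    CallerRunsPolicy: as long as the pool has not been shut down, this policy runs the rejected task directly on the caller's thread. In other words, it neither discards the task nor throws an exception; it pushes the work back to the caller, which slows the rate at which new tasks are submitted. The trade-off is lower throughput (QPS) on the submitting thread.
    The code is as follows: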
    package cn.concurrent.executor;

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;

    /**
     * Created by spark on 17-9-24.
     */
    public class CallerRunsPolicyDemo {
        public static void main(String[] args) {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                    new ArrayBlockingQueue<Runnable>(1));
            // Use CallerRunsPolicy as the saturation policy
            pool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            for (int i = 0; i < 6; i++) {
                MyRunnable myRunnable = new MyRunnable("this is " + i + " task");
                pool.submit(myRunnable);
            }
            pool.shutdown();
        }

        static class MyRunnable implements Runnable {

            private String name;

            public MyRunnable(String name) {
                this.name = name;
            }

            @Override
            public void run() {
                System.err.println(this.name + ": is running.");
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    Output:

    this is 2 task: is running.
    this is 0 task: is running.
    this is 3 task: is running.
    this is 1 task: is running.
    this is 5 task: is running.
    this is 4 task: is running.

    Process finished with exit code 0
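    The output above shows that no task is lost, but not which thread ran each one. The sketch below records that explicitly. It is an illustration under stated assumptions: CallerRunsTrace and ranOnCaller are made-up names, and a CountDownLatch blocks only tasks running on the pool thread, so that the split between worker and caller is deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original post.
class CallerRunsTrace {

    // For each task index, reports whether the task ran on the submitting (caller) thread.
    static List<Boolean> ranOnCaller(int taskCount) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), new ThreadPoolExecutor.CallerRunsPolicy());
        Thread caller = Thread.currentThread();
        Map<Integer, Boolean> onCaller = new ConcurrentHashMap<>();
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < taskCount; i++) {
            final int id = i;
            pool.execute(() -> {
                boolean inCaller = Thread.currentThread() == caller;
                onCaller.put(id, inCaller);
                if (!inCaller) {
                    // Only pool threads wait; blocking the caller would stall the submit loop.
                    try {
                        release.await();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        release.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Boolean> result = new ArrayList<>();
        for (int i = 0; i < taskCount; i++) {
            result.add(onCaller.get(i));
        }
        return result;
    }

    public static void main(String[] args) {
        // Task 0 runs on the worker, task 1 waits in the queue, the rest run on main.
        System.out.println(ranOnCaller(5)); // [false, false, true, true, true]
    }
}
```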

    We can also define a custom saturation policy, as follows:
    package cn.concurrent;

    import java.util.concurrent.*;

    /**
     * Created by spark on 17-9-3.
     * Demonstrates RejectedExecutionHandler, the interface behind the pool's rejection policies.
     */
    public class RejectThreadPoolDemo {

        public static class MyTask implements Runnable {
            @Override
            public void run() {
                System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }

        // Supplies a custom RejectedExecutionHandler
        public static void main(String[] args) throws InterruptedException {
            MyTask myTask = new MyTask();
            // Create a thread pool
            ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingDeque<Runnable>(10),
                    Executors.defaultThreadFactory(),
                    new RejectedExecutionHandler() {
                        // Custom handling of rejected tasks
                        @Override
                        public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                            System.out.println(runnable.toString() + " is discard");
                        }
                    });
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                es.submit(myTask);
                Thread.sleep(10);
            }
        }
    }

    Output:

    1506244149620:Thread ID:10
    1506244149630:Thread ID:11
    1506244149640:Thread ID:12
    1506244149651:Thread ID:13
    1506244149661:Thread ID:14
    1506244149720:Thread ID:10
    1506244149730:Thread ID:11
    1506244149740:Thread ID:12
    1506244149751:Thread ID:13
    1506244149761:Thread ID:14
    1506244149821:Thread ID:10
    1506244149830:Thread ID:11
    1506244149841:Thread ID:12
    1506244149851:Thread ID:13
    1506244149862:Thread ID:14
    java.util.concurrent.FutureTask@63947c6b is discard
    java.util.concurrent.FutureTask@2b193f2d is discard
    java.util.concurrent.FutureTask@355da254 is discard
    java.util.concurrent.FutureTask@4dc63996 is discard
    java.util.concurrent.FutureTask@d716361 is discard
    1506244149921:Thread ID:10
    1506244149930:Thread ID:11
    1506244149941:Thread ID:12
    1506244149951:Thread ID:13
    1506244149962:Thread ID:14
    java.util.concurrent.FutureTask@6ff3c5b5 is discard
    java.util.concurrent.FutureTask@3764951d is discard
    java.util.concurrent.FutureTask@4b1210ee is discard
    java.util.concurrent.FutureTask@4d7e1886 is discard
    java.util.concurrent.FutureTask@3cd1a2f1 is discard
    1506244150023:Thread ID:10
    1506244150031:Thread ID:11

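    As you can see, the saturation policy took effect. In a real application, the handler could log each rejection so that system load and task loss can be analyzed.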
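    One possible shape for such a handler is sketched below. Everything in it is an assumption made for illustration (the class CountingRejectionHandler, the rejectedCount accessor, the demo method, and the log format); a real system would probably route the message through its own logging framework.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical handler: drops the rejected task, but counts and logs every rejection.
class CountingRejectionHandler implements RejectedExecutionHandler {

    private final AtomicLong rejected = new AtomicLong();

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor pool) {
        long n = rejected.incrementAndGet();
        // System.err stands in for a real logger here.
        System.err.println("rejected #" + n + ": " + task
                + " (pool size = " + pool.getPoolSize()
                + ", queued = " + pool.getQueue().size() + ")");
    }

    long rejectedCount() {
        return rejected.get();
    }

    // Overloads a 1-thread pool with a 1-slot queue and returns the rejection count.
    static long demo(int taskCount) {
        CountingRejectionHandler handler = new CountingRejectionHandler();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1), handler);
        CountDownLatch release = new CountDownLatch(1); // keeps the worker busy during submission
        Runnable task = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < taskCount; i++) {
            pool.execute(task);
        }
        release.countDown();
        pool.shutdown();
        return handler.rejectedCount();
    }

    public static void main(String[] args) {
        System.out.println("rejections: " + demo(5)); // rejections: 3
    }
}
```

    Exposing the counter makes it possible to feed the rejection rate into monitoring rather than reading it out of log files.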

    These are notes I keep as I learn. There is still a lot to learn, and corrections are welcome.
     
  • Original article: https://www.cnblogs.com/lwy19998273333/p/7587686.html