zoukankan      html  css  js  c++  java
  • 20.custom自定义线程池

    自定义线程池

        1.若Executors工厂类无法满足需求,可以自己使用工厂类创建线程池,底层都是使用了ThreadPoolExecutor这个类可以自定义。

    1. public ThreadPoolExecutor(int corePoolSize(当前核心线程数),
    2. int maximunPoolSize(最大线程数),
    3. long keepAliveTime(保持活跃时间),
    4. TimeUnit utin(时间单位),
    5. BlockingQueue<Runnable> workQueue(线程队列容器),
    6. ThreadFactory threadFactory(线程工厂),
    7. RejectedExecutionHandler rejectedExecutionHandler(拒绝执行的方法,线程队列阻塞到容器等待、例:限制最大执行、));
        2.有界队列【ArrayBlockingQueue】,若有新的任务则立即去执行:若有新的线程需要执行,目标线程池实际数小于"coreRoolSize",则优先参加线程,若大于,

    则会将任务加入队 列中,若队列已经满,则在总线程数不大于maximumPoolSize(最大coreRoolSize数)的前提下。创建一个新的线程,若大于               maximumPoolSize则实行拒绝策略、或其他处理方式。

        3.无界队列【LinkedBlockingQueue】与有界队列对比:除非资源耗尽,否则无界队列在加入任务不存在失败的情况下,当有新的任务到来,系统的线程数小于
           corePoolSize的时候,则新建线程去执行任务队列,,达到maxCorePoolsize后就不会增加了,若后续还有任务到来,而且没有空闲的资源,则把任务进行
            队列中去等待。若创建任务与处理任务的速度差异很大,无界队列保持快速增加等待的任务空间,一直到资源耗尽。
        4.拒绝策略
                        Abortlicy:直接抛出异常、核心系统则不会受到影响。
                        CallerRunspPolicy:该策略直接在调用者现场,运行当前被拒绝的任务、丢弃的任务(我刚刚进行队列就满了,那么就让我插个队执行)。
                        DiscardOldestPolicy:丢弃最老的一个请求(被阻塞在队列中很长时间),尝试直接运行当前被拒绝的任务。
                        DiscardPolicy:丢弃无法处理的任务,被给与任何的后续处理。
        5.自定义策略
                        
    1. implements RejectedExecutionHandle
    案例1:有界队列
    1. package demo8.threadPool;
    2. import java.util.concurrent.*;
    3. public class CustomThreadPool1 {
    4. /* 有界队列ArrayBlockingQueue:
    5. 若有新的线程需要执行,日光线程池实际数小于"coreRoolSize",则优先参加线程,若大于,则会将任务加入队列中,若队列已经满,则在总线程
    6. 数不大于maximumPoolSize(最大coreRoolSize数)的前提下。创建一个新的线程,若大于maximumPoolSize则实行拒绝策略、或其他处理方式。*/
    7. public static void main(String[] args) {
    8. BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(3);
    9. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
    10. 2,
    11. 60,
    12. TimeUnit.SECONDS,
    13. blockingQueue);
    14. UserTask userTask1 = new UserTask(1, "任务-1");
    15. UserTask userTask2 = new UserTask(2, "任务-2");
    16. UserTask userTask3 = new UserTask(3, "任务-3");
    17. UserTask userTask4 = new UserTask(4, "任务-4");
    18. UserTask userTask5 = new UserTask(5, "任务-5");
    19. threadPoolExecutor.execute(userTask1);
    20. threadPoolExecutor.execute(userTask2);
    21. threadPoolExecutor.execute(userTask3);
    22. threadPoolExecutor.execute(userTask4);
    23. threadPoolExecutor.execute(userTask5);
    24. try {
    25. Thread.sleep(1000);
    26. } catch (InterruptedException e) {
    27. e.printStackTrace();
    28. }
    29. threadPoolExecutor.shutdown();
    30. }
    31. }
    32. 输出:
    33. pool-1-thread-1 run taskId:1 pool-1-thread-2 run taskId:5 pool-1-thread-2 run taskId:3 pool-1-thread-1 run taskId:2 pool-1-thread-2 run taskId:4    这种情况处理完所有的任务
    1. package demo8.threadPool;
    2. /**
    3. * Created by liudan on 2017/7/23.
    4. */
    5. public class UserTask implements Runnable {
    6. private int id;
    7. private String taskName;
    8. @Override
    9. public void run() {
    10. try {
    11. Thread.sleep(1000);
    12. System.err.println(Thread.currentThread().getName()+" run taskId:"+this.getId());
    13. } catch (InterruptedException e) {
    14. e.printStackTrace();
    15. }
    16. }
    17. public UserTask() {
    18. }
    19. public UserTask(int id, String taskName) {
    20. this.id = id;
    21. this.taskName = taskName;
    22. }
    23. public int getId() {
    24. return id;
    25. }
    26. public void setId(int id) {
    27. this.id = id;
    28. }
    29. public String getTaskName() {
    30. return taskName;
    31. }
    32. public void setTaskName(String taskName) {
    33. this.taskName = taskName;
    34. }
    35. @Override
    36. public String toString() {
    37. return "UserTask{" +
    38. "id=" + id +
    39. '}';
    40. }
    41. }
    案例2:拒绝任务,超出暂缓任务对了大小
    1. public static void main(String[] args) {
    2. BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(3);
    3. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
    4. 2,
    5. 60,
    6. TimeUnit.SECONDS,
    7. blockingQueue);
    8. UserTask userTask1 = new UserTask(1, "任务-1");
    9. UserTask userTask2 = new UserTask(2, "任务-2");
    10. UserTask userTask3 = new UserTask(3, "任务-3");
    11. UserTask userTask4 = new UserTask(4, "任务-4");
    12. UserTask userTask5 = new UserTask(5, "任务-5");
    13. UserTask userTask6 = new UserTask(6, "任务-6");多加一个
    14. threadPoolExecutor.execute(userTask1);
    15. threadPoolExecutor.execute(userTask2);
    16. threadPoolExecutor.execute(userTask3);
    17. threadPoolExecutor.execute(userTask4);
    18. threadPoolExecutor.execute(userTask5);
    19. threadPoolExecutor.execute(userTask6);
    20. try {
    21. Thread.sleep(1000);
    22. } catch (InterruptedException e) {
    23. e.printStackTrace();
    24. }
    25. threadPoolExecutor.shutdown();
    26. }
    27. 输出:
    28. Exception in thread "main" java.util.concurrent.RejectedExecutionException:
    29. Task UserTask{id=6} rejected from java.util.concurrent.ThreadPoolExecutor@6f94fa3e
    30. [Running, pool size = 2, active threads = 2, queued tasks = 3, completed tasks = 0] at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047) at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823) at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369) at demo8.threadPool.CustomThreadPool1.main(CustomThreadPool1.java:35) pool-1-thread-1 run taskId:1 pool-1-thread-2 run taskId:5 pool-1-thread-1 run taskId:2 pool-1-thread-2 run taskId:3 pool-1-thread-1 run taskId:4
    案例3:自定义拒绝策略
    1. package demo8.threadPool;
    2. import java.util.concurrent.RejectedExecutionHandler;
    3. import java.util.concurrent.ThreadPoolExecutor;
    4. public class CustomDiscardPolicy implements RejectedExecutionHandler {
    5. public CustomDiscardPolicy() {
    6. }
    7. @Override
    8. public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
    9. System.err.println("拒绝任务");
    10. /**
    11. * 用日志记录保存下,可做后续的扩展操作
    12. */
    13. System.err.println("加入任务log... "+r.toString());
    14. }
    15. }
    1. public static void main(String[] args) {
    2. BlockingQueue<Runnable> blockingQueue = new ArrayBlockingQueue<Runnable>(3);
    3. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(1,
    4. 2,
    5. 60,
    6. TimeUnit.SECONDS,
    7. blockingQueue,
    8. new CustomDiscardPolicy());//使用自定义拒绝策略
    9. UserTask userTask1 = new UserTask(1, "任务-1");
    10. UserTask userTask2 = new UserTask(2, "任务-2");
    11. UserTask userTask3 = new UserTask(3, "任务-3");
    12. UserTask userTask4 = new UserTask(4, "任务-4");
    13. UserTask userTask5 = new UserTask(5, "任务-5");
    14. UserTask userTask6 = new UserTask(6, "任务-6");
    15. threadPoolExecutor.execute(userTask1);
    16. threadPoolExecutor.execute(userTask2);
    17. threadPoolExecutor.execute(userTask3);
    18. threadPoolExecutor.execute(userTask4);
    19. threadPoolExecutor.execute(userTask5);
    20. threadPoolExecutor.execute(userTask6);
    21. try {
    22. Thread.sleep(1000);
    23. } catch (InterruptedException e) {
    24. e.printStackTrace();
    25. }
    26. threadPoolExecutor.shutdown();
    27. }
    28. 输出:
    29. 拒绝任务 加入任务log... UserTask{id=6} pool-1-thread-1 run taskId:1 pool-1-thread-2 run taskId:5 pool-1-thread-1 run taskId:2 pool-1-thread-2 run taskId:3 pool-1-thread-1 run taskId:4
    案例4:无界队列LinkedBlockingQueue
    1. package demo8.threadPool;
    2. import java.util.concurrent.*;
    3. public class CustomThreadPool2 {
    4. /* 无界队列LinkedBlockingQueue:
    5. 与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在加入任务队列失败的情况下,当有新的任务到来,系统的线程数小于coreRoolSize
    6. 时候,则新建线程执行任务,当到达coreRoolSize后就不会增加了,若后续还有新的任务加入,而且没有空闲的线程资源,则任务直接进入队列等待
    7. 。若任务创建和处理速度差异很大,无界队列保持快速增长,知道耗尽系统内存。*/
    8. public static void main(String[] args) throws InterruptedException {
    9. BlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<Runnable>();
    10. ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5,10,120L, TimeUnit.SECONDS,blockingQueue);
    11. //ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) Executors.newFixedThreadPool(4);
    12. for (int i=1;i<=20;i++){
    13. threadPoolExecutor.execute(new UserTask(i,"任务-"+i));
    14. }
    15. Thread.sleep(1000);
    16. System.err.println("blockingQueue size:"+blockingQueue.size());
    17. Thread.sleep(2000);
    18. }
    19. }
    20. 输出:
    21. pool-1-thread-1 run taskId:1 pool-1-thread-2 run taskId:2 pool-1-thread-3 run taskId:3 pool-1-thread-4 run taskId:4 pool-1-thread-5 run taskId:5 blockingQueue size:10 pool-1-thread-2 run taskId:7 pool-1-thread-4 run taskId:9 pool-1-thread-3 run taskId:8 pool-1-thread-1 run taskId:6 pool-1-thread-5 run taskId:10 pool-1-thread-2 run taskId:11 pool-1-thread-1 run taskId:14 pool-1-thread-3 run taskId:13 pool-1-thread-4 run taskId:12 pool-1-thread-5 run taskId:15 pool-1-thread-1 run taskId:17 pool-1-thread-4 run taskId:19 pool-1-thread-5 run taskId:20 pool-1-thread-2 run taskId:16 pool-1-thread-3 run taskId:18



  • 相关阅读:
    Java学习二十九天
    Java学习二十八天
    47. Permutations II 全排列可重复版本
    46. Permutations 全排列,无重复
    subset ii 子集 有重复元素
    339. Nested List Weight Sum 339.嵌套列表权重总和
    251. Flatten 2D Vector 平铺二维矩阵
    217. Contains Duplicate数组重复元素
    209. Minimum Size Subarray Sum 结果大于等于目标的最小长度数组
    438. Find All Anagrams in a String 查找字符串中的所有Anagrams
  • 原文地址:https://www.cnblogs.com/xxt19970908/p/7337126.html
Copyright © 2011-2022 走看看