zoukankan      html  css  js  c++  java
  • Java多线程系列七——ExecutorService

    java.util.concurrent.ExecutorService接口提供了许多线程管理的方法

    Method 说明
    shutdown 拒绝接收新的任务,待已提交的任务执行后关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
    shutdownNow 停止所有正在执行的任务,挂起未执行的任务并关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
    awaitTermination 当发生shutdown时,阻塞宿主线程直到约定的时间已过或者所有任务完成
    submit 提交任务Callable/Runnable,可利用Future的get()方法使宿主线程阻塞直到任务结束后返回结果

    有了以上方法,便可以基于此接口实现线程池的各种功能(例如java.util.concurrent.ThreadPoolExecutor/java.util.concurrent.ScheduledThreadPoolExecutor),以java.util.concurrent.ThreadPoolExecutor为例,其参数的详解

    Name Type 说明
    corePoolSize int 线程池中最小的线程数
    maximumPoolSize int 线程池中最大的线程数
    keepAliveTime long 线程空闲时间,若线程数大于corePoolSize,空闲时间超过该值的线程将被终止回收
    unit TimeUnit keepAliveTime的时间单位
    workQueue BlockingQueue<Runnable> 已提交但未执行的任务队列
    threadFactory ThreadFactory 创建新线程的工厂
    handler RejectedExecutionHandler 当线程池或队列达到上限拒绝新任务抛出异常时的处理类

    同时,java.util.concurrent.Executors类提供的常用方法有

    Method 说明 基类
    newFixedThreadPool 线程池中含固定数量的线程 基于java.util.concurrent.ThreadPoolExecutor类
    newSingleThreadExecutor 线程池中仅含一个工作线程
    newCachedThreadPool 按需创建线程,若线程池中无可用线程,则创建新的线程并加入,直到线程数达到上限值(Integer.MAX_VALUE)
    newWorkStealingPool 按照可用CPU数创建线程池 基于java.util.concurrent.ForkJoinPool类

    java.util.concurrent.ForkJoinPool类是Fork/Join框架的实现类,Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架,该类在有递归实现的场景有更优异的表现。

    测试代码如下

    import java.util.Date;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    import org.junit.Assert;
    import org.junit.Test;
    
    /**
     * @Description: 测试ExecutorService
     */
    public class ThreadExecutorServiceTest {
        private static final String THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION = "This is shutdownWithAwaitTermination";
        private static final int RESULT = 111;
    
        private static boolean submitRunnable() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<?> future = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("This is submitRunnable");
                }
            });
            return future.get() == null;
        }
    
        private static Integer submitRunnableWithResult() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<Integer> future = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("This is submitRunnableWithResult");
                }
            }, RESULT);
            return future.get();
        }
    
        private static Integer submitBlockCallable() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("This is submitBlockCallable");
                    return RESULT;
                }
            });
            return future.get();// 阻塞
        }
    
        private static boolean submitNonBlockCallable() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("This is submitNonBlockCallable");
                    return RESULT;
                }
            });
            while (!future.isDone()) {// 非阻塞
                System.out.println(new Date());
            }
            return future.isDone();
        }
    
        private static String shutdown() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            final StringBuilder sb = new StringBuilder();
            executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(10000);
                    sb.append("This is shutdown");
                    return RESULT;
                }
            });
            executorService.shutdown();
            return sb.toString();
        }
    
        private static String shutdownWithAwaitTermination() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            final StringBuilder sb = new StringBuilder();
            executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(10000);
                    sb.append(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION);
                    return RESULT;
                }
            });
            executorService.shutdown();
            executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
            return sb.toString();
        }
    
        @Test
        public void test() throws InterruptedException, ExecutionException {
            Assert.assertTrue(submitRunnable());
            Assert.assertEquals(RESULT, submitRunnableWithResult().intValue());
            Assert.assertEquals(RESULT, submitBlockCallable().intValue());
            Assert.assertTrue(submitNonBlockCallable());
            Assert.assertTrue(shutdown().isEmpty());
            Assert.assertEquals(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION, shutdownWithAwaitTermination());
        }
    
    }

    ---恢复内容结束---

    java.util.concurrent.ExecutorService接口提供了许多线程管理的方法

    Method 说明
    shutdown 拒绝接收新的任务,待已提交的任务执行后关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
    shutdownNow 停止所有正在执行的任务,挂起未执行的任务并关闭,且宿主线程不阻塞,若需要阻塞可借助awaitTermination实现
    awaitTermination 当发生shutdown时,阻塞宿主线程直到约定的时间已过或者所有任务完成
    submit 提交任务Callable/Runnable,可利用Future的get()方法使宿主线程阻塞直到任务结束后返回结果

    有了以上方法,便可以基于此接口实现线程池的各种功能(例如java.util.concurrent.ThreadPoolExecutor/java.util.concurrent.ScheduledThreadPoolExecutor),以java.util.concurrent.ThreadPoolExecutor为例,其参数的详解

    Name Type 说明
    corePoolSize int 线程池中最小的线程数
    maximumPoolSize int 线程池中最大的线程数
    keepAliveTime long 线程空闲时间,若线程数大于corePoolSize,空闲时间超过该值的线程将被终止回收
    unit TimeUnit keepAliveTime的时间单位
    workQueue BlockingQueue<Runnable> 已提交但未执行的任务队列
    threadFactory ThreadFactory 创建新线程的工厂
    handler RejectedExecutionHandler 当线程池或队列达到上限拒绝新任务抛出异常时的处理类

    同时,java.util.concurrent.Executors类提供了基于java.util.concurrent.ThreadPoolExecutor类的工具方法,常用方法有

    Method 说明
    newFixedThreadPool 线程池中含固定数量的线程
    newSingleThreadExecutor 线程池中仅含一个工作线程
    newCachedThreadPool 按需创建线程,若线程池中无可用线程,则创建新的线程并加入,直到线程数达到上限值(Integer.MAX_VALUE)

    测试代码如下

    import java.util.Arrays;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    
    import org.junit.Assert;
    import org.junit.Test;
    import org.lxp.multiple.thread.task.SumTask;
    
    /**
     * @Description: 测试ExecutorService
     * @author Super.Li
     * @date Jul 6, 2017
     */
    public class ThreadExecutorServiceTest {
        private static final String THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION = "This is shutdownWithAwaitTermination";
        private static final int RESULT = 111;
    
        private static boolean submitRunnable() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<?> future = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("This is submitRunnable");
                }
            });
            return future.get() == null;
        }
    
        private static Integer submitRunnableWithResult() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            Future<Integer> future = executorService.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("This is submitRunnableWithResult");
                }
            }, RESULT);
            return future.get();
        }
    
        private static Integer submitBlockCallable() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("This is submitBlockCallable");
                    return RESULT;
                }
            });
            return future.get();// 阻塞
        }
    
        private static boolean submitNonBlockCallable() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            Future<Integer> future = executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    System.out.println("This is submitNonBlockCallable");
                    return RESULT;
                }
            });
            while (!future.isDone()) {// 非阻塞
                System.out.println(new Date());
            }
            return future.isDone();
        }
    
        private static String shutdown() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            final StringBuilder sb = new StringBuilder();
            executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(10000);
                    sb.append("This is shutdown");
                    return RESULT;
                }
            });
            executorService.shutdown();
            return sb.toString();
        }
    
        private static String shutdownWithAwaitTermination() throws InterruptedException, ExecutionException {
            ExecutorService executorService = Executors.newFixedThreadPool(1);
            final StringBuilder sb = new StringBuilder();
            executorService.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    Thread.sleep(10000);
                    sb.append(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION);
                    return RESULT;
                }
            });
            executorService.shutdown();
            executorService.awaitTermination(Integer.MAX_VALUE, TimeUnit.DAYS);
            return sb.toString();
        }
    
        private static int testForkJoinPool(List<Integer> list) throws InterruptedException, ExecutionException {
            ForkJoinPool forkJoinPool = new ForkJoinPool(8);
            Future<Integer> future = forkJoinPool.submit(new SumTask(list));
            return future.get();
        }
    
        @Test
        public void test() throws InterruptedException, ExecutionException {
            Assert.assertTrue(submitRunnable());
            Assert.assertEquals(RESULT, submitRunnableWithResult().intValue());
            Assert.assertEquals(RESULT, submitBlockCallable().intValue());
            Assert.assertTrue(submitNonBlockCallable());
            Assert.assertTrue(shutdown().isEmpty());
            Assert.assertEquals(THIS_IS_SHUTDOWN_WITH_AWAIT_TERMINATION, shutdownWithAwaitTermination());
            Assert.assertEquals(10, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4 })));
            Assert.assertEquals(49, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 })));
            Assert.assertEquals(60, testForkJoinPool(Arrays.asList(new Integer[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 })));
        }
    
    }

    SumTask类如下:

    import java.util.List;
    import java.util.concurrent.RecursiveTask;
    
    public class SumTask extends RecursiveTask<Integer> {
        private static final long serialVersionUID = 1L;
        private List<Integer> list;
    
        public SumTask(List<Integer> list) {
            this.list = list;
        }
    
        /**
         * Ensure it is necessary to divide the job to parts and finish them separately
         * 
         * @return
         */
        @Override
        protected Integer compute() {
            int rtn, size = list.size();
            if (size < 10) {
                rtn = sum(list);
            } else {
                SumTask subTask1 = new SumTask(list.subList(0, size / 2));
                SumTask subTask2 = new SumTask(list.subList(size / 2 + 1, size));
                subTask1.fork();
                subTask2.fork();
                rtn = subTask1.join() + subTask2.join();
            }
            return rtn;
        }
    
        private int sum(List<Integer> list) {
            return list.stream().mapToInt(number -> number.intValue()).sum();
        }
    }
  • 相关阅读:
    python列表
    Apache ab压力测试
    nmon监控与 nmon analyser分析
    一些有用的cocos2d-x,box2d等网址链接
    手把手教你实现物理碰撞的网络同步
    cocos2dx-lua使用UIListView制作二级折叠菜单
    【Cocos2d-x游戏开发】Cocos2d-x中的弱联网技术
    Java游戏服务器成长之路——弱联网游戏篇(源码分析)
    在浏览器上实现自动引导(五)
    使用Cocos2d-JS制作游戏新手引导(四)应用篇
  • 原文地址:https://www.cnblogs.com/hiver/p/7126927.html
Copyright © 2011-2022 走看看