zoukankan      html  css  js  c++  java
  • 3. 线程池

    一、JDK对线程池的支持

    常用方法

    newFixedThreadPool(int Threads);
    newSingleThreadExecutor();
    newCachedThreadPool();
    newSingleThreadScheduledExecutor();
    newScheduledThreadPool(int corePoolSize);
    

    1.newFixedThreadPool

    返回一个固定线程数量的线程池,该线程池内线程数量始终不变。任务提交,若空闲则执行,反之,放入任务队列等待
    

    2.newSingleThreadExecutor

    返回一个只有一个线程的线程池。任务提交,若空闲则执行,反之,放入任务队列等待
    

    3.newCachedThreadPool

    返回一个可根据实际情况调整线程数量的线程池,任务提交,若有空闲线程则执行,反之,创建新进程执行
    

    4.newSingleThreadScheduledExecutor

    返回一个ScheduledExecutorService对象,线程池大小为1,具有在某个固定的延时之后执行,或者周期性执行某个任务的功能
    

    5.newScheduledThreadPool

    功能同上,但是可以指定线程数量
    

    5.1常用方法

    ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
    ses.schedule(Runnable command,long delay,TimeUnit unit);
    ses.scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
    ses.scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
    

    5.2 schedule、scheduleAtFixedRate和scheduleWithFixedDelay

    - schedule  会在给定时间对任务进行一次调度
    
    - scheduleAtFixedRate  以initialDelay为首次开始运行时间。每次以上一次任务开始时间为起点,在经过period时间后调度下一次任务。如果period短于任务执行时间,则任务不会堆叠执行,而是在上一个任务执行完成后再立即开始执行
    	也就是第一次任务在initialDelay执行,第二次任务在initialDelay+period时执行,第三次在initialDelay+2*period时执行,
    
    - scheduleWithFixedDelay:以initialDelay为首次开始运行时间,在任务运行完成后经过delay时间,执行下次任务
    	即第一次任务在initialDelay执行,假设任务运行t时长,第二次任务在initialDelay+t+delay时候执行,第三次在initialDelay+t+delay+t+delay时执行
    

    二、内部实现

    对于上述线程池,其内部均使用的是ThreadPoolExecutor类
    
    //三种线程实现方式
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0l, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new ThreadPoolExecutor(1, 1, 0l, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(){
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }
    
    //ThreadPoolExecutor构造函数
    public ThreadPoolExecutor(int corePoolSize,//指定线程池中的线程数量
            int maximumPoolSize,//指定线程池中的最大线程数量
            long keepAliveTime,//当线程池线程数量超过corePoolSize,多余线程存活时间
            TimeUnit unit,//keepAliveTime的单位
            BlockingQueue<Runnable> workQueue,//任务队列
            ThreadFactory threadFactory,//线程工厂,用于创建线程
            RejectedExecutionHandler handler)//拒绝策略,当任务太多来不及处理时,如何拒绝任务
    

    1. 任务队列 BlockingQueue

    在ThreadPoolExecutor类的构造函数中可使用以下几种BlockingQueue接口

    直接提交的队列SynchronousQueue

    SynchronousQueue没有容量,对新任务不会保存,而是直接提交运行,若空闲则执行。若无空闲,则尝试创建新线程,若进程数量已到达maximumPoolSize,则执行拒绝策略
    

    有界任务队列ArrayBlockingQueue

    ArrayBlockingQueue构造函数必须指定容器容量
    public ArrayBlockingQueue(int capacity);
    
    新任务加入
    	若线程池中线程数小于corePoolSize,则优先创建新线程执行任务,
    	若大于corePoolSize,则将新任务加入等待队列
    	若等待队列已满,则在总线程数小于maximumPoolSize的前提下创建新线程执行任务
    	若大于maximumPoolSize,则执行拒绝策略
    

    无界任务队列LinkedBlockingQueue

    新任务加入
    	若线程池中线程数小于corePoolSize,则优先创建新线程执行任务,
    	若等于corePoolSize,则将新任务加入等待队列
    

    优先任务队列PriorityBlockingQueue

    是一个特殊的无界队列,根据任务优先级进行任务调度
    

    2.拒绝策略 RejectedExecutionHandler

    //JDK内置拒绝策略
    AbortPolicy;//直接抛出异常,阻止系统正常工作
    CallerRunsPolicy;//只要线程池未关闭,该策略直接在调用者线程中执行该任务
    DiscardOldestPolicy;//丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务
    DiscardPolicy;//默默丢弃无法处理的任务,无任何提示
    
    //自定义拒绝策略
    new ThreadPoolExecutor(5, 5, 0l,TimeUnit.MILLISECONDS, 
             new LinkedBlockingQueue<>(),
             Executors.defaultThreadFactory(),
             new RejectedExecutionHandler(){
             @Override
              public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                 System.out.println(runnable.tostring+"is discard");
              }
    });
    

    3. 自定义线程创建 ThreadFactory

    我们使用ThreadFactory创建线程,他只有一个创建线程的方法
    Thread newThread(Runnable r);
    自定义线程池可以帮助我们很多
    	1.跟踪线程池究竟在何时创建了多少线程
    	2.自定义线程名称,组以及优先级等信息
    	3.甚至可以将所有线程设置为守护线程
    
    //自定义线程创建
    Task task = new Task();
    ExecutorService es = new ThreadPoolExecutor(5,5,0l,
                    TimeUnit.MILLISECONDS,
                    new SynchronousQueue<>(),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable runnable) {
                            Thread t = new Thread(runnable);
                            t.setDaemon(true);
                            System.out.println("create " + t);
                            return t;
                        }
    });
    for (int x = 0; x < 10; x++)
         es.submit(task);
    

    4. 扩展线程池

    我们可以使用
    
    //demo
    public class test {
        public static class MyTask implements Runnable {
            public String name;
            public MyTask(String name) {
                this.name = name;
            }
            @Override
            public void run() {
                System.out.println("正在执行:Thead ID:" + Thread.currentThread().getId() + " Task Name= " + name);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String[] args) throws InterruptedException {
            ExecutorService es = new ThreadPoolExecutor(5, 5, 0l, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>()) {
                @Override
                protected void beforeExecute(Thread t, Runnable r) {
                    System.out.println("准备执行:" + ((MyTask) r).name);
                }
                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    System.out.println("执行完成:" + ((MyTask) r).name);
                }
                @Override
                protected void terminated() {
                    System.out.println("线程池退出");
                }
            };
            for (int x = 0; x < 10; x++) {
                MyTask task = new MyTask("Task-" + x);
                es.execute(task);
                Thread.sleep(10);
            }
            es.shutdown();
        }
    }
    //在提交完成后,调用shutdown()方法关闭线程池,这是一个比较安全的方法,若当前有线程在执行,shutdown()方法不会立即暴力关闭线程,而是停止接受新任务,等待已有任务完成再关闭,因此可以理解为shutdown()方法只是发送一个关闭信号而已。
    

    三、合理选择线程池线程数量

    Ncpu = cpu数量
    Ucpu = 目标cpu的使用率  0 ≤ Ucpu ≤ 1
    W/C = 等待时间于计算时间的比率
    最优线程池大小Nthreads = Ncpu*Ucpu*(1+W/C)
    

    四、在线程池中寻找堆栈

    下面看一个简单案例

    public class test {
        public static class DivTask implements Runnable {
            int a, b;
            public DivTask(int a, int b) {
                this.a = a;
                this.b = b;
            }
            @Override
            public void run() {
                double re = a / b;
                System.out.println(re);
            }
        }
        public static void main(String[] args) {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0l, TimeUnit.SECONDS, new SynchronousQueue<>());
            for (int x = 0; x < 5; x++)
                pool.submit(new DivTask(100, x));
        }
    }
    

    运行结果如下

    100.0
    25.0
    33.0
    50.0
    

    输出缺少100/0的结果且无任何异常报出,这时我们可以将submit改为execute

    pool.execute(new DivTask(100, x));
    

    结果如下

    Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    	at com.company.test$DivTask.run(test.java:20)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    100.0
    25.0
    33.0
    50.0
    

    但此时虽然知道了100/0异常在哪里,但是仍不知道这个任务是谁提交的,为了方便我们的调试,我们可以对其进行加强

    public static class TraceThreadPoolExecutor extends ThreadPoolExecutor {
            public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
                super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
            }
        
            @Override
            public void execute(Runnable task) {
                super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
            }
        
            @Override
            public Future<?> submit(Runnable task) {
                return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
            }
        
            private Exception clientTrace() {
                return new Exception("Client stack trace");
            }
        
            private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) {
                return new Runnable() {
                    @Override
                    public void run() {
                        try {
                            task.run();
                        } catch (Exception e) {
                            e.printStackTrace();
                            throw e;
                        }
                    }
                };
            }
        }
    

    然后再执行

        public static void main(String[] args) {
            ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0l, TimeUnit.SECONDS, new SynchronousQueue<>());
            for (int x = 0; x < 5; x++)
                pools.execute(new DivTask(100, x));
        }
    

    运行结果如下

    java.lang.ArithmeticException: / by zero
    	at com.company.test$DivTask.run(test.java:21)
    	at com.company.test$TraceThreadPoolExecutor$1.run(test.java:50)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    	at com.company.test$DivTask.run(test.java:21)
    	at com.company.test$TraceThreadPoolExecutor$1.run(test.java:50)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    100.0
    25.0
    33.0
    50.0
    

    五、Fork/Join框架

    ForkJoinPool线程池

    ForkJoinTask任务,即支持fork()方法分解和join()方法等待的任务,ForkJoinTask任务有两个重要的子类

    RecursiveTask //有返回值的任务
    RecursiveAction //没有返回值的任务
    

    demo-数列求和

    public class test extends RecursiveTask<Long> {
    
        private static final int THRESHOLD = 10000;
        private long start;
        private long end;
    
        public test(long start, long end) {
            this.start = start;
            this.end = end;
        }
    
    
        @Override
        protected Long compute() {
            long sum = 0;
            boolean canCompute = (end - start) < THRESHOLD;
            if (canCompute)
                for (long i = start; i <= end; i++)
                    sum += i;
            else {
                long step = (start + end) / 100;
                ArrayList<test> subTasks = new ArrayList<>();
                long pos = start;
                for (int i = 0; i < 100; i++) {
                    long lastOne = pos + step;
                    if (lastOne > end) lastOne = end;
                    test subTask = new test(pos, lastOne);
                    pos += step + 1;
                    subTasks.add(subTask);
                    subTask.fork();
                }
                for (test t : subTasks)
                    sum += t.join();
            }
            return sum;
        }
    
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            test t = new test(0, 200000l);
            ForkJoinTask<Long> result = forkJoinPool.submit(t);
            try {
                long res = result.get();
                System.out.println("sum= " + res);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
  • 相关阅读:
    机器分配
    搭建免费私有音乐云
    ngnix相关
    idea常用插件
    notepad++ 实用插件
    liunx 新建自启服务
    scala 语法特性小计
    spring boot 静态资源 访问 配置
    SVN-Unable to create pristine install stream
    idea 编译 错误 Error:java: Compilation failed: internal java compiler error 解决方案
  • 原文地址:https://www.cnblogs.com/INnoVationv2/p/13027451.html
Copyright © 2011-2022 走看看