zoukankan      html  css  js  c++  java
  • 3. 线程池

    一、JDK对线程池的支持

    常用方法

    newFixedThreadPool(int Threads);
    newSingleThreadExecutor();
    newCachedThreadPool();
    newSingleThreadScheduledExecutor();
    newScheduledThreadPool(int corePoolSize);
    

    1.newFixedThreadPool

    返回一个固定线程数量的线程池,该线程池内线程数量始终不变。任务提交,若空闲则执行,反之,放入任务队列等待
    

    2.newSingleThreadExecutor

    返回一个只有一个线程的线程池。任务提交,若空闲则执行,反之,放入任务队列等待
    

    3.newCachedThreadPool

    返回一个可根据实际情况调整线程数量的线程池,任务提交,若有空闲线程则执行,反之,创建新进程执行
    

    4.newSingleThreadScheduledExecutor

    返回一个ScheduledExecutorService对象,线程池大小为1,具有在某个固定的延时之后执行,或者周期性执行某个任务的功能
    

    5.newScheduledThreadPool

    功能同上,但是可以指定线程数量
    

    5.1常用方法

    ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
    ses.schedule(Runnable command,long delay,TimeUnit unit);
    ses.scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);
    ses.scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);
    

    5.2 schedule、scheduleAtFixedRate和scheduleWithFixedDelay

    - schedule  会在给定时间对任务进行一次调度
    
    - scheduleAtFixedRate  以initialDelay为首次开始运行时间。每次以上一次任务开始时间为起点,在经过period时间后调度下一次任务。如果period短于任务执行时间,则任务不会堆叠执行,而是在上一个任务执行完成后再立即开始执行
    	也就是第一次任务在initialDelay执行,第二次任务在initialDelay+period时执行,第三次在initialDelay+2*period时执行,
    
    - scheduleWithFixedDelay:以initialDelay为首次开始运行时间,在任务运行完成后经过delay时间,执行下次任务
    	即第一次任务在initialDelay执行,假设任务运行t时长,第二次任务在initialDelay+t+delay时候执行,第三次在initialDelay+t+delay+t+delay时执行
    

    二、内部实现

    对于上述线程池,其内部均使用的是ThreadPoolExecutor类
    
    //三种线程实现方式
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads, 0l, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new ThreadPoolExecutor(1, 1, 0l, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(){
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60l, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>());
    }
    
    //ThreadPoolExecutor构造函数
    public ThreadPoolExecutor(int corePoolSize,//指定线程池中的线程数量
            int maximumPoolSize,//指定线程池中的最大线程数量
            long keepAliveTime,//当线程池线程数量超过corePoolSize,多余线程存活时间
            TimeUnit unit,//keepAliveTime的单位
            BlockingQueue<Runnable> workQueue,//任务队列
            ThreadFactory threadFactory,//线程工厂,用于创建线程
            RejectedExecutionHandler handler)//拒绝策略,当任务太多来不及处理时,如何拒绝任务
    

    1. 任务队列 BlockingQueue

    在ThreadPoolExecutor类的构造函数中可使用以下几种BlockingQueue接口

    直接提交的队列SynchronousQueue

    SynchronousQueue没有容量,对新任务不会保存,而是直接提交运行,若空闲则执行。若无空闲,则尝试创建新线程,若进程数量已到达maximumPoolSize,则执行拒绝策略
    

    有界任务队列ArrayBlockingQueue

    ArrayBlockingQueue构造函数必须指定容器容量
    public ArrayBlockingQueue(int capacity);
    
    新任务加入
    	若线程池中线程数小于corePoolSize,则优先创建新线程执行任务,
    	若大于corePoolSize,则将新任务加入等待队列
    	若等待队列已满,则在总线程数小于maximumPoolSize的前提下创建新线程执行任务
    	若大于maximumPoolSize,则执行拒绝策略
    

    无界任务队列LinkedBlockingQueue

    新任务加入
    	若线程池中线程数小于corePoolSize,则优先创建新线程执行任务,
    	若等于corePoolSize,则将新任务加入等待队列
    

    优先任务队列PriorityBlockingQueue

    是一个特殊的无界队列,根据任务优先级进行任务调度
    

    2.拒绝策略 RejectedExecutionHandler

    //JDK内置拒绝策略
    AbortPolicy;//直接抛出异常,阻止系统正常工作
    CallerRunsPolicy;//只要线程池未关闭,该策略直接在调用者线程中执行该任务
    DiscardOldestPolicy;//丢弃最老的一个请求,也就是即将被执行的任务,并尝试再次提交当前任务
    DiscardPolicy;//默默丢弃无法处理的任务,无任何提示
    
    //自定义拒绝策略
    new ThreadPoolExecutor(5, 5, 0l,TimeUnit.MILLISECONDS, 
             new LinkedBlockingQueue<>(),
             Executors.defaultThreadFactory(),
             new RejectedExecutionHandler(){
             @Override
              public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                 System.out.println(runnable.tostring+"is discard");
              }
    });
    

    3. 自定义线程创建 ThreadFactory

    我们使用ThreadFactory创建线程,他只有一个创建线程的方法
    Thread newThread(Runnable r);
    自定义线程池可以帮助我们很多
    	1.跟踪线程池究竟在何时创建了多少线程
    	2.自定义线程名称,组以及优先级等信息
    	3.甚至可以将所有线程设置为守护线程
    
    //自定义线程创建
    Task task = new Task();
    ExecutorService es = new ThreadPoolExecutor(5,5,0l,
                    TimeUnit.MILLISECONDS,
                    new SynchronousQueue<>(),
                    new ThreadFactory() {
                        @Override
                        public Thread newThread(Runnable runnable) {
                            Thread t = new Thread(runnable);
                            t.setDaemon(true);
                            System.out.println("create " + t);
                            return t;
                        }
    });
    for (int x = 0; x < 10; x++)
         es.submit(task);
    

    4. 扩展线程池

    我们可以使用
    
    //demo
    public class test {
        public static class MyTask implements Runnable {
            public String name;
            public MyTask(String name) {
                this.name = name;
            }
            @Override
            public void run() {
                System.out.println("正在执行:Thead ID:" + Thread.currentThread().getId() + " Task Name= " + name);
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public static void main(String[] args) throws InterruptedException {
            ExecutorService es = new ThreadPoolExecutor(5, 5, 0l, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>()) {
                @Override
                protected void beforeExecute(Thread t, Runnable r) {
                    System.out.println("准备执行:" + ((MyTask) r).name);
                }
                @Override
                protected void afterExecute(Runnable r, Throwable t) {
                    System.out.println("执行完成:" + ((MyTask) r).name);
                }
                @Override
                protected void terminated() {
                    System.out.println("线程池退出");
                }
            };
            for (int x = 0; x < 10; x++) {
                MyTask task = new MyTask("Task-" + x);
                es.execute(task);
                Thread.sleep(10);
            }
            es.shutdown();
        }
    }
    //在提交完成后,调用shutdown()方法关闭线程池,这是一个比较安全的方法,若当前有线程在执行,shutdown()方法不会立即暴力关闭线程,而是停止接受新任务,等待已有任务完成再关闭,因此可以理解为shutdown()方法只是发送一个关闭信号而已。
    

    三、合理选择线程池线程数量

    Ncpu = cpu数量
    Ucpu = 目标cpu的使用率  0 ≤ Ucpu ≤ 1
    W/C = 等待时间于计算时间的比率
    最优线程池大小Nthreads = Ncpu*Ucpu*(1+W/C)
    

    四、在线程池中寻找堆栈

    下面看一个简单案例

    public class test {
        public static class DivTask implements Runnable {
            int a, b;
            public DivTask(int a, int b) {
                this.a = a;
                this.b = b;
            }
            @Override
            public void run() {
                double re = a / b;
                System.out.println(re);
            }
        }
        public static void main(String[] args) {
            ThreadPoolExecutor pool = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 0l, TimeUnit.SECONDS, new SynchronousQueue<>());
            for (int x = 0; x < 5; x++)
                pool.submit(new DivTask(100, x));
        }
    }
    

    运行结果如下

    100.0
    25.0
    33.0
    50.0
    

    输出缺少100/0的结果且无任何异常报出,这时我们可以将submit改为execute

    pool.execute(new DivTask(100, x));
    

    结果如下

    Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    	at com.company.test$DivTask.run(test.java:20)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    100.0
    25.0
    33.0
    50.0
    

    但此时虽然知道了100/0异常在哪里,但是仍不知道这个任务是谁提交的,为了方便我们的调试,我们可以对其进行加强

    public static class TraceThreadPoolExecutor extends ThreadPoolExecutor {
            public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
                super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
            }
        
            @Override
            public void execute(Runnable task) {
                super.execute(wrap(task, clientTrace(), Thread.currentThread().getName()));
            }
        
            @Override
            public Future<?> submit(Runnable task) {
                return super.submit(wrap(task, clientTrace(), Thread.currentThread().getName()));
            }
        
            private Exception clientTrace() {
                return new Exception("Client stack trace");
            }
        
            private Runnable wrap(final Runnable task, final Exception clientStack, String clientThreadName) {
                return new Runnable() {
                    @Override
                    public void run() {
                        try {
                            task.run();
                        } catch (Exception e) {
                            e.printStackTrace();
                            throw e;
                        }
                    }
                };
            }
        }
    

    然后再执行

        public static void main(String[] args) {
            ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0, Integer.MAX_VALUE, 0l, TimeUnit.SECONDS, new SynchronousQueue<>());
            for (int x = 0; x < 5; x++)
                pools.execute(new DivTask(100, x));
        }
    

    运行结果如下

    java.lang.ArithmeticException: / by zero
    	at com.company.test$DivTask.run(test.java:21)
    	at com.company.test$TraceThreadPoolExecutor$1.run(test.java:50)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    	at com.company.test$DivTask.run(test.java:21)
    	at com.company.test$TraceThreadPoolExecutor$1.run(test.java:50)
    	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    	at java.base/java.lang.Thread.run(Thread.java:834)
    100.0
    25.0
    33.0
    50.0
    

    五、Fork/Join框架

    ForkJoinPool线程池

    ForkJoinTask任务,即支持fork()方法分解和join()方法等待的任务,ForkJoinTask任务有两个重要的子类

    RecursiveTask //有返回值的任务
    RecursiveAction //没有返回值的任务
    

    demo-数列求和

    public class test extends RecursiveTask<Long> {
    
        private static final int THRESHOLD = 10000;
        private long start;
        private long end;
    
        public test(long start, long end) {
            this.start = start;
            this.end = end;
        }
    
    
        @Override
        protected Long compute() {
            long sum = 0;
            boolean canCompute = (end - start) < THRESHOLD;
            if (canCompute)
                for (long i = start; i <= end; i++)
                    sum += i;
            else {
                long step = (start + end) / 100;
                ArrayList<test> subTasks = new ArrayList<>();
                long pos = start;
                for (int i = 0; i < 100; i++) {
                    long lastOne = pos + step;
                    if (lastOne > end) lastOne = end;
                    test subTask = new test(pos, lastOne);
                    pos += step + 1;
                    subTasks.add(subTask);
                    subTask.fork();
                }
                for (test t : subTasks)
                    sum += t.join();
            }
            return sum;
        }
    
        public static void main(String[] args) {
            ForkJoinPool forkJoinPool = new ForkJoinPool();
            test t = new test(0, 200000l);
            ForkJoinTask<Long> result = forkJoinPool.submit(t);
            try {
                long res = result.get();
                System.out.println("sum= " + res);
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
        }
    }
    
  • 相关阅读:
    changing a pointer rather than erasing memory cells
    验证码识别 edge enhancement 轮廓增强 region finding 区域查找
    Manipulating Data Structures
    passing parameters by value is inefficient when the parameters represent large blocks of data
    Aliasing 走样
    Artificial Intelligence Research Methodologies 人工智能研究方法
    Thread safety
    include pointers as a primitive data type
    flat file
    functional cohesion
  • 原文地址:https://www.cnblogs.com/INnoVationv2/p/13027451.html
Copyright © 2011-2022 走看看