zoukankan      html  css  js  c++  java
  • 线程池ThreadPool实战

     

     

    线程池概念

    线程频繁创建和关闭,比较耗费cpu性能,可以通过线程池来管理,类似数据库连接池一样的道理.
    学习Java的线程池,必须先知道创建线程池的原始类和方法ThreadPoolExecutor

     

    类继承关系
    类继承关系

     

        public ThreadPoolExecutor(int corePoolSize,
                                  int maximumPoolSize,
                                  long keepAliveTime,
                                  TimeUnit unit,
                                  BlockingQueue<Runnable> workQueue,
                                  ThreadFactory threadFactory,
                                  RejectedExecutionHandler handler) {
            if (corePoolSize < 0 ||
                maximumPoolSize <= 0 ||
                maximumPoolSize < corePoolSize ||
                keepAliveTime < 0)
                throw new IllegalArgumentException();
            if (workQueue == null || threadFactory == null || handler == null)
                throw new NullPointerException();
            this.corePoolSize = corePoolSize;
            this.maximumPoolSize = maximumPoolSize;
            this.workQueue = workQueue;
            this.keepAliveTime = unit.toNanos(keepAliveTime);
            this.threadFactory = threadFactory;
            this.handler = handler;
        }
    • corePoolSize:线程池核心线程数,空闲也不会被销毁

    • maximumPoolSize:线程池最大线程数

    • keepAliveTime:超出corePoolSize数量的线程的保留时间

    • unit:keepAliveTime单位

    • workQueue:阻塞队列,存放来不及执行的线程

      • ArrayBlockingQueue:构造函数一定要传大小
      • LinkedBlockingQueue:构造函数不传大小会默认为(Integer.MAX_VALUE ),当大量请求任务时,容易造成 内存耗尽。
      • SynchronousQueue:同步队列,一个没有存储空间的阻塞队列,将任务同步交付给工作线程。
      • PriorityBlockingQueue : 优先队列
    • threadFactory:线程工厂,一般默认即可

    • handler:饱和策略

      • AbortPolicy(默认):直接抛弃
      • CallerRunsPolicy:用调用者的线程执行任务
      • DiscardOldestPolicy:抛弃队列中最久的任务
      • DiscardPolicy:抛弃当前任务

    常用线程池和方法

    线程池:

    • newFixedThreadPool 固定线程池,可控制线程最大并发数,超出线程在队列中等待。
    • newSingleThreadExecutor 单线程池,用唯一的工作线程来执行任务。
    • newCachedThreadPool 缓存线程池,灵活回收空闲线程。
    • newScheduledThreadPool 定长线程池,支持定时及周期性任务执行。

    方法:

    • execute() 添加任务
    • submit() 提交任务
    • shutdown() 关闭线程池
    • shutdownNow() 立即关闭线程池

    1.测试线程类

    
    public class MyThread implements Runnable {
    
        private String curName;
    
        public MyThread() {
        }
    
        public MyThread(String curName) {
            this.curName = curName;
        }
    
        public String getCurName() {
            return curName;
        }
    
        public void setCurName(String curName) {
            this.curName = curName;
        }
    
        public void run() {
            System.out.println("=========>>>开始执行:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
            System.out.println(Thread.currentThread().getName()+": "+this.curName);
            try {
                Thread.sleep(2000);// 模拟线程执行时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    2.newFixedThreadPool固定线程池

    初始化线程池大小为3,模拟10个线程并发场景

    /**
     * 测试:定长线程池
     */
    public void fixedPool(){
            ExecutorService pool = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 10; i++) {
                    // 添加任务
                    pool.execute(new MyThread("线程0"+i));
            }
            pool.shutdown();
    }

     


     

     

    3.newSingleThreadExecutor单线程池

    按照顺序一个一个执行

    /**
     * 测试:单线程池
     */
    public void singlePool(){
            ExecutorService pool = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 10; i++) {
                    // 添加任务
                    pool.execute(new MyThread("线程0"+i));
            }
            pool.shutdown();
    }

     


     

     

    4.newCachedThreadPool缓存线程池

    /**
     * 测试:缓存线程池
     */
    public void cachePool(){
            ExecutorService pool = Executors.newCachedThreadPool();
            for (int i = 0; i < 10; i++) {
                    // 添加任务
                    pool.execute(new MyThread("线程0"+i));
            }
            pool.shutdown();
    }

     


     

     

    5.newScheduledThreadPool定长线程池

    线程池支持延时执行和周期执行

    • schedule(Callable callable, long delay, TimeUnit unit)延时执行
    • scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)延时固定间隔执行
    • scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)第一次执行完,延时固定间隔执行
    /**
     * 测试:定长线程池
     */
    public void scheduledPool(){
            ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
            System.out.println("=========>>>启动线程池:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
            // 延时3秒执行
            pool.schedule(new MyThread("线程01_延时"),3,TimeUnit.SECONDS);
            // 延时5秒循环执行
            pool.scheduleAtFixedRate(new MyThread("线程02_延时_循环"),5,5,TimeUnit.SECONDS);
            pool.scheduleWithFixedDelay(new MyThread("线程03_延时_循环"),5,5,TimeUnit.SECONDS);
            // 不关闭线程池
            // pool.shutdown();
    }

     


     

     

    6.完整代码

    package com.lyf.thread;
    
    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.*;
    
    /**
     * @author lyf
     * @date 2019/8/11 11:03
     * 
     * newFixedThreadPool 固定线程池,可控制线程最大并发数,超出线程在队列中等待。
     * newSingleThreadExecutor 单线程池,用唯一的工作线程来执行任务。
     * newCachedThreadPool 缓存线程池,灵活回收空闲线程。
     * newScheduledThreadPool 定长线程池,支持定时及周期性任务执行。
     * 常用方法:
     * execute() 添加任务
     * submit() 提交任务
     * shutdown() 关闭线程池
     * shutdownNow() 立即关闭线程池
     */
    
    public class MyThread implements Runnable {
    
        private String curName;
    
        public MyThread() {
        }
    
        public MyThread(String curName) {
            this.curName = curName;
        }
    
        public String getCurName() {
            return curName;
        }
    
        public void setCurName(String curName) {
            this.curName = curName;
        }
    
        public void run() {
            System.out.println("=========>>>开始执行:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
            System.out.println(Thread.currentThread().getName()+": "+this.curName);
            try {
                Thread.sleep(2000);// 模拟线程执行时间
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 测试:固定线程池
         */
        public void fixedPool(){
            ExecutorService pool = Executors.newFixedThreadPool(3);
            for (int i = 0; i < 10; i++) {
                // 添加任务
                pool.execute(new MyThread("线程0"+i));
            }
            pool.shutdown();
        }
    
        /**
         * 测试:单线程池
         */
        public void singlePool(){
            ExecutorService pool = Executors.newSingleThreadExecutor();
            for (int i = 0; i < 10; i++) {
                // 添加任务
                pool.execute(new MyThread("线程0"+i));
            }
            pool.shutdown();
        }
    
        /**
         * 测试:缓存线程池
         */
        public void cachePool(){
            ExecutorService pool = Executors.newCachedThreadPool();
            for (int i = 0; i < 10; i++) {
                // 添加任务
                pool.execute(new MyThread("线程0"+i));
            }
            pool.shutdown();
        }
    
        /**
         * 测试:定长线程池
         */
        public void scheduledPool(){
            ScheduledExecutorService pool = Executors.newScheduledThreadPool(3);
            System.out.println("=========>>>启动线程池:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
            // 延时3秒执行
            pool.schedule(new MyThread("线程01_延时"),3,TimeUnit.SECONDS);
            // 延时5秒循环执行
            pool.scheduleAtFixedRate(new MyThread("线程02_延时_循环"),5,5,TimeUnit.SECONDS);
            pool.scheduleWithFixedDelay(new MyThread("线程03_延时_循环"),5,5,TimeUnit.SECONDS);
            // 不关闭线程池
            // pool.shutdown();
        }
    
        public static void main(String []args) {
            MyThread myThread = new MyThread();
    //        myThread.fixedPool();
    //        myThread.singlePool();
    //        myThread.cachePool();
            myThread.scheduledPool();
        }
    
    }
    

    submit和execute方法区别

    submit和execute方法都可以提交任务到线程池中,区别3点:

    1. 接收参数不一样,submit需要线程实现Callable接口
    2. submit有返回值,而execute没有
    3. submit可以处理线程内部异常
    package com.lyf.thread;
    
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Date;
    import java.util.List;
    import java.util.concurrent.*;
    
    class MyThread02 implements Callable<String> {
    
        @Override
        public String call() throws Exception {
            // 模拟3s~10s之间的延时和返回结果
            long time = (long) (Math.random()*7+3);
            Thread.sleep(time*1000);
            String curName = Thread.currentThread().getName();
            System.out.println("=========>>>执行完毕:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
            return curName+"_"+time;
        }
    
        public static void main(String[] args) {
            System.out.println("=========>>>启动线程池:"+new SimpleDateFormat("yyyy-MM-dd ahh:mm:ss").format(new Date() )+"<<<=========");
            ExecutorService executorService2 = Executors.newFixedThreadPool(5);// 定长线程池
            List<Future<String>> futureList = new ArrayList<>();//存储任务
            for (int i = 0; i < 5; i++) {
                Future<String> future = executorService2.submit(new MyThread02());
                futureList.add(future);
            }
            for (int i = 0; i < 5; i++) {
                Future<String> future = futureList.get(i);
                try {
                    System.out.println("Result: " + future.get());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    e.printStackTrace();
                }
            }
            executorService2.shutdown();
        }
    }
  • 相关阅读:
    (33)ElasticSearch文档的核心元数据解析
    (32)ElasticSearch的容错机制
    (31)ElasticSearch水平扩容的过程
    (30)ElasticSearch两个节点环境中创建index解析
    (29)ElasticSearch分片和副本机制以及单节点环境中创建index解析
    UVA
    HDU
    ZOJ
    BZOJ1499: 瑰丽华尔兹(单调队列)
    UVALive
  • 原文地址:https://www.cnblogs.com/linyufeng/p/11335858.html
Copyright © 2011-2022 走看看