zoukankan      html  css  js  c++  java
  • JAVA 多线程(11):阻塞队列与线程池

    说线程池必须说队列,因为线程池跟队列有着莫大的关系

    一、阻塞队列(7个):
    数组阻塞队列、链表阻塞队列、优先级排序队列,还有对应的无界阻塞队列,另外还有双向阻塞队列,排序规则分为先进先出FIFO 与先进后出LIFO两种。

    对于阻塞队列,针对插入与移除有有4种操作方式。如下:

    方法 抛出异常 返回特殊值 一直阻塞 超时退出
    插入 add(e) offer put offer(e,time,unit)
    移除 remove poll take poll(time,unit)
    检查 element peek 不可用 不可用

    测试(有界队列):

    1.抛出异常:

     输出:

    第一次正常:

     输出:

     第二次队列设置长度为1,add方法调用了2次,结果抛出异常。

    2.返回特殊值

     输出:

    由结果看出,使用offer方法不会抛出异常,在添加元素时如果队列已满,返回失败标识,当使用移除方法poll时,如果队列已空,则会返回null

    3.一直阻塞:put/take

    输出:

     由结果看出,因为有界队列为长度为1,主线程执行了2次put方法,第二次因为队列已满,所以会一直阻塞当前线程知道队列不满时才会继续执行,修改一下,如下:

     

    输出:

     执行完毕,由结果看出,上面代码执行开始执行后,主线程与子线程乱序输出,但是可以看出当主线程执行第二次put方法后会等待子线程take后,主线程再执行take,也就是说当线程满后put方法会导致当前线程阻塞,当线程空了也会当作当前线程阻塞,我们可以再原先的代码上再多加一句。如下:

     输出:

    首先主线程等待子线程先执行,子线程首先执行take方法,因为队列中没有元素,所以子线程等待,2秒后主线程开始执行,第一次put后,子线程发现队列中有元素了,所以不再阻塞,进入runable状态,接着线程再执行take 方法。结束。

    总之,关于上面3类方法,各有特色。(当然也有可能主线程会先执行take方法,但是这种可能性为0,想一想,因为put方法是连续执行的,再执行第二个put时,因为队列已满,所以必须要等待队列非满时,主线程才不会阻塞)

    1.ArrayBlockingQueue   数组有界阻塞队列FIFO

    按照阻塞的先后顺序访问队列,默认情况下不保证线程公平的访问队列~如果要保证公平性,会降低一定的吞吐量,源码如下:

     2.LinkedBlockingQueue

    链表有界阻塞队列,默认最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。

    3.PriorityBlockingQueue

    优先级队列,可以自定义排序方法,但是对于同级元素不能保证顺序

    4.DelayQueue

    延迟获取元素队列,指定时间后获取,为无界阻塞队列。

    5.SynchronousQueue

    不存储元素的阻塞队列。每一个put操作必须订单tabke 操作,否则不能继续添加元素。

    6.LinkedTransfetQueue

    无界阻塞队列,多了tryTransfer 和transfet方法

    7.LinkedBlockingQueue

    链表结构组成的双向阻塞队列。可以从队列的两端插入和移除元素。

    另外:非阻塞算法的安全队列- ConcurrentLinkedQueue(CAS 实现 compare and swap)

    首先它是一个基础链接节点的无界的线程安全队列,有head和tail组成~没了,个人对线程理解比较浅,有兴趣的小伙伴可以研究一下~

    二、线程池

    使用线程池的好处:

    1.降低资源消耗:重复利用已创建的线程降低线程创建和销毁造成的消耗

    2.提高响应速度:任务到达时,任务不需要等待线程创建

    3.提高线程的可管理性:可以对线程统一分配、调优和监控。

    关于线程池的实现原理:

    1.判断核心线程线程是否都在执行任务,如不是,则创建一个工作线程来执行任务,否则进入2

    2.判断队列是不是满了,如果没有则提交任务到工作队列,否则进入3

    3.判断线程池是否都在工作,如果没有则创建一个新的工作线程执行任务。否则,交给饱和策略来处理这个任务。

    线程池中创建新线程执行任务的时候需要获取全局锁,所以java在执行任务的时候,如果在第二个步骤尽量进入到队列中,因为其不需要获取全局锁,在执行execute方法时,工作线程在执行完任务后会从队列中获取工作任务执行。

    构造方法:

    参数:

    1. corePoolSize:线程池基本大小,当提交一个新任务到工作线程时,线程池会创建一个线程来执行任务,即使空闲的基本线程能够执行新任务也会创建新线程,等到任务数大于线程基本大小时就不再创建,如果

    调用prestartAllCoreThreads方法,线程池会提前创建并启动所有基本线程

    2.maximumPoolSize:线程池中允许的最大线程数。如果核心线程池满了,会丢到队列中,如果队列也满了(这里说的是有界队列),会创建一个新的线程执行这个工作任务。打个比方:过节回家的时候,高速和国道,假设高速允许100辆车同时上高速,

    允许有100辆车可以在入口等待,国道允许200辆车通行,那么也就是说允许同时出现100+100+200 辆车,也就是400辆,这就是最大线程数(不是很贴切,理解就好)

    3.keeyAliveTime:线程活动保持时间,线程空闲后,保持的时间。如果超过时间则销毁线程。

    4.TimeUnit:时间单位

    5.阻塞队列:用于保存等待执行任务的阻塞队列,数组有界队列、链表队列、同步队列、优先级无界队列。newFixedThreadPool 使用的是链表队列。

    6.ThreadFactory:创建线程的工厂。

    7.RejectedRxecutionHandler:饱和策略。也就是说当超过了最大线程池数量,那么会执行饱和策略。以2中的例子接着说,如果车辆超过了400辆,那么从第401辆车开始需要给出另外的解决方法,可以是在国道入口排队,或则直接让你坐火车去~

    提交任务的方法:

    1.execute:不返回值

    2.submit:返回值(future类型对象)

    如下:

    public static void main(String[] args) {
            // 创建线程池
            ExecutorService executor = Executors.newFixedThreadPool(10);
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println(Thread.currentThread().getName());
    
                }
            };
            Callable callable = new Callable() {
                @Override
                public Object call() throws Exception {
                    return "hi 我是返回值";
                }
            };
            executor.execute(runnable);
    
            Future future = executor.submit(callable);
            try {
                Object o = future.get();
                System.out.println(o);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
            executor.shutdown();
        }

    输出结果:

    future 是一个接口类,其实现类是FutureTask该类源码如下:

     

     

    Callable 类似Runnable 但是它会有返回值。

    关于关闭线程:

    推荐使用shutdown 而不是shutdownNiw,shutdown 会遍历线程池中的线程,并逐个关闭其状态,对于正在执行线程会等待其执行结束(interrupt),shutdownNow类似立即停止,所以~推荐使用shutdown就好,当然如果有特殊情况除外。

    监控:

    通过继承线程池可以自定义线程池,重写beforeExecute、afterExecute、terminated方法,在开始前、执行后、中止的时候调用。如:

    static class TestCustomMyPool extends ThreadPoolExecutor {
            public TestCustomMyPool(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
                super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
            }
    
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                super.beforeExecute(t, r);
                System.out.println("开始执行");
            }
    
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                System.out.println("执行结束");
            }
    
            @Override
            protected void terminated() {
                super.terminated();
                System.out.println("中止");
            }
        }
    
        public static void main(String[] args) {
            TestCustomMyPool testCustomMyPool = new TestCustomMyPool(10, 10, 0L, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>());
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("正在执行线程");
                }
            };
            testCustomMyPool.execute(runnable);
    
            testCustomMyPool.shutdown();
        }

    输出:

    由结果看出,线程在执行前后执行了重写的方法。

    另外,除了这些,线程池还有其他一些方法可以来监控线程池的情况,如:

    public static void main(String[] args) {
            TestCustomMyPool testCustomMyPool = new TestCustomMyPool(10, 10, 0L, TimeUnit.MICROSECONDS, new LinkedBlockingQueue<>());
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    System.out.println("正在执行线程");
                }
            };
            testCustomMyPool.execute(runnable);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            // 需要执行的任务数量
            System.out.println("需要执行的任务数量:"+testCustomMyPool.getTaskCount());
            // 已完成的任务数量
            System.out.println("已完成的任务数量:"+testCustomMyPool.getCompletedTaskCount());
            // 创建最大线程数量
            System.out.println("创建最大线程数量:"+testCustomMyPool.getLargestPoolSize());
            // 线程池的线程数量
            System.out.println("线程池的线程数量:"+testCustomMyPool.getPoolSize());
            // 线程池活动的线程数
            System.out.println("线程池活动的线程数:"+testCustomMyPool.getActiveCount());
    
            testCustomMyPool.shutdown();
        }

    输出:

    关于future,这里有个参考链接,有兴趣的小伙伴也可以参考这位博主的文章 https://www.cnblogs.com/dolphin0520/p/3949310.html

    成灰之前,抓紧时间做点事!!
  • 相关阅读:
    Linux工具-curl
    常用工具-Postman
    HTTP头部信息
    HTTP状态码
    HTTP/HTTP2协议
    HTTP协议
    常用的服务端口
    三次握手,四次挥手和抓包工具
    路由表
    TCP/IP协议详解
  • 原文地址:https://www.cnblogs.com/jony-it/p/10862667.html
Copyright © 2011-2022 走看看