zoukankan      html  css  js  c++  java
  • Android多线程编程之详解阻塞队列和线程池

    Android多线程编程之详解阻塞队列和线程池

    阻塞队列简介

    阻塞队列常用于生产者和消费者场景,生产者往往是往队列里添加元素的线程,消费者
    是从队列里拿元素的线程吗,阻塞队列就是生产者存放元素的容器,是消费者拿元素的容器

    1. 常见阻塞场景
    • 当前队列中没有数据的情况下,消费端的所有线程都会被自动阻塞(挂起),直到有数据放入队列
    • 当队列种数据填充满的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中
      有空的位置,线程被自动唤醒

    2.BlockingQueue
    放入数据:

    • offer(anobject):表示如果可以将anobject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false(本方法不阻塞当前执行方法的线程)
    • offer(e o,long timeout,TimeUnit unit):可以设定等待的时间,如果在指定的时间还不能往队列中
      加入blockQuene,则返回失败
    • put(anobject):将anobject加到BlockingQueue里,如果BlockQueue没有控件,则调用此方法的线程被阻塞直到BlockingQueue里面有空间再继续

    获取数据:

    • poll(long timeout,TimeUnit unit):从BlockQueue中取出一个队首的对象,如果在指定时间内队列一旦有时间可以取,则立即返回队列中的数据,否则直到时间超过还没有数据可取,返回失败
    • take():取走BlockingQueue排在位首的对象,若BlockingQueue为空,则阻塞进入等待状态,直到
      BlockingQueue有新的数据加入
    • drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数)
      通过该方法,可以提升获取数据的效率,无需分多次分批加锁或释放锁。

    Java中的阻塞队列
    1.ArrayBlockingQueue:由数组结构组成的有界阻塞队列
    他是用数组实现的有界阻塞队列,并按照先进先出的原则对元素进行排序,默认情况下
    不保证线程公平的访问队列,公平的访问队列就是指阻塞的所有生产者线程或消费者线程
    当队列不可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者先生产,先阻塞
    的消费者线程可以先从队列里获取元素,通常情况下为了保证公平性会降低吞吐量。

    2.LinkedBlockingQueue:由链表结构组成的有限阻塞队列
    他是基于链表的阻塞队列,同ArrayListBlockingQueue类似,此队列按照先进先出的原则对
    元素进行排序,其内部也会维持着一个数据缓冲队列,当生产者往队列中放入一个数据时,
    队列会从生产者手中获取数据并缓存在队列内部,而生产者立即返回,只有当队列缓冲区达到
    缓存容量的最大值时(可以指定该值),才会阻塞生产者线程,直到消费者从队列中消费
    掉一份数据,生产者线程会被唤醒,反之,对于消费者这段的处理也基于同样的原理,
    而LinkedBlockingQueue之所以能够高效的处理处理并发数据,还因为其对于生产者端和消费者端
    分别采用独立的锁来控制数据同步。

    以上两个常用的阻塞队列,还有五种不再详细介绍。

    下面分析ArrayBlockingQueue的源代码:

    private static final long serialVersionUID = -817911632652898426L;
        final Object[] items;//阻塞队列维护的一个object类型的数组
        int takeIndex;//队首元素
        int putIndex;//队尾元素
        int count;//队列中的元素
        final ReentrantLock lock;//重入锁
        private final Condition notEmpty;//条件对象判断数组不是满的
        private final Condition notFull;//条件对象判断数组不是空的
        transient Itrs itrs;
        final int dec(int i) {
            return ((i == 0) ? items.length : i) - 1;
        }
    
        /**
         * Returns item at index i.
         */
        @SuppressWarnings("unchecked")
        final E itemAt(int i) {
            return (E) items[i];
        }
    
        /**
         * Inserts element at current put position, advances, and signals.
         * Call only when holding lock.
         */
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length) putIndex = 0;
            count++;
            notEmpty.signal();
        }
    
        /**
         * Extracts element at current take position, advances, and signals.
         * Call only when holding lock.
         */
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length) takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            notFull.signal();
            return x;
        }
    
        //取元素
            public void put(E e) throws InterruptedException {
            Objects.requireNonNull(e);
            final ReentrantLock lock = this.lock;//锁
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();//阻塞线程,等待notFull.signalAll()唤醒
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
    
    
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                while (count == 0)
                    notEmpty.await();//阻塞线程,等待notEmpty.await()唤醒
                return dequeue();
            } finally {
                lock.unlock();
            }
        }

    使用阻塞队列就无需考虑同步和线程间通信的问题。

    public class VolatikeDemo {
        private int queueSize=10;
        private ArrayBlockingQueue<Integer> queue=new ArrayBlockingQueue<>(queueSize);
    
          public static void main(String args[]){
                    VolatikeDemo demo=new VolatikeDemo();
                    Consumer consumer=demo.new Consumer();
                    Producer producer=demo.new Producer();
                    consumer.start();
                    producer.start();
         }
         class Consumer extends Thread{
             @Override
             public void run() {
                 while(true){
                     try {
                         int res=queue.take();
                         System.out.println(res);
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                     }
    
                 }
             }
         }
         class Producer extends Thread{
             @Override
             public void run() {
                 while(true){
                     try {
                         this.sleep(200);
                         queue.put(1);
                     } catch (InterruptedException e) {
                         e.printStackTrace();
                     }
                 }
             }
         }
    }

    线程池

    在编程中经常会使用线程来异步处理任务,但是每个线程的创建和销毁都需要一定的 开销。如果每次执行一个任务都需要打开一个新线程去执行,则这些线程的创建和销毁 将消耗大量的资源,并且线程都是各自为政的,很难对其进行控制,更何况还有一堆的 线程在执行,这时就需要线程池来对线程进行管理,在java 1.5中提供了Executor框架用于 把任务的提交和执行解耦,任务的提交交给Runnable或者Callable,而Executor框架用来 处理任务,Executor框架中的核心成员就是ThreadPoolExecutor,他是线程池的核心类。




    ThreadPoolExecutor
    可以通过创建ThreadPoolExecutor来创建一个线程池,ThreadPoolExecutor类一共有四个构造方法
    其中拥有最多参数的构造方法:
    ThreadPoolExecutor poolExecutor=new ThreadPoolExecutor(?,?,?,?,?);

    • corePoolSize:核心线程数,默认情况下线程池是空的,只有任务提交是才会线程,如果
      当前运行的线程数少于corePoolSize,则创建新线程来处理任务,如果等于或者多余
      corepoolsize则不会创建新线程,如果调用prestartAllcoreThread()方法:线程池会提前
      创建并启动所有的核心线程来等待任务。
    • maximumPoolSize:线程池允许创建的最大线程数,如果任务队列满了,并且线程数小于
      maximumPoolSize,则线程仍会创建新线程来处理任务。
    • keepAliveTime:非核心线程闲置的超时时间,超过这个时间则回收,如果任务很多,并且每个
      任务的执行时间的时间很短,则可以调大keepAliveTime来提高线程的利用率。
    • TimeUnit:keepAliveTime参数的时间单位,可选的单位有天,小时,分钟,秒,毫秒等
    • workQueue:任务队列,如果当前线程数大与corePoolSize则将任务添加到此任务队列中,
      该任务队列是BlockingQuenu类型的,也就是阻塞队列
    • ThreadFactory:线程工厂,可以线程工厂给每个创建出来的线程池设置名字,一般情况下无需要
      设置此参数
    • RejectedExecutionHandler:饱和策略,这是当任务队列和线程池都满了时所采取的应对策略
      默认是无法处理新任务(AbordPolicy)并抛出RejectedExecutionException异常,此外还有三种策略,分别如下:
      1.CallersRunsPolicy:用调查者所在的线程来处理任务,此策略提供简单的
      反馈控制机制,能够减缓新任务的提交速度(简言之,降低提交速度)
      2.DiscardPolicy:不能执行的任务,并将该任务删除
      3.DiscardOldestPolicy:丢弃队列最近的任务,并执行当前的任务。

    线程池的处理流程和原理
    在这里插入图片描述
    1.提交任务后,线程池先判断线程数是否达到了核心线程数(corepoolSize)
    如果还没有达到核心线程数,则创建核心线程处理任务,否则执行下一步。

    2.线程池判断任务队列是否满了,如果没满,将任务加入任务队列,否则执行
    下一步。

    3.线程池判断线程数是否达到最大线程数,如果未达到,则创建非核心线程处理任务,
    否则就执行饱和策略,默认会抛出RejectedExecutionExeception异常。

    在这里插入图片描述
    通过线程池的执行示意图我们可以看出,如果我们执行ThreadPoolExecutor的execute方法,
    会遇到各种情况
    1.如果线程池中的线程数没有达到核心线程数,则创建核心线程执行任务。
    2.如果线程池中的线程数大于或等于核心线程数,则加入任务队列,线程池中的空闲线程会不断的
    从任务队列中取任务执行。
    3.如果任务队列满了,并且线程数没有达到最大线程数,则创建非核心线程去处理任务。
    4.如果线程数超过了最大线程数,则执行饱和策略。

    4种常用的线程池

    FixedThreadPool:他是可重用固定线程数的线程池,他的主要特点如下:
    只有核心线程,没有非核心线程,并且数量是固定的,keepAliveTime设置
    为0意味着多余的线程会被立即终止,因此不会产生多余的线程,采用了
    无界阻塞队列LinkedBlockingQueue。

    当执行execute方法时:如果当前运行的线程数未达到核心线程数,则创建一个新线程来处理任务,如果运行线程数等于核心线程数,则将任务添加到阻塞队列中,FixThread就是拥有固定数量核心线程的线程池,并且这些核心线程不会被回收,当线程池中有空闲线程就会去任务队列取任务
    在这里插入图片描述






    CacheThreadPool
    CacheThreadPool:他是一个根据需要创建线程的线程池,他的主要特点如下:
    CacheThreadPool的corePoolSize为0,maximumPoolSize设置为最大值,他没有
    核心线程,非核心线程是无界的,keepAliveTime设置为60s,并使用了阻塞队列
    SynchronousQueue,他是一个不存储元素的阻塞队列,每个插入操作必须要
    等待另外一个线程的移除操作,同样一个移除操作也需要等待插入操作

    在这里插入图片描述
    当执行execute方法时:首先会执行synchroniusQueue的offer方法来提交任务,并且
    查询线程池中是否有空闲的线程执行SynchronousQueue的poll方法来移除任务,如果有
    则配对成功,将任务交给这个线程去处理,如果没有则配对失败,创建新的线程去执行任务
    当线程池中的线程空闲时,他会执行SynchronusQueue的poll方法,等待synchronoudQueue提交
    的新任务,如果60s没有新任务提交到synchronousQueue,则这个线程就会终止。cacheThreadPool适合大量需要立即处理并且耗时少的任务。





    SingleThreadExecutor:他是使用单个工作线程的线程池,corePoolSize和maximumPoolSize都为
    1,意味着SingleThreadExecutor只有一个核心线程,其他核心参数都和FixThreadPool一样,SingleThreadExecutor执行execute方法时,如果当前运行的线程数未达到核心线程数,也就是当前没有运行的线程,则创建一个新线程来处理任务,如果当前有运行的线程,则将任务添加到阻塞队列中,因此SingleThreadExecutor能确保所有的任务都在一个线程中按照顺序逐一执行
    在这里插入图片描述

    关于线程池和阻塞队列的梳理到此结束,下一篇AsyncTask原理

  • 相关阅读:
    (73)C# 扩展方法
    网络
    (十九)守护进程
    (十二)函数返回局部变量
    (十八)WireShark 过滤语法
    (十七)linux网络命令 vconfig ifconfig
    (十六)getsockname()
    (十五)ioctl、ifreq、ifconf
    (十四)UDP协议的两个主要方法sendto和recvfrom详解
    (十三)Packet socket 和 sockaddr_ll
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13309537.html
Copyright © 2011-2022 走看看