zoukankan      html  css  js  c++  java
  • Java--concurrent并发包下阻塞队列介绍

    JDK提供了7中阻塞队列,这里介绍其中3中,剩余的以此类推原理相同。

    1.ArrayBlockingQueue

    package com.seeyon.queue;
    
    import java.util.concurrent.ArrayBlockingQueue;
    
    /**
     * Created by yangyu on 16/11/27.
     */
    
    /**
     * ArrayBlockingQueue是数组结构组成的有界阻塞队列
     * 当队列已经满了的时候,put操作会阻塞当前线程,直到队列发生出队操作然后会唤醒put线程在入队
     * 当队列为空的时候,take操作会阻塞当前线程,直到队列发生入队操作后会唤醒take线程进行出队
     */
    public class TestArrayBlockingQueue {
        public static void main(String[] args) {
            ArrayBlockingQueue<String> queue = new ArrayBlockingQueue(1);
    
            try {
                queue.put("1111");
                /**
                 * 该操作会被阻塞,知道队列发生出队操作
                 */
                queue.put("2222");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    }

    2.LinkedBlockingQueue:链表实现的有界阻塞队列

    3.PriorityBlockingQueue:支持优先级的无界阻塞队列

    4.DelayQueue

    package com.seeyon.queue;
    
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    import static java.util.concurrent.TimeUnit.NANOSECONDS;
    
    /**
     * Created by yangyu on 16/11/27.
     */
    
    /**
     * DelayQueue是一个支持延时获取元素的无界队列
     * DelayQueue可以用于如下场景:
     * 1.缓存系统的的设计:用DelayQueue保存缓存元素的有效期,用一个线程循环查询DelayQueue,一旦能从DelayQueue中获取到元素,说明该元素过期了
     * 2.定时任务调度:使用DelayQueue保存当天会执行的任务和执行时间,一旦从DelayQueue中获取到任务就开始执行,TimerQueue就是使用DelayQueue实现的
     * DelayQueue的原理:
     * 1.当线程put元素的时候,DelayQueue会对你put的元素通过其本身的compareTo方法进行排序,延时时间越短的顺序越靠近队列头部
     * 2.当线程take元素的时候,DelayQueue会检测当前是否有Thread已经在等待队头元素了,如果有的话,那么只能阻塞当前前程,等已经取到队头
     * 的Thread完成以后再唤醒。
     * 如果没有Thread在等待队头元素的话,那么会查询一下队头元素还剩多少Delay时间,并且将当前线程设置为队头等待线程,然后让当前线程wait剩余
     * Delay时间后在来获取队头元素。
     */
    public class TestDelayQueue {
        public static void main(String[] args) {
            DelayQueue<Message> delayQueue = new DelayQueue<>();
            delayQueue.put(new Message(2000,"yangyu"));
    
            try {
                System.out.println(delayQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("end");
        }
    
        private static class Message implements Delayed{
    
            private long nanoTime;
    
            private String data;
    
            Message(long millisTime ,String data){
                this.nanoTime = now()+millisTime*(1000*1000);
                this.data = data;
            }
    
            private final long now(){
                return System.nanoTime();
            }
    
            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(nanoTime - now(), NANOSECONDS);
            }
    
            @Override
            public int compareTo(Delayed other) {
                if (other == this) // compare zero if same object
                    return 0;
                if (other instanceof Message) {
                    Message x = (Message) other;
                    long diff = nanoTime - x.nanoTime;
                    if (diff < 0)
                        return -1;
                    else if (diff > 0)
                        return 1;
                    else
                        return 1;
                }
                long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
                return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
            }
        }
    }

    5.SynchronousQueue

    import java.util.concurrent.SynchronousQueue;
    
    /**
     * Created by yangyu on 16/11/27.
     */
    
    /**
     * SynchronousQueue是一个不存储数据的队列,只是做数据的的传递工作
     * 同步队列,一个put操作必须等待一个take操作,否则线程被阻塞
     * 同样,一个take操作必须等待一个put操作,否则线程被阻塞
     * 
     * Executors中newCachedThreadPool()(可缓存线程池),就是使用的SynchronousQueue
     * 当一个任务被放入可缓存线程池以后,会调用SynchronousQueue的offer方法来判断是否有正在等待取任务的线程
     * offer方法:如果有线程正在等待取任务则将任务交给该线程并且返回true,如果没有线程等待取任务则返回false
     * 如果没有正在等待的线程,那么可缓存线程池会重新启动一个线程来执行这个任务
     * 当该线程执行完任务以后,会去SynchronousQueue队列中获取数据,等待60s,直到60s还未获取到任务则就自行关闭了
     */
    public class TestSynchronousQueue {
        public static void main(String[] args) {
            SynchronousQueue<String> strings = new SynchronousQueue<>();
            Thread t =new Thread(()->{
                try {
                    /**
                     * 该take操作会被阻塞,直到后面的strings.put("yangyu")操作后,当前线程才会被唤醒
                     */
                    System.out.println(strings.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            t.start();
    
            
            try {
                Thread.sleep(2000);
                /**
                 * 唤醒阻塞线程并且传递数据
                 */
                strings.put("yangyu");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
            System.out.println("完成");
    
        }
    }

    6.LinkedTransferQueue

    7.LinkedBlockingDeqeue:是一个链表结构组成的双向阻塞队列,可以从队列的两端插入或者移出元素。

  • 相关阅读:
    stm32串口通讯
    Java中日期处理
    Java中synchronized同步的理解
    由代理模式到AOP的实例分析
    基数排序(RadixSort)
    桶排序(BucketSort)
    计数排序
    快速排序
    6.5 k个已排好序链表合并为一个排序链表
    优先队列 (堆实现)
  • 原文地址:https://www.cnblogs.com/eoss/p/6106312.html
Copyright © 2011-2022 走看看