zoukankan      html  css  js  c++  java
  • 并发编程(四)------并发quene

    在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue接口!

    ConcurrentLinkedQueue:

      是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链接节点的无界线程安全队列。

      该队列的元素遵循先进先出的原则,头是最先加入的,尾是最近加入的,该队列不允许null元素。

    ConcurrentLinkedQueue重要方法:
    add() 和 offer() 都是加入元素的方法 (在ConcurrentLinkedQueue中,这俩个方法没有任何区别)
    poll() 和 peek() 都是取头元素节点,区别在于前者会删除元素,后者不会。

    BlockingQueue:

    BlockingQueue接口的重要方法

    offer(anObject): 表示如果可能的话, 将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳, 则返回true, 否则返回false.(本方法不阻塞当前执行方法的线程)

    offer(E o, long timeout, TimeUnit unit), 可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

    put(anObject): 把anObject加到BlockingQueue里, 如果BlockQueue没有空间, 则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.

    poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间超时还没有数据可取,返回失败。

    take(): 取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入;

    drainTo(): 一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

    MyQueueTest

     1 package com.bfxy.thread.cord.collection;
     2 
     3 public class MyQueueTest {
     4     
     5     public static void main(String[] args) throws InterruptedException {
     6         MyQueue mq = new MyQueue(5);
     7         mq.put("a");
     8         mq.put("b");
     9         mq.put("c");
    10         mq.put("d");
    11         mq.put("e");
    12         
    13         System.err.println("当前元素的个数"+mq.size());
    14         
    15         Thread t1 = new Thread(new Runnable() {
    16             
    17             @Override
    18             public void run() {
    19                 mq.put("f");
    20                 mq.put("g");
    21                 
    22             }
    23         },"t1");
    24         
    25         Thread.sleep(1000);
    26         
    27         Thread t2 = new Thread(new Runnable() {
    28             
    29             @Override
    30             public void run() {
    31                 try {
    32                     Thread.sleep(1000);
    33                     Object o1=mq.take();
    34                     Thread.sleep(1000);
    35                     Object o2=mq.take();
    36                 } catch (InterruptedException e) {
    37                     // TODO Auto-generated catch block
    38                     e.printStackTrace();
    39                 }
    40                     
    41             }
    42         },"t2");
    43         
    44         t1.start();
    45         Thread.sleep(1000);
    46         t2.start();
    47         Thread.sleep(5000);
    48         
    49         System.err.println(mq.getQueueList().toString());
    50     }
    51     
    52 
    53 }

    MyQueue

     1 package com.bfxy.thread.cord.collection;
     2 
     3 import java.util.LinkedList;
     4 import java.util.List;
     5 import java.util.concurrent.atomic.AtomicInteger;
     6 
     7 public class MyQueue {
     8     //1 就是我们整个队列的容器
     9     private final LinkedList<Object> list =new LinkedList<>();
    10     
    11     //2计数器
    12     private final AtomicInteger count = new AtomicInteger();
    13     
    14     private final int maxSize; //最大容量限制
    15     
    16     
    17     private final int minSize = 0;
    18     
    19     private final Object lock = new Object(); //
    20     
    21     public  MyQueue(int maxSize) {
    22         this.maxSize = maxSize;        
    23     }
    24     
    25     public void put(Object obj) {
    26         synchronized (lock) {
    27             while (count.get() ==this.maxSize) {
    28                 try {
    29                     lock.wait();
    30                 } catch (InterruptedException e) {
    31                     // TODO Auto-generated catch block
    32                     e.printStackTrace();
    33                 }    
    34             }
    35             //添加新的元素进到容器里
    36             list.add(obj);
    37             count.getAndIncrement();//i++
    38             System.err.println("元素"+obj+"已经添加容器中");
    39             //进行唤醒可能正在等待的take方法操作中的线程
    40             lock.notify();
    41             
    42         }
    43         
    44     }
    45     
    46     public Object take() {
    47         Object temp= null;
    48         synchronized (lock) {
    49             while (count.get() ==this.minSize) {
    50                 try {
    51                     lock.wait();
    52                 } catch (InterruptedException e) {
    53                     // TODO Auto-generated catch block
    54                     e.printStackTrace();
    55                 }    
    56             }
    57             temp=list.removeFirst();
    58             count.getAndDecrement();//i--
    59             System.err.println("元素"+temp+"已经从容器中取走");
    60             //进行唤醒可能正在等待的put方法操作中的线程
    61             lock.notify();
    62             
    63         }
    64         
    65         return temp;
    66     }
    67     
    68     public int size() {
    69         return count.get();
    70     }
    71     
    72     public List<Object> getQueueList() {
    73         return list;
    74     }
    75     
    76 
    77 }

      

     

     

  • 相关阅读:
    Java 8 —— Lambda表达式
    Calendar 学习
    No provider available for the service com.xxx.xxx 错误解决
    ContextLoaderListener错误
    pom文件中引入依赖成功了,但是jar包找不着
    https=http+ssl
    Spring boot学习笔记
    Spring cloud 学习笔记
    记录一些注解的含义
    Linux笔记
  • 原文地址:https://www.cnblogs.com/yangyang521/p/10178542.html
Copyright © 2011-2022 走看看