zoukankan      html  css  js  c++  java
  • 并发Queue

    在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论是哪种,都继承自Queue

    ConcurrentLinkedQueue:是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出原则。头是最先加入的,尾是最近加入的,该队列不允许null元素。

    ConcurrentLinkedQueue重要方法:

      add()和offer()都是加入元素的重要的方法(ConcurrentLinkedQueue中,这两个方法没有任何区别)

      poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。


    代码:

     1 package com.java.day03_queue;
     2 
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 import java.util.concurrent.ConcurrentLinkedQueue;
     6 import java.util.concurrent.LinkedBlockingQueue;
     7 
     8 public class UseQueue {
     9     public static void main(String[] args) {
    10         //高性能无阻塞队列
    11         ConcurrentLinkedQueue<String>q = new ConcurrentLinkedQueue<String>();
    12         q.add("add");
    13         q.offer("offer");
    14         
    15         System.out.println("q.poll():"+q.poll());
    16         System.out.println("q.size():"+q.size());
    17         System.out.println("q.peek():"+q.peek());
    18         System.out.println("q.size():"+q.size());
    19         
    20         
    21         //阻塞队列
    22         LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>();
    23         b.offer("offer");
    24         b.add("add1");
    25         b.add("add2");
    26         b.add("add3");
    27         b.add("add4");
    28         
    29         List <String>list = new ArrayList<String>();
    30         //drainTo:将b的元素取出1个放入list中
    31         System.out.println("b.drainTo(list,3):"+b.drainTo(list,1));
    32         System.out.println("b.size():"+b.size());
    33         
    34         System.out.println("*****遍历list*****");
    35         for (String string : list) {
    36             System.out.println(string);
    37         }
    38         
    39         
    40     }
    41 }

    运行结果:

    1 q.poll():add
    2 q.size():1
    3 q.peek():offer
    4 q.size():1
    5 b.drainTo(list,3):1
    6 b.size():4
    7 *****遍历list*****
    8 offer

    BlockingQueue接口

    ArrayBlockingQueue:基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定常的数组,以便缓存队列中的数据对象,其内部没有实现读写分离,也就意味着生产和消费不能完全并行,长度是需要定义的,可以指定先进先出或者先进后出,也叫有界队列,在很多场合非常适合使用。

    代码:

     1 package com.java.day03_queue;
     2 
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 import java.util.concurrent.ArrayBlockingQueue;
     6 import java.util.concurrent.ConcurrentLinkedQueue;
     7 import java.util.concurrent.LinkedBlockingQueue;
     8 import java.util.concurrent.TimeUnit;
     9 
    10 public class UseQueue {
    11     public static void main(String[] args) throws Exception {
    12         System.out.println("********ConcurrentLinkedQueue********");
    13         // 高性能无阻塞队列
    14         ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
    15         q.add("add");
    16         q.offer("offer");
    17 
    18         System.out.println("q.poll():" + q.poll());
    19         System.out.println("q.size():" + q.size());
    20         System.out.println("q.peek():" + q.peek());
    21         System.out.println("q.size():" + q.size());
    22 
    23         System.out.println("******ArrayBlockingQueue********");
    24         
    25         // 阻塞队列
    26         ArrayBlockingQueue<String> a = new ArrayBlockingQueue<String>(5);
    27         a.put("put1");
    28         a.put("put2");
    29         a.add("add1");
    30         a.add("add2");
    31         a.add("add3");
    32         // offer方法是阻塞的
    33         System.out.println("a.offer():"+a.offer("offer", 2, TimeUnit.SECONDS));
    34         System.out.println("***遍历a****");
    35         for (String string : a) {
    36             System.out.println(string);
    37         }
    38         
    39         
    40         
    41         System.out.println("******LinkedBlockingQueue*************");
    42         
    43         LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>();
    44         b.offer("offer");
    45         b.add("add1");
    46         b.add("add2");
    47         b.add("add3");
    48         b.add("add4");
    49 
    50         List<String> list = new ArrayList<String>();
    51         // drainTo:将b的元素取出1个放入list中
    52         System.out.println("b.drainTo(list,3):" + b.drainTo(list, 1));
    53         System.out.println("b.size():" + b.size());
    54 
    55         System.out.println("*****遍历list*****");
    56         for (String string : list) {
    57             System.out.println(string);
    58         }
    59 
    60     }
    61 }

    运行结果:

     1 ********ConcurrentLinkedQueue********
     2 q.poll():add
     3 q.size():1
     4 q.peek():offer
     5 q.size():1
     6 ******ArrayBlockingQueue********
     7 a.offer():false
     8 ***遍历a****
     9 put1
    10 put2
    11 add1
    12 add2
    13 add3
    14 ******LinkedBlockingQueue*************
    15 b.drainTo(list,3):1
    16 b.size():4
    17 *****遍历list*****
    18 offer

    LinkedBlockingQueue:基于链表的阻塞队列,同ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),LinkedBlockingQueue之所以能够高效的处理并发数据,是因为其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作的完全并行运行。它是一个无界队列。

    代码:

     1 package com.java.day03_queue;
     2 
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 import java.util.concurrent.ArrayBlockingQueue;
     6 import java.util.concurrent.ConcurrentLinkedQueue;
     7 import java.util.concurrent.LinkedBlockingQueue;
     8 import java.util.concurrent.TimeUnit;
     9 
    10 public class UseQueue {
    11     public static void main(String[] args) throws Exception {
    12         System.out.println("********ConcurrentLinkedQueue********");
    13         // 高性能无阻塞队列
    14         ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
    15         q.add("add");
    16         q.offer("offer");
    17 
    18         System.out.println("q.poll():" + q.poll());
    19         System.out.println("q.size():" + q.size());
    20         System.out.println("q.peek():" + q.peek());
    21         System.out.println("q.size():" + q.size());
    22 
    23         System.out.println("******ArrayBlockingQueue********");
    24         
    25         // 阻塞队列
    26         ArrayBlockingQueue<String> a = new ArrayBlockingQueue<String>(5);
    27         a.put("put1");
    28         a.put("put2");
    29         a.add("add1");
    30         a.add("add2");
    31         a.add("add3");
    32         // offer方法是阻塞的
    33         System.out.println("a.offer():"+a.offer("offer", 2, TimeUnit.SECONDS));
    34         System.out.println("***遍历a****");
    35         for (String string : a) {
    36             System.out.println(string);
    37         }
    38         
    39         
    40         
    41         System.out.println("******LinkedBlockingQueue*************");
    42         
    43         //LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>();//无界
    44         LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>(5);//有界为5
    45         b.offer("offer");
    46         b.add("add1");
    47         b.add("add2");
    48         b.add("add3");
    49         b.add("add4");
    50         //b.add("add5"); 如果初始化 为5个,则第六个如果是以add加入,抛异常,如果是以offer加入,不抛异常
    51         b.offer("add5");
    52         
    53         System.out.println("b.size():"+b.size());
    54         List<String> list = new ArrayList<String>();
    55         // drainTo:将b的元素取出1个放入list中
    56         System.out.println("b.drainTo(list,3):" + b.drainTo(list, 1));
    57         System.out.println("b.size():" + b.size());
    58 
    59         System.out.println("*****遍历list*****");
    60         for (String string : list) {
    61             System.out.println(string);
    62         }
    63 
    64     }
    65 }

    运行结果:

     1 ********ConcurrentLinkedQueue********
     2 q.poll():add
     3 q.size():1
     4 q.peek():offer
     5 q.size():1
     6 ******ArrayBlockingQueue********
     7 a.offer():false
     8 ***遍历a****
     9 put1
    10 put2
    11 add1
    12 add2
    13 add3
    14 ******LinkedBlockingQueue*************
    15 b.size():5
    16 b.drainTo(list,3):1
    17 b.size():4
    18 *****遍历list*****
    19 offer

    SynchronizedQueue:一种没有缓冲的队列,生产者生产的数据直接会被消费者获取并消费

    不能直接添加元素,直接添加会报异常

    代码:

     1 package com.java.day03_queue;
     2 
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 import java.util.concurrent.ArrayBlockingQueue;
     6 import java.util.concurrent.ConcurrentLinkedQueue;
     7 import java.util.concurrent.LinkedBlockingQueue;
     8 import java.util.concurrent.SynchronousQueue;
     9 import java.util.concurrent.TimeUnit;
    10 
    11 public class UseQueue {
    12     public static void main(String[] args) throws Exception {
    13         System.out.println("********ConcurrentLinkedQueue********");
    14         // 高性能无阻塞队列
    15         ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
    16         q.add("add");
    17         q.offer("offer");
    18 
    19         System.out.println("q.poll():" + q.poll());
    20         System.out.println("q.size():" + q.size());
    21         System.out.println("q.peek():" + q.peek());
    22         System.out.println("q.size():" + q.size());
    23 
    24         System.out.println("******ArrayBlockingQueue********");
    25         
    26         // 阻塞队列
    27         ArrayBlockingQueue<String> a = new ArrayBlockingQueue<String>(5);
    28         a.put("put1");
    29         a.put("put2");
    30         a.add("add1");
    31         a.add("add2");
    32         a.add("add3");
    33         // offer方法是阻塞的
    34         System.out.println("a.offer():"+a.offer("offer", 2, TimeUnit.SECONDS));
    35         System.out.println("***遍历a****");
    36         for (String string : a) {
    37             System.out.println(string);
    38         }
    39         
    40         
    41         
    42         System.out.println("******LinkedBlockingQueue*************");
    43         
    44         //LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>();//无界
    45         LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>(5);//有界为5
    46         b.offer("offer");
    47         b.add("add1");
    48         b.add("add2");
    49         b.add("add3");
    50         b.add("add4");
    51         //b.add("add5"); 如果初始化 为5个,则第六个如果是以add加入,抛异常,如果是以offer加入,不抛异常
    52         b.offer("add5");
    53         
    54         System.out.println("b.size():"+b.size());
    55         List<String> list = new ArrayList<String>();
    56         // drainTo:将b的元素取出1个放入list中
    57         System.out.println("b.drainTo(list,3):" + b.drainTo(list, 1));
    58         System.out.println("b.size():" + b.size());
    59 
    60         System.out.println("*****遍历list*****");
    61         for (String string : list) {
    62             System.out.println(string);
    63         }
    64 
    65         //SynchronizedQueue
    66         SynchronousQueue<String> s = new SynchronousQueue<String>();
    67         s.add("test");
    68         
    69         
    70     }
    71 }

    运行结果:

     1 ********ConcurrentLinkedQueue********
     2 q.poll():add
     3 q.size():1
     4 q.peek():offer
     5 q.size():1
     6 ******ArrayBlockingQueue********
     7 a.offer():false
     8 ***遍历a****
     9 put1
    10 put2
    11 add1
    12 add2
    13 add3
    14 ******LinkedBlockingQueue*************
    15 b.size():5
    16 b.drainTo(list,3):1
    17 b.size():4
    18 *****遍历list*****
    19 offer
    20 Exception in thread "main" java.lang.IllegalStateException: Queue full
    21     at java.util.AbstractQueue.add(Unknown Source)
    22     at com.java.day03_queue.UseQueue.main(UseQueue.java:67)

    SynchronousQueue的一个使用场景是在线程池里。Executors.newCachedThreadPool()就使用了SynchronousQueue,这个线程池根据需要(新任务到来时)创建新的线程,如果有空闲线程则会重复使用,线程空闲了60秒后会被回收。

     代码:

     1 package com.java.day03_queue;
     2 
     3 import java.util.ArrayList;
     4 import java.util.List;
     5 import java.util.concurrent.ArrayBlockingQueue;
     6 import java.util.concurrent.ConcurrentLinkedQueue;
     7 import java.util.concurrent.LinkedBlockingQueue;
     8 import java.util.concurrent.SynchronousQueue;
     9 import java.util.concurrent.TimeUnit;
    10 
    11 public class UseQueue {
    12     public static void main(String[] args) throws Exception {
    13         System.out.println("********ConcurrentLinkedQueue********");
    14         // 高性能无阻塞队列
    15         ConcurrentLinkedQueue<String> q = new ConcurrentLinkedQueue<String>();
    16         q.add("add");
    17         q.offer("offer");
    18 
    19         System.out.println("q.poll():" + q.poll());
    20         System.out.println("q.size():" + q.size());
    21         System.out.println("q.peek():" + q.peek());
    22         System.out.println("q.size():" + q.size());
    23 
    24         System.out.println("******ArrayBlockingQueue********");
    25         
    26         // 阻塞队列
    27         ArrayBlockingQueue<String> a = new ArrayBlockingQueue<String>(5);
    28         a.put("put1");
    29         a.put("put2");
    30         a.add("add1");
    31         a.add("add2");
    32         a.add("add3");
    33         // offer方法是阻塞的
    34         System.out.println("a.offer():"+a.offer("offer", 2, TimeUnit.SECONDS));
    35         System.out.println("***遍历a****");
    36         for (String string : a) {
    37             System.out.println(string);
    38         }
    39         
    40         
    41         
    42         System.out.println("******LinkedBlockingQueue*************");
    43         
    44         //LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>();//无界
    45         LinkedBlockingQueue<String> b = new LinkedBlockingQueue<String>(5);//有界为5
    46         b.offer("offer");
    47         b.add("add1");
    48         b.add("add2");
    49         b.add("add3");
    50         b.add("add4");
    51         //b.add("add5"); 如果初始化 为5个,则第六个如果是以add加入,抛异常,如果是以offer加入,不抛异常
    52         b.offer("add5");
    53         
    54         System.out.println("b.size():"+b.size());
    55         List<String> list = new ArrayList<String>();
    56         // drainTo:将b的元素取出1个放入list中
    57         System.out.println("b.drainTo(list,3):" + b.drainTo(list, 1));
    58         System.out.println("b.size():" + b.size());
    59 
    60         System.out.println("*****遍历list*****");
    61         for (String string : list) {
    62             System.out.println(string);
    63         }
    64 
    65         //SynchronizedQueue
    66         System.out.println("**********SynchronizedQueue***************");
    67         final SynchronousQueue<String> s = new SynchronousQueue<String>();
    68         Thread t1 = new Thread(new Runnable() {
    69             public void run() {
    70                 try {
    71                     System.out.println(s.take());
    72                 } catch (InterruptedException e) {
    73                     e.printStackTrace();
    74                 }
    75             }
    76         });
    77         
    78         t1.start();
    79         
    80         Thread t2 = new Thread(new Runnable() {
    81             public void run() {
    82         
    83                 s.add("nanami");
    84             }
    85         });
    86         
    87         t2.start();
    88     }
    89 }

    运行结果:

     1 ********ConcurrentLinkedQueue********
     2 q.poll():add
     3 q.size():1
     4 q.peek():offer
     5 q.size():1
     6 ******ArrayBlockingQueue********
     7 a.offer():false
     8 ***遍历a****
     9 put1
    10 put2
    11 add1
    12 add2
    13 add3
    14 ******LinkedBlockingQueue*************
    15 b.size():5
    16 b.drainTo(list,3):1
    17 b.size():4
    18 *****遍历list*****
    19 offer
    20 **********SynchronizedQueue***************
    21 nanami

    PriorityBlockingQueue:基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,也就是说传入队列的对象必须实现Comparable接口),在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁,他也是一个无界的队列。

    (优先级的排列:并不是在加入的时候排序,而是在第一次take的时候按照优先级进行选择排序)

    代码:

     1 package com.java.day03_queue;
     2 
     3 import java.util.Iterator;
     4 import java.util.concurrent.PriorityBlockingQueue;
     5 
     6 //比较器有问题
     7 public class UsePriorityBlockingQueue {
     8     public static void main(String[] args) throws Exception {
     9         PriorityBlockingQueue<Task> p = new PriorityBlockingQueue<>();
    10         
    11         Task t1= new Task();
    12         t1.setId(3);
    13         t1.setName("nanami-----3");
    14         
    15         Task t2= new Task();
    16         t2.setId(8);
    17         t2.setName("tomoe---8");
    18         
    19         Task t3= new Task();
    20         t3.setId(2);
    21         t3.setName("t3---2");
    22         
    23         Task t4= new Task();
    24         t4.setId(6);
    25         t4.setName("t4---6");
    26         
    27         Task t5= new Task();
    28         t5.setId(7);
    29         t5.setName("t5---7");
    30         
    31         
    32         p.add(t1);
    33         p.add(t2);
    34         p.add(t3);
    35         p.add(t4);
    36         p.add(t5);
    37         
    38         //排序:在第一次take之后,按照优先级进行排序
    39         System.out.println(p.take());
    40         for (Iterator iterator = p.iterator(); iterator.hasNext();) {
    41             Task task = (Task) iterator.next();
    42             System.out.println(task);
    43         }
    44         
    45         
    46     }
    47 }
     1 package com.java.day03_queue;
     2 
     3 public class Task implements Comparable<Task>{
     4 
     5     private int id;
     6     private String name;
     7     public int getId() {
     8         return id;
     9     }
    10     public void setId(int id) {
    11         this.id = id;
    12     }
    13     public String getName() {
    14         return name;
    15     }
    16     public void setName(String name) {
    17         this.name = name;
    18     }
    19     @Override
    20     public int compareTo(Task task) {
    21         return this.getId()>task.getId()?1: (this.getId()<task.getId()?-1:0);
    22     }
    23     @Override
    24     public String toString() {
    25         return "Task [id=" + id + ", name=" + name + "]";
    26     }
    27     
    28     
    29     
    30 }

    运行结果:

    1 Task [id=2, name=t3---2]
    2 Task [id=3, name=nanami-----3]
    3 Task [id=6, name=t4---6]
    4 Task [id=7, name=t5---7]
    5 Task [id=8, name=tomoe---8]

    DelayQueue:带有延迟时间的队列,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取到该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景有很多,比如对缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等。(延迟时间一到,从队列中自动出来,如:网吧上网)

    代码:

     1 package com.java.day03_queue;
     2 
     3 import java.util.concurrent.DelayQueue;
     4 
     5 public class WangBa implements Runnable{
     6 
     7     private DelayQueue<WangMin>queue = new DelayQueue<WangMin>();
     8     private boolean yinye = true;
     9     
    10     
    11     //上机
    12     public void shangji(String name,int id,int money){
    13         WangMin man = new WangMin(name,id,1000*money+System.currentTimeMillis());
    14         System.out.println("网名:"+name +"   身份证:"+id+"交钱:"+money+"元,开始上机。。。。");
    15         queue.add(man);
    16     }
    17     
    18     //下机
    19     public void xiaji(WangMin man){
    20         System.out.println("网民:"+man.getName()+"身份证:"+man.getId()+"   时间到,下机。。。。");
    21     }
    22     
    23     
    24     @Override
    25     public void run() {
    26         while(yinye){
    27             try {
    28                 WangMin man  = queue.take();
    29                 xiaji(man);
    30             } catch (InterruptedException e) {
    31                 e.printStackTrace();
    32             }
    33         }
    34     }
    35     
    36     public static void main(String[] args) {
    37         System.out.println("网吧开始营业");
    38         WangBa siyu = new WangBa();
    39         //创建了一个线程
    40         Thread shangwang  = new Thread(siyu);
    41         shangwang.start();
    42         
    43         
    44         siyu.shangji("nanami", 1, 3);
    45         siyu.shangji("tomoe", 2, 5);
    46     }
    47     
    48     
    49 
    50 }
     1 package com.java.day03_queue;
     2 
     3 import java.util.concurrent.Delayed;
     4 import java.util.concurrent.TimeUnit;
     5 
     6 public class WangMin implements Delayed{
     7 
     8     private int id;
     9     private String name;
    10     private long endTime;
    11     private TimeUnit timeUtil = TimeUnit.SECONDS ;
    12     
    13     public WangMin(String name ,int id,long endTime){
    14         this.name=name;
    15         this.id=id;
    16         this.endTime=endTime;
    17     }
    18     
    19     public int getId() {
    20         return id;
    21     }
    22 
    23     public void setId(int id) {
    24         this.id = id;
    25     }
    26 
    27     public String getName() {
    28         return name;
    29     }
    30 
    31     public void setName(String name) {
    32         this.name = name;
    33     }
    34 
    35     
    36     
    37     public long getEndTime() {
    38         return endTime;
    39     }
    40 
    41     public void setEndTime(long endTime) {
    42         this.endTime = endTime;
    43     }
    44 
    45     @Override
    46     public int compareTo(Delayed delayed) {
    47         WangMin w = (WangMin)delayed;
    48         return  (this.getDelay(this.timeUtil)-this.getDelay(this.timeUtil))>0?1:0;
    49     }
    50 
    51     @Override
    52     public long getDelay(TimeUnit unit) {
    53         return endTime-System.currentTimeMillis();
    54     }
    55 
    56 }

    运行结果:因为上面为while死循环,所以,程序并没有停止

    1 网吧开始营业
    2 网名:nanami   身份证:1交钱:3元,开始上机。。。。
    3 网名:tomoe   身份证:2交钱:5元,开始上机。。。。
    4 网民:nanami身份证:1   时间到,下机。。。。
    5 网民:tomoe身份证:2   时间到,下机。。。。
  • 相关阅读:
    ABAP 程序中的类 沧海
    ABAP类的方法(转载) 沧海
    More than 100 ABAP Interview Faq's(2) 沧海
    SAP and ABAP Memory总结 沧海
    ABAP Frequently Asked Question 沧海
    ABAP System Reports(Additional functions) 沧海
    ABAP Questions Commonly Asked 1 沧海
    ABAP Tips and Tricks 沧海
    ABAP System Fields 沧海
    ABAP 面试问题及答案(一):数据库更新及更改 SAP Standard (转) 沧海
  • 原文地址:https://www.cnblogs.com/syousetu/p/6732709.html
Copyright © 2011-2022 走看看