zoukankan      html  css  js  c++  java
  • 高并发编程之并发容器

      之前简单学习了jvm提供的synchronized和JDK提供的ReentrantLock,本文主要学习并发容器。

       首先来看一个简单的业务场景:有1000张火车票,每张火车票都有一个编号,同时有10个窗口对外售票。

      首先来看下面这种写法是否可以实现:

     1 /**
     2  * 有1000张火车票,每张火车票都有一个编号,同时有10个窗口对外售票。
     3  * @author Wuyouxin
     4  *
     5  */
     6 public class TicketSeller1 {
     7 
     8     static List<String> tickets = new ArrayList<String>();
     9     
    10     static {
    11         for (int i = 0; i < 1000; i++) {
    12             tickets.add("座位:" + i);
    13         }
    14     }
    15     
    16     public static void main(String[] args) {
    17         //10个窗口
    18         for (int i = 0; i < 10; i++) {
    19             new Thread(new Runnable() {
    20                 
    21                 @Override
    22                 public void run() {
    23                     while (tickets.size() > 0){
    24                         System.out.println(tickets.remove(0));
    25                     }
    26                     
    27                 }
    28             }, "窗口" + i).start();
    29         }
    30     }
    31 }

      上面方式可以实现上面的需求么?其实,不可以,因为上面代码时一个无锁状态,当一个线程判断tickets.size()> 0 之后被其他线程获取抢先进入将tickets中的元素删除,则第一个线程则会出现异常,有可能会出现重复删除,也有可能会出现无元素可以删除。

      那么我们来看下面的代码是否可以删除。

     1 /**
     2  * 有1000张火车票,每张火车票都有一个编号,同时有10个窗口对外售票。
     3  * @author Wuyouxin
     4  *
     5  */
     6 public class TicketSeller2 {
     7 
     8     static Vector<String> tickets = new Vector<String>();
     9     
    10     static {
    11         for (int i = 0; i < 1000; i++) {
    12             tickets.add("座位:" + i);
    13         }
    14     }
    15     
    16     public static void main(String[] args) {
    17         //10个窗口
    18         for (int i = 0; i < 10; i++) {
    19             new Thread(new Runnable() {
    20                 
    21                 @Override
    22                 public void run() {
    23                     while (tickets.size() > 0){
    24                         System.out.println(tickets.remove(0));
    25                     }
    26                     
    27                 }
    28             }, "窗口" + i).start();
    29         }
    30     }
    31 }

      我们对代码进行改造,将第一个代码中的list换成了Vector,大家都知道Vector集合呢是一个线程安全的集合,它的所有操作都是原子性的,那么这样做,是否可以实现上述需求呢?其实不然,虽然我们使用了Vector集合,它的方法都是原子性的,但是在tickets.size()方法和tickets.remove(0)方法中间是没有原子性的,其实第一个代码的问题还是会出现。

      继续对代码进行改进:

     1 /**
     2  * 有1000张火车票,每张火车票都有一个编号,同时有10个窗口对外售票。
     3  * @author Wuyouxin
     4  *
     5  */
     6 public class TicketSeller3 {
     7 
     8     static List<String> tickets = new ArrayList<String>();
     9     
    10     static {
    11         for (int i = 0; i < 1000; i++) {
    12             tickets.add("座位:" + i);
    13         }
    14     }
    15     
    16     public static void main(String[] args) {
    17         //10个窗口
    18         for (int i = 0; i < 10; i++) {
    19             new Thread(new Runnable() {
    20                 
    21                 @Override
    22                 public void run() {
    23                     while (true){
    24                         //将存放票的集合上锁
    25                         synchronized(tickets){
    26                             if (tickets.size() <= 0){
    27                                 break;
    28                             }
    29                             System.out.println(tickets.remove(0));
    30                         }
    31                     }
    32                     
    33                 }
    34             }, "窗口" + i).start();
    35         }
    36     }
    37 }

      这次,我们在进行判断时将tickets上锁,让其他线程无法获取,这样确实可以实现上面的需求,但是这样效率不高。

      继续将代码进行优化:

     1 /**
     2  * 有1000张火车票,每张火车票都有一个编号,同时有10个窗口对外售票。
     3  * @author Wuyouxin
     4  *
     5  */
     6 public class TicketSeller4 {
     7 
     8     static Queue<String> tickets = new ConcurrentLinkedQueue<String>();
     9     
    10     static {
    11         for (int i = 0; i < 1000; i++) {
    12             tickets.add("座位:" + i);
    13         }
    14     }
    15     
    16     public static void main(String[] args) {
    17         //10个窗口
    18         for (int i = 0; i < 10; i++) {
    19             new Thread(new Runnable() {
    20                 
    21                 @Override
    22                 public void run() {
    23                     while (true){
    24                         String s = tickets.poll();
    25                         if (s == null) break;
    26                         System.out.println(s);
    27                     }
    28                     
    29                 }
    30             }, "窗口" + i).start();
    31         }
    32     }
    33 }

      在这里我也没有使用锁,但是我使用了Queue队列,每次将队首的元素拿出来再去判断,这样就不会出现多拿或者拿多的情况了。

      下面开始本文的核心,并发容器。

    一、Map

      在日常的开发过程中,Map集合使用率算是比较高的容器了,像hashMap(hash结构),treeMap(tree结构),linkedHashMap(hash结构,加双向链表)等等,但是上述两种容器都不是并发容器,其中的方法都不能保证原子性,而在高并发编程中,我们使用更多的是hashTable(hash结构),Collection.synchronizedMap(将非线程安全的容器变为线程安全的容器),concurrentHashMap(hash结构),concurrentSkiplistMap(跳表结构)。

      当然在选用时也要根据业务场景去选择:

        非并发业务不需要排序:hashMap

        非并发业务需要排序:treeMap,linkedHashMap

        并发业务量少不需要排序:hashTable,Collection.synchronizedMap

        并发业务量大不需要排序:concurrentHashMap

        并发业务量大需要排序:concurrentSkiplistMap(https://blog.csdn.net/sunxianghuang/article/details/52221913

      我也简单测试了一下性能,发现concurrentHashMap的性能确实要比hashTable性能要高这时因为在hashTable在操作时会将整个容器上锁,而concurrentHashMap操作时只会将hash表中下标位置上锁。

     1 /**
     2  * 效率测试
     3  * @author Wuyouxin
     4  *
     5  */
     6 public class ConcurrentMaP {
     7     public static void main(String[] args) {
     8         //final Map<String, String> map = new Hashtable<String, String>();
     9         //final Map<String, String> map = new ConcurrentHashMap<String, String>();
    10         //final Map<String, String> map = new ConcurrentSkipListMap<String, String>();
    11         final Map<String, String> map = new HashMap<String, String>();
    12         final Random r = new Random();
    13         Thread [] t = new Thread[100];
    14         final CountDownLatch c = new CountDownLatch(t.length);
    15         long start = System.currentTimeMillis();
    16         for (int i = 0; i < t.length; i++) {
    17             new Thread(new Runnable() {
    18                 
    19                 @Override
    20                 public void run() {
    21                     for (int j = 0; j < 10000; j++) {
    22                         map.put(Thread.currentThread().getName() + j, "a" + r.nextInt(10000));
    23                     }
    24                     c.countDown();
    25                 }
    26             }, "线程" + i).start();
    27         }
    28         try {
    29             c.await();
    30         } catch (InterruptedException e) {
    31             e.printStackTrace();
    32         }
    33         long end = System.currentTimeMillis();
    34         System.out.println(end - start);
    35     }
    36 }

    二、List

      在开发过程中,list也时比较常用的容器了,我们使用比较多的时arrayList,linkedList但是它们都是线程不安全的,在高并发编程中我们使用更多的是Vector,Collection.synchronizedList,和CopyOnWritList(写时复制容器,插入时复制新的List装新的集合,所以不会有线程问题),但是也是要根据业务场景去使用。

      无并发问题,读多:arrayList

      无并发问题,写多:linkedList

      有并发问题,读多写少:CopyOnWritList

      有并发问题,写多:Vector,Collection.synchronizedList

      我也简单测试了一下性能:

     1 public class Top2_ConcurrentList {
     2     public static void main(String[] args) {
     3         //final Vector<String> list = new Vector<String>();
     4         final List<String> list = new CopyOnWriteArrayList<String>();
     5         Thread [] t = new Thread[100];
     6         final CountDownLatch c = new CountDownLatch(t.length);
     7         long start = System.currentTimeMillis();
     8         for (int i = 0; i < t.length; i++) {
     9             new Thread(new Runnable() {
    10                 
    11                 @Override
    12                 public void run() {
    13                     for (int j = 0; j < 10000; j++) {
    14                         list.add(Thread.currentThread().getName() + j);
    15                     }
    16                     c.countDown();
    17                 }
    18             }, "线程" + i).start();
    19         }
    20         try {
    21             c.await();
    22         } catch (InterruptedException e) {
    23             e.printStackTrace();
    24         }
    25         long end = System.currentTimeMillis();
    26         System.out.println(end - start);
    27     }
    28 
    29 }

    三、ConcurrentLinkedQueue队列(内部加锁)

      队列,在并发编程中使用的频率时非常高的,ConcurrentLinkedQueue为单向链表结构,这个队列也是一个无界队列。下面,简单介绍一下ConcurrentLinkedQueue的一些方法。

     1 public class Top3_ConcurrentQueue {
     2     public static void main(String[] args) {
     3         Queue<String> queue = new ConcurrentLinkedQueue<String>();
     4         for (int i = 0; i < 10; i++) {
     5             //等价与add,但是offer有返回值,表示是否新增成功
     6             queue.offer(""+ i );
     7         }
     8         System.out.println(queue);
     9         System.out.println(queue.size());
    10         //删除头
    11         System.out.println(queue.poll());
    12         System.out.println(queue.size());
    13         //拿出头,但是不删除
    14         System.out.println(queue.peek());
    15         System.out.println(queue.size());
    16     }
    17 }

      当然,还有一种队列,叫做ConcurrentLinkedDeque,这个队列为双向队列,其结构为双向链表。其中方法跟上面单项链表大致相同,只不过有从头加数据或者从尾加数据,删除也一样。

    四、BlockingQueue队列(阻塞式队列)

      BlockingQueue为阻塞式队列,当队列中没值时,消费者自动等待,当队列满时,生产者自动等待。

      下面看一个生产者消费者模式的简单的例子:

     1 public class Top4_LinkedBlockingQueue {
     2 
     3     static BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
     4     
     5     public static void main(String[] args) {
     6         new Thread(new Runnable() {
     7             
     8             @Override
     9             public void run() {
    10                 for (int i = 0; i < 100; i++) {
    11                     try {
    12                         //向队列中加元素,如果队列满了则自动等待
    13                         queue.put("a" + i);
    14                     } catch (InterruptedException e) {
    15                         e.printStackTrace();
    16                     }
    17                 }
    18             }
    19         }, "p1").start();
    20         
    21         for (int i = 0; i < 5; i++) {
    22             new Thread(new Runnable() {
    23                 
    24                 @Override
    25                 public void run() {
    26                     while(true){
    27                         try {
    28                             //如果队列空了,则自动等待
    29                             System.out.println(Thread.currentThread().getName() + 
    30                                     "take-" + queue.take());
    31                         } catch (InterruptedException e) {
    32                             e.printStackTrace();
    33                         }
    34                     }
    35                 }
    36             }, "c" + i).start();
    37         }
    38     }
    39 }

      上面的LinkedBlockingQueue队列为无界队列,还有一种叫做ArrayBlockingQueue为有界队列,如果队列满了,这时使用add会抛出异常,如果队列满了,使用的时offer方法的话会有一个boolean返回值,这时会返回false,offer还有另一种用法,如果1秒内无法加入的话就不添加,还有一个put方法,当队列满了之后,会阻塞线程。

     1 public class Top5_ArrayBlockingQueue {
     2 
     3     static BlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
     4     public static void main(String[] args) {
     5         for (int i = 0; i < 10; i++) {
     6             queue.offer("a" + i);
     7         }
     8         
     9         //如果队列满了,这时使用add会抛出异常
    10         //queue.add("aaa");
    11         
    12         //如果队列满了,使用的时offer方法的话会有一个boolean返回值,这时会返回false
    13         queue.offer("aaa");
    14         
    15         try {
    16             //put方法当队列满了之后会阻塞线程
    17             queue.put("aaa");
    18         } catch (InterruptedException e1) {
    19             // TODO Auto-generated catch block
    20             e1.printStackTrace();
    21         }
    22         try {
    23             //这个是offer的另一种用法,如果1秒内无法加入的话就不添加
    24             queue.offer("aaa", 1, TimeUnit.SECONDS);
    25         } catch (InterruptedException e) {
    26             // TODO Auto-generated catch block
    27             e.printStackTrace();
    28         }
    29     }
    30 }

    五、DelayQueue队列

      这个队列比较特殊,加入它的元素都必须实现Delayed接口,而且在往里面put数据时需要加入时间,表示多久后可以被拿出,而且在其中默认会自动按照时间排好顺序,这个队列可以用于定时任务执行。

     1 public class Top6_DelayQueue {
     2 
     3     static BlockingQueue<MyTask> tasks = new DelayQueue<MyTask>();
     4     
     5     static class MyTask implements Delayed {
     6         
     7         long runningTime;
     8         
     9         MyTask(long rt) {
    10             this.runningTime = rt;
    11         }
    12 
    13         @Override
    14         public int compareTo(Delayed o) {
    15             if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS)) {
    16                 return -1;
    17             } else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
    18                 return 1;
    19             } else {
    20                 return 0;
    21             }
    22         }
    23 
    24         //加入元素时到现在过了多久
    25         @Override
    26         public long getDelay(TimeUnit unit) {
    27             return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    28         }
    29         
    30     }
    31     
    32     public static void main(String[] args) {
    33         long now = System.currentTimeMillis();
    34         MyTask t1 = new MyTask(now + 1000);
    35         MyTask t2 = new MyTask(now + 2000);
    36         MyTask t3 = new MyTask(now + 1500);
    37         MyTask t4 = new MyTask(now + 2500);
    38         MyTask t5 = new MyTask(now + 500);
    39         
    40         try {
    41             tasks.put(t1);
    42             tasks.put(t2);
    43             tasks.put(t3);
    44             tasks.put(t4);
    45             tasks.put(t5);
    46         } catch (InterruptedException e) {
    47             e.printStackTrace();
    48         }
    49         
    50         System.out.println(tasks);
    51         for (int i = 0; i < 5; i++) {
    52             try {
    53                 System.out.println(tasks.take());
    54             } catch (InterruptedException e) {
    55                 e.printStackTrace();
    56             }
    57         }
    58     }
    59     
    60 }

    六、TransferQueue队列

      TransferQueue队列也比较特殊,他有自己的transfer方法往队列中加数据,如果发现有消费者处于空闲状态,则直接给消费者,如果消费者都处于忙碌状态,则加入队列。但是如果此时没有消费者,则会阻塞线程。

     1 public class Top7_TransferQueue {
     2 
     3     public static void main(String[] args) {
     4         LinkedTransferQueue<String> queue = new LinkedTransferQueue<String>();
     5         
     6         new Thread(new Runnable() {
     7             
     8             @Override
     9             public void run() {
    10                 try {
    11                     System.out.println(queue.take());
    12                 } catch (Exception e) {
    13                     e.printStackTrace();
    14                 }
    15                 
    16             }
    17         }).start();
    18         //首先查看是否有空闲的消费者,如果有则直接交给消费者
    19         queue.transfer("aaa");
    20     }
    21 }

    六、SynchronizedQueue队列

      SynchronizedQueue队列时一种特殊的TransferQueue队列,他的队列容量为0,直接等待消费者消费。

      

      本文简单介绍了并发编程中用到的一些容器,但是具体使用要根据业务情况进行选择使用,不可盲目使用。

  • 相关阅读:
    find命令 -- 之查找指定时间内修改过的文件
    nginx
    lighttpd 搭建
    mysql主从复制5.6基于GID及多线程的复制笔记
    centos下MySQL主从同步配置
    数据库集群搭建
    linux 系统监控、诊断工具之 top 详解
    Linux下Apache并发连接数和带宽控制
    DXGI屏幕捕捉
    CUDA以及CUDNN安装配置(WIN10为例)
  • 原文地址:https://www.cnblogs.com/wuyx/p/8849667.html
Copyright © 2011-2022 走看看