zoukankan      html  css  js  c++  java
  • java高并发编程(四)高并发的一些容器

    摘抄自马士兵java并发视频课程;

    一、需求背景:          

    有N张火车票,每张票都有一个编号,同时有10个窗口对外售票, 请写一个模拟程序。

    分析下面的程序可能会产生哪些问题?重复销售?超量销售?

    /**
     * 有N张火车票,每张票都有一个编号
     * 同时有10个窗口对外售票
     * 请写一个模拟程序
     * 
     * 分析下面的程序可能会产生哪些问题?
     * 重复销售?超量销售?
     * 
     * @author 马士兵
     */
    package yxxy.c_024;
    
    import java.util.ArrayList;
    import java.util.List;
    
    public class TicketSeller1 {
        static List<String> tickets = new ArrayList<>();
        
        static {
            for(int i=0; i<10000; i++) tickets.add("票编号:" + i);
        }
        
        public static void main(String[] args) {
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    while(tickets.size() > 0) {
                        System.out.println("销售了--" + tickets.remove(0));
                    }
                }).start();
            }
        }
    }
    View Code

     可能卖重;一张票可能对多个线程同时remove(0),所以可能一张票被卖出去多次;也可能最后一张票的时候都被多个线程remove(),程序会报错,总之,不加锁是不行的。

    ArrayList不是同步的,remove、add等各种方法全都不是同步的;一定会出问题;

    二、使用Vector          

    /**
     * 使用Vector或者Collections.synchronizedXXX
     * 分析一下,这样能解决问题吗?
     * 
     * @author 马士兵
     */
    package yxxy.c_024;
    
    import java.util.Vector;
    import java.util.concurrent.TimeUnit;
    
    public class TicketSeller2 {
        static Vector<String> tickets = new Vector<>();
        
        static {
            for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
        }
        
        public static void main(String[] args) {
            
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    while(tickets.size() > 0) {
                        
                        try {
                            TimeUnit.MILLISECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        
                        System.out.println("销售了--" + tickets.remove(0));
                    }
                }).start();
            }
        }
    }
    View Code
    Vector是一个同步容器,所有的方法都是加锁的;
    虽然说在Vector里面remove方法是原子的,但是while条件中判断和remove是分离的;如果在while条件和remove之间被打断的话,问题依旧;(假设剩下最后一张票,多个线程争抢同一张票,每一个线程判断的size大于0,虽然size和remove都是原子性的,但是在判断和remove中间的这段过程中,还是可能被打断,A线程判断了size>0,还没有remove的时候被打断了,B线程把票拿走了,A线程继续往下执行的时候再remove就出问题了。)
    所以只是把List换成同步容器Vector,问题依旧;

    三、使用synchronized加锁:

    /**
     * 就算操作A和B都是同步的,但A和B组成的复合操作也未必是同步的,仍然需要自己进行同步
     * 就像这个程序,判断size和进行remove必须是一整个的原子操作
     * 
     * @author 马士兵
     */
    package yxxy.c_024;
    
    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    public class TicketSeller3 {
        static List<String> tickets = new LinkedList<>();
        
        
        static {
            for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
        }
        
        public static void main(String[] args) {
            
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    while(true) {
                        synchronized(tickets) {
                            if(tickets.size() <= 0) break;
                            
                            try {
                                TimeUnit.MILLISECONDS.sleep(10);
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                            
                            System.out.println("销售了--" + tickets.remove(0));
                        }
                    }
                }).start();
            }
        }
    }
    View Code
    相当于把判断和销售都加到了一个原子操作里去了;可以解决问题;
    不过加锁后效率并不是很高;每销售一张票的时候都要把整个队列tickets锁定;
     
    四、使用ConcurrentLinkedQueue提供并发性
    /**
     * 使用ConcurrentQueue提高并发性
     * 
     * @author 马士兵
     */
    package yxxy.c_024;
    
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class TicketSeller4 {
        static Queue<String> tickets = new ConcurrentLinkedQueue<>();
        
        static {
            for(int i=0; i<1000; i++) tickets.add("票 编号:" + i);
        }
        
        public static void main(String[] args) {
            
            for(int i=0; i<10; i++) {
                new Thread(()->{
                    while(true) {
                        String s = tickets.poll();
                        if(s == null) {
                            break;
                        }else {
                            System.out.println("销售了--" + s);
                        }
                    }
                }).start();
            }
        }
    }
    View Code
     
    这里面没有加锁,同样的也有判断,但是这个不会出问题;为什么?
    因为在做了s==null判断后,再也没有对队列进行修改操作;(上个程序都是做了判断之后,需要对队列进行修改操作remove一下)
    假如A线程执行完String s = tickets.poll(),还没有来得及执行if(s==null) break就被打断了,另外一个线程把队列拿空了,大不了while(true)返过头来再拿一遍得到null,所以不会出问题;
     
     
    五、ConcurrentHashMap
    /**
     * http://blog.csdn.net/sunxianghuang/article/details/52221913 
     * http://www.educity.cn/java/498061.html
     * 阅读concurrentskiplistmap
     */
    package yxxy.c_025;
    
    import java.util.Arrays;
    import java.util.Hashtable;
    import java.util.Map;
    import java.util.Random;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentSkipListMap;
    import java.util.concurrent.CountDownLatch;
    
    public class T01_ConcurrentMap {
        public static void main(String[] args) {
    //        Map<String, String> map = new ConcurrentHashMap<>();
            Map<String, String> map = new ConcurrentSkipListMap<>(); //高并发并且排序
            
    //        Map<String, String> map = new Hashtable<>();
            //Map<String, String> map = new HashMap<>(); //Collections.synchronizedXXX
            //TreeMap
            Random r = new Random();
            Thread[] ths = new Thread[100];
            CountDownLatch latch = new CountDownLatch(ths.length);
            long start = System.currentTimeMillis();
            for(int i=0; i<ths.length; i++) {
                ths[i] = new Thread(()->{
                    for(int j=0; j<10000; j++) map.put("a" + r.nextInt(100000), "a" + r.nextInt(100000));
                    latch.countDown();
                });
            }
            
            Arrays.asList(ths).forEach(t->t.start());
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            
            long end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    }
    View Code
    不同的Map容器执行完这段代码的时间:
    HashTable:445;
    ConcurrentHashMap:402;
     
    多线程的环境下ConcurrentHashMap的效率要比hashTable高一些,高在哪?
    hashTable往里加任何一个数据的时候,都是要锁定整个hashTable对象,而concurrentHashMap默认的是把容器分成16段,每次往里插数据的时候只锁定16段其中的一个部分;把锁细化了;当很多线程共同往里插数据的时候,线程A插的是其中一段,线程B是往另一段里插,那么这两个线程就可以同时并发的往里插;因此多线程环境下要比hashTable高;
     
    ConcurrentSkipListMap:是支持排序的,所以插入的时候慢了一些;
    Collections.synchronizedList/Collections.synchronizedMap(Map<K, V>):往里面传一个不加锁的Map,将它包装一下,返回一个加了锁的Map;
     
    注:以上所有的map,都可以换成set;因为set只是使用了map的key。
     
     
    六、CopyOnWriteList:
    /**
     * 写时复制容器 copy on write
     * 多线程环境下,写时效率低,读时效率高
     * 适合写少读多的环境
     * @author 马士兵
     */
    package yxxy.c_025;
    
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Random;
    import java.util.Vector;
    import java.util.concurrent.CopyOnWriteArrayList;
    
    public class T02_CopyOnWriteList {
        public static void main(String[] args) {
            List<String> lists = 
                    //new ArrayList<>(); //这个会出并发问题!
                    //new Vector();
                    new CopyOnWriteArrayList<>();
            Random r = new Random();
            Thread[] ths = new Thread[100];
            
            for(int i=0; i<ths.length; i++) {
                Runnable task = new Runnable() {
        
                    @Override
                    public void run() {
                        for(int i=0; i<1000; i++) lists.add("a" + r.nextInt(10000));
                    }
                    
                };
                ths[i] = new Thread(task);
            }
            
            runAndComputeTime(ths);
            
            System.out.println(lists.size());
        }
        
        static void runAndComputeTime(Thread[] ths) {
            long s1 = System.currentTimeMillis();
            Arrays.asList(ths).forEach(t->t.start());
            Arrays.asList(ths).forEach(t->{
                try {
                    t.join();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
            long s2 = System.currentTimeMillis();
            System.out.println(s2 - s1);
        }
    }
    View Code
    CopyOnWriteList:写时复制容器,这个容器当你往里要添加一个元素的时候,他会把这个容器复制一份,在后面加一新的数据,然后把引用指向新的容器;写时复制有什么好处?对于那些从里往外读数据的线程来说再也不用加锁了,因为读的时候引用指向新的容器了,再读的时候是从新的引用里读;
    (读需要加锁的情况下,是出现脏读的情况下才需要加锁;CopyOnWriteList是不可能出现脏读的,他前后数据一定是一致的,没有中间状态;因为它在新的复制一份的里面做更改,更改完了以后马上把引用指向新的,这是一个原子性操作,所以他不会出现脏读的情况,因此不需要加锁。)
    写的特别少,但是往外读特别多的时候使用CopyOnWriteList;
     
    七、ConcurrentLinkedQueue:
    package yxxy.c_025;
    
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class T04_ConcurrentQueue {
        public static void main(String[] args) {
            Queue<String> strs = new ConcurrentLinkedQueue<>();
            
            for(int i=0; i<10; i++) {
                strs.offer("a" + i);  //add
            }
            
            System.out.println(strs);
            
            System.out.println(strs.size());
            
            System.out.println(strs.poll());
            System.out.println(strs.size());
            
            System.out.println(strs.peek());
            System.out.println(strs.size());
            
            //双端队列Deque
        }
    }
    View Code

    console:

    [a0, a1, a2, a3, a4, a5, a6, a7, a8, a9]
    10
    a0
    9
    a1
    9
    View Code
    Queue:队列,在并发容器里面最重要的也是应用的最多的容器;有很多种实现,ConcurrentLinkedQueue,BlockingQueue;
    常见操作:
    offer: 类似于add方法,但是add方法加的时候会出问题,如果有容量的限制话add就会抛异常;offer不会抛异常,返回值boolean代表是否加成功;
    poll(): 从头部拿出来一个元素,同时把原来的删掉;
    peek(): 从头部拿出来一个,但是原来的不删;
     
     
    八、LinkedBlockingQueue和ArrayBlockingQueue
    LinkedBlockingQueue实现的一个简单的生产者消费者程序:
    package yxxy.c_025;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class T05_LinkedBlockingQueue {
    
        static BlockingQueue<String> strs = new LinkedBlockingQueue<>();
    
        static Random r = new Random();
    
        public static void main(String[] args) {
            new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    try {
                        strs.put("a" + i); //如果满了,就会等待
                        TimeUnit.MILLISECONDS.sleep(r.nextInt(1000));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "p1").start();
    
            for (int i = 0; i < 5; i++) {
                new Thread(() -> {
                    for (;;) {
                        try {
                            System.out.println(Thread.currentThread().getName() + " take -" + strs.take()); //如果空了,就会等待
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
                }, "c" + i).start();
    
            }
        }
    }
    View Code
    Queue在高并发的情况下可以使用两种队列:
    ConcurrentLinkedQueue:内部加锁的
    BlockingQueue:阻塞式队列,如LinkedBlockingQueue,ArrayBlockingQueue。阻塞式的意思是,生产者消费者模式中生产者已经生产满了直接等待wait,消费如果空了消费者就会直接等待。
    LinkedBockingQueue是链表实现的阻塞式容器,是无界队列(往里扔多少个元素都可以,内存满足的情况下)
    ArrayBlockingQueue:有界队列
     
    package yxxy.c_025;
    
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class T06_ArrayBlockingQueue {
    
        static BlockingQueue<String> strs = new ArrayBlockingQueue<>(10); //有界队列,最多装10个元素
    
        static Random r = new Random();
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 10; i++) {
                strs.put("a" + i);
            }
            
            strs.put("aaa"); //满了就会等待,程序阻塞,无限制的阻塞下去
            //strs.add("aaa");  //报异常,Queue full
            //strs.offer("aaa"); //不会报异常,但是加不进去;boolean带表是否加成功;这是add和offer的区别 
            //strs.offer("aaa", 1, TimeUnit.SECONDS); //1s钟之后加不进去就加不进了;按时间段阻塞
            
            System.out.println(strs);
        }
    }
    View Code

    九、DelayQueue·:      

    package yxxy.c_025;
    
    import java.util.Random;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    public class T07_DelayQueue {
    
        static BlockingQueue<MyTask> tasks = new DelayQueue<>();
    
        static Random r = new Random();
        
        static class MyTask implements Delayed {
            long runningTime;
            
            MyTask(long rt) {
                this.runningTime = rt;
            }
    
            @Override
            public int compareTo(Delayed o) {
                if(this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MILLISECONDS))
                    return -1;
                else if(this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) 
                    return 1;
                else 
                    return 0;
            }
    
            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
            
            @Override
            public String toString() {
                return "" + runningTime;
            }
        }
    
        public static void main(String[] args) throws InterruptedException {
            long now = System.currentTimeMillis();
            MyTask t1 = new MyTask(now + 1000);
            MyTask t2 = new MyTask(now + 2000);
            MyTask t3 = new MyTask(now + 1500);
            MyTask t4 = new MyTask(now + 2500);
            MyTask t5 = new MyTask(now + 500);
            
            tasks.put(t1);
            tasks.put(t2);
            tasks.put(t3);
            tasks.put(t4);
            tasks.put(t5);
            
            System.out.println(tasks);
            
            for(int i=0; i<5; i++) {
                System.out.println(tasks.take());
            }
        }
    }
    View Code

    console:

    [1534606492700, 1534606493200, 1534606493700, 1534606494700, 1534606494200]
    1534606492700
    1534606493200
    1534606493700
    1534606494200
    1534606494700
    View Code
     
    DelayQueue:无界队列,加进去的每一个元素,如果理解为一个任务的话,这个元素什么时候可以让消费者往外拿呢?每一个元素记载着我还有多长时间可以从队列中被消费者拿走;这个队列默认是排好顺序的,等待的时间最长的排在最前面,先往外拿;
    DelayQueue往里添加的元素是要实现Delayed接口;
    可以用来执行定时任务;
     
    十、TransferQueue:  
    TransferQueue:提供了transfer方法,一般是这种情形,有一个队列,消费者线程先启动,然后生产者生产一个东西的时候不是往队列里头仍,它首先去找有没有消费者,如果有消费者,生产的东西不往队列里扔了而是直接给消费者消费;如果没有消费者的话,调用transfer线程就会阻塞;
     
    比如场景:坦克大战中多个坦克客户端链接服务器,坦克A移动了,服务端需要把A移动的位置消息发送给其他客户端,服务端存在一个消息队列,消息都交给不同的线程处理,有一种是都往消息队列里扔,然后再往外拿,不过这种太慢了;假如有一大推消费者线程等着,那么直接把消息扔给消费者线程就行了,不要再往队列里扔了,效率会更高一些;所以TransferQueue是用在更高的并发的情况下。
     
    例子程序:
    1.先起消费者,在起生产者transfer,程序正常:
    package yxxy.c_025;
    
    import java.util.concurrent.LinkedTransferQueue;
    
    public class T08_TransferQueue {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
            
            new Thread(() -> {
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            
            strs.transfer("aaa");
        }
    }
    View Code

    2.如果先起生产者transfer,然后再起消费者take,程序就会阻塞住了:

    package yxxy.c_025;
    
    import java.util.concurrent.LinkedTransferQueue;
    
    public class T08_TransferQueue {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
            
            strs.transfer("aaa");
    
            new Thread(() -> {
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    View Code

    3.如果transfer换成put(或者add、offer),也不会有问题,因为不会阻塞:

    package yxxy.c_025;
    
    import java.util.concurrent.LinkedTransferQueue;
    
    public class T08_TransferQueue {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> strs = new LinkedTransferQueue<>();
            
            //strs.transfer("aaa");
            
            strs.put("aaa");
    
            new Thread(() -> {
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    View Code

    十一、SynchronousQueue  

    package yxxy.c_025;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    
    public class T09_SynchronusQueue { //容量为0
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> strs = new SynchronousQueue<>();
            
            new Thread(()->{
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
            
            strs.put("aaa"); //阻塞等待消费者消费
            //strs.add("aaa");
            System.out.println(strs.size());
        }
    }
    View Code
    SynchronousQueue:同步队列,一种特殊的transferQueue,前面说的TransferQueue如果生产者生产了东西,这时候没有消费者,如果使用put/add,还可以扔到队列里,这个队列还是有一定的容量的;
    而SynchronousQueue叫做没有容量的队列,容量为0,生产者生产的东西必须马上消费掉,如果不消费掉就会出问题;调add抛异常(Queue full),调put程序阻塞;
     
     总结:
    总结:
    1:对于map/set的选择使用
    HashMap       不需要多线程的情况下使用
    TreeMap       不需要多线程的情况下使用
    LinkedHashMap 不需要多线程的情况下使用
    
    Hashtable     并发量比较小                            
    Collections.sychronizedXXX  并发量比较小   
    
    ConcurrentHashMap       高并发
    ConcurrentSkipListMap   高并发同时要求排好顺序
    
    2:队列
    ArrayList        不需要同步的情况
    LinkedList      不需要同步的情况
    Collections.synchronizedXXX  并发量低
    Vector                            并发量低
    CopyOnWriteList              写的时候少,读时候多
    Queue
        CocurrentLinkedQueue //concurrentArrayQueue    高并发队列
        BlockingQueue                                        阻塞式
            LinkedBQ    无界
            ArrayBQ     有界
            TransferQueue   直接给消费者线程,如果没有消费者阻塞
            SynchronusQueue  特殊的transferQueue,容量0
        DelayQueue执行定时任务
     
     
     
     
  • 相关阅读:
    PHP 超级全局变量
    PHP 魔术变量
    PHP 变量
    Thinkphp 模板中常用的系统变量总结
    PHP $GLOBALS超全局变量分析
    php使用curl的post提交数据和get获取网页数据的方法总结
    php获取客户端真实ip地址的三种方法
    Jquery 【on事件】
    ptyhon【递归练习】
    C#中的线程
  • 原文地址:https://www.cnblogs.com/tenWood/p/9495791.html
Copyright © 2011-2022 走看看