zoukankan      html  css  js  c++  java
  • 并发容器

    ThreadLocal

    ThreadLocal 线程局部变量,只对当前线程范围有效,比如下面例子,在第一个线程设置的值,第二个线程是使用不了的。

    public class TLDemo2 {
    
        private static ThreadLocal<User> threadLocal = new ThreadLocal<>();
    
        public static void main(String[] args) {
    
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                threadLocal.set(new User());
                System.out.println(threadLocal.get());
            }).start();
    
            new Thread(()->{
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (Exception e) {
                    e.printStackTrace();
                }
               threadLocal.set(new User());
    //            threadLocal.remove();
                System.out.println(threadLocal.get());
            }).start();
    
        }
    
    }

    有10000张火车票,同时有10个窗口对外售票

    请写一个模拟程序

    public class SaleOfTickets1 {
    
        private static List<Integer> tickets = new ArrayList<>();
        
        static{
            for (int i = 0; i < 10000; i++) {
                tickets.add(i);
            }
        }
    
        /**
         * 当我们卖到最后一张票的时候,tickets是大于0的,第一个线程看到他大于0,进入判断,
         * 第二个线程同样操作区remove的时候票就没有了,在高并发的情况下,还可能会卖重,
         * 几百万线程去卖票的时候,remove方法不是同步的,第一个线程卖了这张票,第二个线程可能也买了。
         * @param args
         */
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    while(tickets.size() > 0){
                        System.out.println(Thread.currentThread().getName()+"销售票编号:" + tickets.remove(0));
                    }
                }).start();
            }
        }
    }

    vector

    public class SaleOfTickets2 {
    
        private static Vector<Integer> tickets = new Vector<>();
        
        static {
            for (int i = 0; i < 10000; i++) {
                tickets.add(i);
            }
        }
    
    
        /**
         * vector是一个同步容器
         * 这里是判断和操作分离了,虽然说在vector里面remove方法是原子性的,但是他的判断和remove方法是分离的,
         * 但是可能在判断到remove的过程当中线程可能会被打断。我们可以加一个模拟性的睡眠,因为在你实际开发的
         * 时候,可能在这中间有些判断代码逻辑代码。
         * 如果剩了最后一个了,很多线程去抢票,虽然size是原子性的,remove是原子性的,但是在他们的中间,
         * 线程还是有可能被打断
         */
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    while(tickets.size() > 0){
                        try {
                            TimeUnit.MILLISECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println("销售票编号:" + tickets.remove(0));
                    }
                }).start();
            }
        }
    }

    synchronized

    public class SaleOfTickets3 {
    
        private static List<Integer> tickets = new LinkedList<>();
        
        static {
            for (int i = 0; i < 10000; i++) {
                tickets.add(i);
            }
        }
        
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    while(true){
                        //这里使用synchronized,使两个操作具备了原子性,不会出问题
                        synchronized(tickets){
                            if(tickets.size() <= 0){
                                break;
                            }
                            System.out.println("销售票编号:" + tickets.remove(0));
                        }
                    }
                }).start();
            }
        }
    
    }

    ConcurrentLinkedQueue

    在JDK1.5以后,java里面提供了很多的并发容器,这里我们用的是一个queue,队列。
    * 所谓队列其实就是一个容器,就是站成一对,不管票还是人都在里面排成一堆,队列有几种,有先进先出的,
    * 还有两端的队列,还有就是栈,先进后出,先加进去的后出来。
    * 这里用了一个concurrentlinkedqueue,并发的链表队列。线程里面调用了一个poll方法,
    * 意思是往外面拿一个数据,相当于在尾巴里面拿一个,如果没有拿到,他的返回值就是空,那么就中断线程。
    * 这里面没有加锁,同样有判断,但是不会出问题。完成卖票功能这种效率是比较高的。queue里面是不能装空值。
    * 这里虽然判断和操作是一起的,但是我们没有在判断里面有任何操作,大不了反过头来再拿一边,
    * poll底层实现是cas,这里我们就不用加锁了。

    public class SaleOfTickets4 {
    
        private static Queue<Integer> tickets = new ConcurrentLinkedQueue<>();
        
        static {
            for (int i = 0; i < 10000; i++) {
                tickets.add(i);
            }
        }
        
        public static void main(String[] args) {
            for (int i = 0; i < 10; i++) {
                new Thread(()->{
                    while(true){
                        Integer poll = tickets.poll();
                        if(poll == null){
                            break;
                        }
                        System.out.println("销售票编号:" + poll);
                    }
                }).start();
            }
        }
    }

    CopyOnWriteArrayList

    写时复制容器
    *
    * 在往集合中添加数据的时候,先拷贝存储的数组,然后添加元素到拷贝好的数组中,
    * 然后用现在的数组去替换成员变量的数组(就是get等读取操作读取的数组)。
    * 这个机制和读写锁是一样的,但是比读写锁有改进的地方,那就是读取的时候可以写入的 ,
    * 这样省去了读写之间的竞争,看了这个过程,你也发现了问题,同时写入的时候怎么办呢,当然果断还是加锁。
    * 读多写少可以用copyonwritelist
    public class Demo {
    
        public static void main(String[] args) {
    //        List<String> lists = new ArrayList<>();
    //        List<String> lists = new Vector<>();
            List<String> lists = new CopyOnWriteArrayList<>();
            Random r = new Random();
            Thread[] threads = new Thread[100];
            
            for (int i = 0; i < threads.length; i++) {
                Runnable task = new Runnable() {
                    @Override
                    public void run() {
                        for (int j = 0; j < 1000; j++) {
                            lists.add("A" + r.nextInt(10000));
                        }
                    }
                };
                threads[i] = new Thread(task);
            }
            
            run(threads);
            
            System.out.println(lists.size());
        }
    
        private static void run(Thread[] threads) {
            long start = System.currentTimeMillis();
            Arrays.asList(threads).forEach(t->t.start());
            Arrays.asList(threads).forEach(t->{
                try {
                    t.join();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            long end = System.currentTimeMillis();
            System.out.println(end - start);
        }
    }

    Collections

    collections是java里面一个集合处理类,里面有给容器加锁的方法,通过调用api可以返回一个加了锁的容器。

    public static void main(String[] args) {
            ArrayList<String> arrayList = new ArrayList<>();
            List<String> synchronizedList = Collections.synchronizedList(arrayList);
        }

    ConcurrentLinkedQueue

    public class Demo {
    
        public static void main(String[] args) {
            Queue<String> strings = new ConcurrentLinkedQueue<>();
            
            for (int i = 0; i < 10; i++) {
                //offer,类似于add方法,add会出一些问题,比如容量限制,
                //超出限制会抛异常,offer有返回值可以判断是否加成功了
                strings.offer("a" + i);
            }
            
            System.out.println(strings);
            
            System.out.println(strings.size());
    
            System.out.println(strings.poll());//拿了就没了
            System.out.println(strings.size());
    
            System.out.println(strings.peek());//用一下不删
            System.out.println(strings.size());
        }
    }

    ConcurrentHashMap

    并发的hashmap,这个例子测试一下效率
    *
    * 第一种用hashtable,hashtable所有方法都加了锁了,第二种concurrenthashmap,
    * 大致能看出来他的效率要比hashtable要高一些,在多线程的情况下。
    * 为什么呢,因为hashtable往里面加任何数据的时候都是要锁定整个对象,
    * 而concurrenthashmap,是分成十六个段,每次插数据的时候,只会锁住一小段,1.8之后实现不同。
    public class Demo {
    
        public static void main(String[] args) {
            Map<String, String> map = new ConcurrentHashMap<>();
            //Map<String, String> map = new ConcurrentSkipListMap<>();
    //        Map<String, String> map = new Hashtable<>();
    
    //        Map<String, String> map = new HashMap<>();
    //        Map<String, String> map1 = Collections.synchronizedMap(map);
    
            Random random = new Random();
            Thread[] threads = new Thread[100];
            CountDownLatch latch = new CountDownLatch(threads.length);
            long start_time = System.currentTimeMillis();
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(()->{
                    for(int j=0; j<10000;j++) {
                        map.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
    //                    map1.put("a" + random.nextInt(100000), "a" + random.nextInt(100000));
                    }
                    latch.countDown();
                });
            }
            Arrays.asList(threads).forEach(t->t.start());
            try {
                latch.await();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            long end_time = System.currentTimeMillis();
            System.out.println(end_time-start_time);
        }
    }

     换成hashtable看下结果

    LinkedBlockingQueue

    阻塞式的容器
    public class Demo {
    
        private static BlockingQueue<String> strings = new LinkedBlockingQueue<>(10);
    
        public static void main(String[] args) {
            new Thread(()->{
                for (int i = 0; i < 100; i++) {
                    try {
                        // 在阻塞式容器里面加了一个方法,put,也就是如果满了就会等待,对应的方法叫take,如果空了就会等待。
                        // 这种容器我们去用的时候自动就实现了阻塞式的生产者消费者。
                        strings.put("商品" + i);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, "producer").start();
    
            for (int i = 0; i < 5; i++) {
                new Thread(()->{
                    for(;;){
                        try {
                            // take,拿,如果空了也会阻塞
                            System.out.println(Thread.currentThread().getName() + " take " + strings.take()); //如果空了,就会等待
                        } catch (Exception e) {
                            e.printStackTrace();
                        } 
                    }
                },"consumer" + i).start();
            }
    
        }
    
    }

    ArrayBlockingQueue

    * 有界队列,意思就是说这个队列能装的元素的个数是固定的,后面讲线程池的时候,里面装的其实是一个个任务。
    * 这里只能装10个,如果超过了可能会出问题可能会阻塞,这里看你调用什么方法。
    * add会报异常
    * offer不会报异常,他只通过布尔类型的返回值来告诉你是加成功了还是没有加成功。
    * offer可以设置时间,如果这段时间加不进去就不加了也就是返回false
    * put方法是满了会阻塞住。
    public class Demo {
    
        private static BlockingQueue<String> strings = new ArrayBlockingQueue<>(10);
        
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i < 10; i++) {
                strings.put("a" + i);
            }
            strings.add("aaaa");
    //        strings.put("aaaa");
    //        strings.offer("aaaa");
            strings.offer("aaaa",1, TimeUnit.SECONDS);
            System.out.println(strings);
        }
    }

    DelayQueue

    容器里每一个元素都设置了一个时间,时间到了才能从中提取元素

    public class Demo {
    
        private static BlockingQueue<MyTask> tasks = new DelayQueue<>();
    
        static class MyTask implements Delayed{
    
            long runningTime;
            
            public MyTask(long rt) {
                this.runningTime = rt;
            }
    
            @Override
            public int compareTo(Delayed o) {
                if (this.getDelay(TimeUnit.MILLISECONDS) < o.getDelay(TimeUnit.MICROSECONDS)) {
                    return -1;
                }else if (this.getDelay(TimeUnit.MILLISECONDS) > o.getDelay(TimeUnit.MILLISECONDS)) {
                    return 1;
                }else {
                    return 0;
                }
            }
    
            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(runningTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
            
            @Override
            public String toString() {
                return "" + runningTime;
            }
            
            public static void main(String[] args) throws InterruptedException {
                long now = System.currentTimeMillis();
                MyTask t1 = new MyTask(now+1000);
                MyTask t2 = new MyTask(now+2000);
                MyTask t3 = new MyTask(now+1500);
                MyTask t4 = new MyTask(now+2500);
                MyTask t5 = new MyTask(now+500);
                
                tasks.put(t1);
                tasks.put(t2);
                tasks.put(t3);
                tasks.put(t4);
                tasks.put(t5);
                
                System.out.println(tasks);
                
                for (int i = 0; i < 5; i++) {
                    System.out.println(tasks.take());
                }
            }
    
        }
    
    }

    TransferQueue

    * 和普通的queue的方法差不多,多了一个transfer方法。
    * 如果你用这种队列的话,往往是消费者先启动,生产者生产一个东西的时候,他先是去找消费者,
    * 如果有消费者就直接丢给消费者。
    public class Demo {
    
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> strings = new LinkedTransferQueue<>();
            
            new Thread(()->{
                try {
                    System.out.println("t1"+strings.take());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
    
            new Thread(()->{
                try {
                    System.out.println("t2"+strings.take());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
    
            TimeUnit.SECONDS.sleep(2);
    
            strings.transfer("aaa");
    //        strings.put("aaa");
            System.out.println(strings.size());
    //        new Thread(()->{
    //            try {
    //                System.out.println(strings.take());
    //            } catch (Exception e) {
    //                e.printStackTrace();
    //            }
    //        }).start();
        }
    
    }

    SynchronousQueue

    同步队列
    * 同步队列是容量为0,也就是来的东西必须给消费掉.
    * 首先启动一个消费者,调用add方法,他报错了
    * 只能调用put,意思就是阻塞等待消费者消费。put里面其实用的是transfer,任何东西必须消费,不能往容器里面扔。
    public class Demo {
    
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> strings = new SynchronousQueue<>();
            
            new Thread(()->{
                try {
                    System.out.println(strings.take());
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
    //        strings.add("aaa");
            strings.put("aaa");
            strings.put("aaa");
            strings.put("aaa");
            strings.put("aaa");
            strings.put("aaa");
            System.out.println(strings.size());
        }
    }

     

  • 相关阅读:
    收缩sql server数据库日志
    maven设置jdk版本
    maven设置镜像地址
    mysql查询最大值,最小值,平均值,总和
    ajax相同url和参数,将不会重复发起请求
    jsp遍历集合
    jsp分割字符串并遍历
    jsp格式化日期
    java判断文件或文件夹是否在
    sqoop1.4.6 用法总结一
  • 原文地址:https://www.cnblogs.com/lusaisai/p/12741456.html
Copyright © 2011-2022 走看看