zoukankan      html  css  js  c++  java
  • 并发容器

    并发容器

    From Vector To Queue
    1.原始的ArrayList
    /**
     * 有N张火车票  每张票都有一个编号
     * 同时有10个窗口对外售票
     *
     */
    package com.bingfarongqi;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Vector;
    import java.util.concurrent.TimeUnit;
    
    public class TicketSeller1 {
            //定义一个容器 tickets 总票数
                static List<String> tickets=new ArrayList<>();
                static {
                    //总共1000张票 定义每张票的编号
                    for (int i = 0; i <1000 ; i++) tickets.add("票编号"+i);
                }
    
        public static void main(String[] args) {
            for (int i = 0; i <10 ; i++) {
                new Thread(()->{
                while (tickets.size()>0){
                    try {
                        TimeUnit.MICROSECONDS.sleep(10);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //把容器内第0个位置的票remove
                    System.out.println("销售了--"+tickets.remove(0));
                }
                }).start();
            }
        }
    }
    
    

    这种最原始的集合容器 一眼可以看出 没有加锁 问题就在这 10个线程同时运行没有加锁肯定会出问题

    size大于0 的时候没有问题 因为只要还有剩余的票我就往外卖,取一张往外remove

    但是当剩下最后一张票的时候问题就出现了 好几个线程执行到这都发现size大于0 所有线程都往外卖一张票,那么就会发生只有一个线程拿到了票,其余线程就会输出空null,所以没有加锁 线程不安全。

    2.Vector
    /**
     * 有N张火车票  每张票都有一个编号
     * 同时有10个窗口对外售票
     *
     */
    package com.bingfarongqi;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Vector;
    import java.util.concurrent.TimeUnit;
    
    public class TicketSeller2 {
        //定义一个容器 tickets 总票数
        static Vector<String> tickets=new Vector<>();
        static {
            //总共1000张票 定义每张票的编号
            for (int i = 0; i <1000 ; i++) tickets.add("票编号"+i);
        }
    
        public static void main(String[] args) {
            for (int i = 0; i <10 ; i++) {
                new Thread(()->{
                    while (tickets.size()>0){
                        try {
                            TimeUnit.MICROSECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        //把容器内第0个位置的票remove
                        System.out.println("销售了--"+tickets.remove(0));
                    }
                }).start();
            }
        }
    }
    
    

    第二种是容器Vetor 是自带内部锁

    读它的内部方法时会看到synchronized

    虽然Vetor上来先加了锁保证了线程的安全性 但是当我们读这个程序的时候发现还是不对,锁为了线程的安全,就是当我们调用size方法的时候他加锁了,调用了remove的时候也加锁了,可不幸的是你这两个中间没加 那好多个线程依然断定这个size是大于0 的 又出现了超卖的现象。

    3.Linkenlist

    /**
     * 有N张火车票  每张票都有一个编号
     * 同时有10个窗口对外售票
     *
     */
    package com.bingfarongqi;
    
    import java.util.ArrayList;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Vector;
    import java.util.concurrent.TimeUnit;
    
    public class TicketSeller3 {
        //定义一个容器 tickets 总票数
        static List<String> tickets=new LinkedList<>();
        static {
            //总共1000张票 定义每张票的编号
            for (int i = 0; i <1000 ; i++) tickets.add("票编号"+i);
        }
    
        public static void main(String[] args) {
            for(int i = 0; i <10 ; i++){
                new Thread(()->{
                    while (true){
                        //要在外层单独加一把锁 不然还是出问题
                        synchronized (tickets){
                            if (tickets.size()<=0) break;
    
                        try {
                            TimeUnit.MICROSECONDS.sleep(10);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        //把容器内第0个位置的票remove
                        System.out.println("销售了--"+tickets.remove(0));
                    }
                    }
                }).start();
            }
        }
    }
    
    

    这个容器虽然是用了加锁 但由于子啊调用并发容器的时候你是调用了两个原子方法,所以你在外层还是得再加上一把锁 这个不会出现什么问题 它会踏踏实实的执行 但是效率不是最高的。

    4.Queue
    /**
     * 有N张火车票  每张票都有一个编号
     * 同时有10个窗口对外售票
     *
     */
    package com.bingfarongqi;
    
    import java.util.*;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.TimeUnit;
    
    public class TicketSeller4 {
        //定义一个容器 tickets 总票数
        static Queue<String> tickets=new ConcurrentLinkedQueue<>();
        static {
            //总共1000张票 定义每张票的编号
            for (int i = 0; i <1000 ; i++) tickets.add("票编号"+i);
        }
    
        public static void main(String[] args) {
            for(int i = 0; i <10 ; i++){
                new Thread(()->{
                    while (true){
                        String s=tickets.poll();
                        if (s==null) break;
                            //把容器内第0个位置的票remove
                            else System.out.println("销售了--"+s);
                        }
                }).start();
            }
        }
    }
    
    

    这个容器是最新的一个接口 主要是为了多线程用的 所以以后多线程这种单个元素的时候多考虑Queue 少用List呵呵Set

    这里调用了个poll方法 poll的意思就是我从ticket去取值,这个值什么时候空了说明里面的值已经没了,所以while(true)不断往外销售,一直等到票没了

    它的源码 :

    意思是说从queue的头部取值 等取完了 也就是值为null的时候我就返回null值。

    ConcurrentMAP

    这里会出现一个跳表结构

    CopyOnWrite

    CopyOnWrite的意思就是写时复制

    主要是在写等待时候很少 读的时候很多的情况 这个时候就可以考虑用CopyOnWrite方式来提高效率了。

    Vector是在写的时候加锁 读的时候也加锁,但是CopyOnWriteList用的时候读的时候不加锁 写的时候会在原来的基础上拷贝一个,拷贝的时候扩展出来一个新元素来,然后把你添加的这个扔到最后的位置,与此同时把指向老的容器的一个引用指向新的,这就叫写时复制。

    BlockingQueue

    BlockingQueue的概念重点在Blocking上 Blocking阻塞 Queue队列,是阻塞队列。

    这个Queue提供了一些可以给多线程比较友好的接口 :

    1.offer 对应的就是原来的add 与add的区别就在于 add如果加不进去是会抛出异常 ,所以我们用的最多的是Queue里面的offer 它会概念一个返回值。

    2.poll 用来取数据 取出数据并且remove

    3.peek 拿出来数据 取出数据而不remove

    package com.bingfarongqi.f_03;
    
    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;
    
    public class BlockingQueue {
        public static void main(String[] args) {
            Queue<String> strs=new ConcurrentLinkedQueue<>();
            for (int i = 0; i <10 ; i++) {
                strs.offer("a"+i);//add
            }
            //10个数据
            System.out.println(strs);
            //size是10
            System.out.println(strs.size());
                //取出a0 并且remove掉 所以size还剩9
            System.out.println(strs.poll());
            System.out.println(strs.size());
            //因为刚才poll取出a0并且remove 所以peek取出的只能是a1 并且size还是9
            //因为peek不remove 所以size 9 不变
            System.out.println(strs.peek());
            System.out.println(strs.size());
        }
    }
    
    
    LinkedBlockingQueue

    LinkedBlockingQueue是在Queue的基础上又添加了两个方法 一个叫put 一个叫take

    这两个方法真正的实现了阻塞,put往里面装 如果满了的话就会让这个线程阻塞住,take往外取,如果空了的话就会阻塞住。 所以这是真正的实现了 生产者和消费者的容器。

    ArrayBlockingQueue(扩展-面试常问问题)

    ArrayBlockingQueue是有界的 可以界定容器的大小。

    当往容器里面仍数据的时候 一旦满了 ,这个put方法就会阻塞住,offer用返回值来判断到底加没加成功。

    扩展- 面试常问问题:Queue和list的区别到底在哪里?

    主要是在这,添加里一些对线程友好的API :offer peek poll put take

    package com.bingfarongqi.f_03;
    
    import java.util.Random;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;
    
    public class ArrayBlockingQueue_test {
        static BlockingQueue<String> strs=new ArrayBlockingQueue<>(10);
        static Random r=new Random();
    
        public static void main(String[] args) throws InterruptedException {
            for (int i = 0; i <10 ; i++) {
                strs.put("a"+i);
            }
            //strs.put("aaa");//满了就会等待,程序阻塞(无返回值)
            //strs.add("aaa");//满了就会报出异常
            strs.offer("aaa");//满了就会返回值判定成功
            strs.offer("aaa",1, TimeUnit.SECONDS);
    
            System.out.println(strs);
        }
    }
    
    
    DelayQueue(扩展PriorityQueue)

    DelayQueue可以实现时间上的排序,主要是按照在里面等待的时间进行排序。

    就看站着线程 正常情况下按照顺序 t1 >2>3>4>5

    但是这里看的是后面指定的时间 t1是1秒 t2是2秒 t3是1.5 t4是2.5 t5是0.5

    所以根据这个时间 运行顺序就是t5>1>3>2>4

    这个主要是用在调度时间的案例 ,比如 谁谁谁一个小时后干什么 三小时后干什么.....

    扩展--:

    DelayQueue的本质上用的是一个PriorityQueue,PriorityQueue是从AbstractQueue继承的。

    PriorityQueue的特点是它内部你往里装的时候并不是按顺序,而是内部进行了一个排序,按照优先级,最小的优先。它内部的结构其实是一个二叉树 .

    package com.bingfarongqi.f_03;
    
    import java.util.PriorityQueue;
    
    public class PriorityQueue_test {
        public static void main(String[] args) {
            PriorityQueue<String> q=new PriorityQueue<>();
    
            q.add("c");
            q.add("e");
            q.add("a");
            q.add("d");
            q.add("z");
            for (int i = 0; i <5 ; i++) {
                System.out.println(q.poll());
                //输出结果是 a c d e z
            }
        }
    }
    
    

    这个例子就很明显的展现胡了二叉树 优先级的问题 最小的在上面 。

    SynchronusQueue

    这个容器的容量是0 它不是用来装东西的,而是专门用来两个线程传内容的,第三节的最好有一个容器叫Exchanger 就是那个两个人交换装备的,他们两个本质上概念是一样的。

    package com.bingfarongqi.f_03;
    
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.SynchronousQueue;
    
    public class SynchronusQueue_test {
        //容量为0
        public static void main(String[] args) throws InterruptedException {
            BlockingQueue<String> strs=new SynchronousQueue<>();
            new Thread(()->{
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
                strs.put("aaa");//阻塞等待消费者
                //strs.put("bbb");
                //strs.add("aaa");
    
            System.out.println(strs.size());
        }
    }
    
    

    SynchronusQueue在线程池的作用特别大 ,很多线程取任务,互相之间进行任务调度的时候都用它。

    TransferQueue

    TransferQueue添加了一个新方法 叫transfer 这个方法和put相似 但是和put唯一的不同是put是把数据放进来就走 然后接着放 知道满了 然后等待,transfer是撞进来一个数据就在这等着 ,直到消费者把这个数据拿走了我才回去再做我的事。

    package com.bingfarongqi.f_03;
    
    import java.util.concurrent.LinkedTransferQueue;
    
    public class TransferQueue_test {
        public static void main(String[] args) throws InterruptedException {
            LinkedTransferQueue<String> strs=new LinkedTransferQueue<>();
    
            new Thread(()->{
                try {
                    System.out.println(strs.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }).start();
    
            strs.transfer("aaa");
    //        strs.put("aaa");
    //        new Thread(()->{
    //            try {
    //                System.out.println(strs.take());
    //            } catch (InterruptedException e) {
    //                e.printStackTrace();
    //            }
    //        }).start();
        }
    }
    
    

    一般使用场景:我付了钱,这个订单我付款完成了,但是我一直要等这个付款的结果完成才可以给客户反馈。

  • 相关阅读:
    linux环境变量(一)
    linux常用命令-ps
    linux实用小命令--查看文本内容
    postman tests常用方法
    Vue 中嵌套 Iframe,用postMessage通信 踩坑记
    [Vue warn]: Error in nextTick: "RangeError: Maximum call stack size exceeded"
    对STM32所用位带操作宏的详细剖析
    移植Modbus TCP二
    移植Modbus TCP一
    STM32位带操作
  • 原文地址:https://www.cnblogs.com/beizhai/p/13796277.html
Copyright © 2011-2022 走看看