zoukankan      html  css  js  c++  java
  • 并发编程(一)同步类容器和并发类容器

    并发编程(一)同步类容器和并发类容器

    一、同步类容器

    同步类容器是 线程安全 的,如 Vector、HashTable 等容器的同步功能都是由 Collections.synchronizedMap 等工厂方法去创建实现的,底层使用 synchronized 关键字,每次只有一个线程访问容器。这明显__不满足高并发的需求__。

    源代码: 【Vector】 底层使用 synchronized 修辞,显然是线程安全的

    public synchronized boolean add(E e) {
        modCount++;
        ensureCapacityHelper(elementCount + 1);
        elementData[elementCount++] = e;
        return true;
    }
    

    源代码: 【HashMap】 底层没有用 synchronized 修辞

    public V put(K key, V value) {
        return putVal(hash(key), key, value, false, true);
    }
    

    由于 java.util.HashMap 底层没有用 synchronized 修辞,显然不是线程安全的,为了实现线程安全可以用 Collections.synchronizedMap 装饰一下,实现线程安全。

    //HashMap不是线程安全容器,加工后成功线程安全
    Map<String, String> map = Collections.synchronizedMap(new HashMap<String, String>());
    

    二、并发类容器

    并发类容器 说明
    ConcurrentHashMap 替代 HashTable
    ConcurrentSkipListMap 排序
    CopyOnWriteArrayList 替代 Vector
    ConcurrentLinkedQueue 高性能队列,无阻塞
    LinkedBlockingQueue 阻塞形式队列,阻塞

    2.1 ConcurrentMap 容器

    ConcurrentHashMap 容器内部使用(Segment)来表示不同的部分,每个段其实就是一个小的 HashTable ,它们有自己的锁。只要多个修改操作发生在不同的段上,它们就可以并发进行。把一个整体分成了16个段(Segment)。也就是最高支持16个线程的并发修改操作。这也是在多线程场景时 减小锁的粒度从而降低锁竞争 一种方案。并且代码中大多共享变量使用 volatile 关键字声明,目的是第一时间获取修改的内容,性能非常好。

    ConcurrentMap 接口下两个重要的实现:

    • ConcurrentHashMap
    • ConcurrentSkipListMap(排序)

    2.2 Copy-On-Write 容器

    CopyOnWrite 容器既写时复制的容器, 用于读多写少的场景 。往一个容器添加元素时,不直接往当前容器添加,而是先将当前容器 Copy ,复制一个新的容器,然后往新的容器添加元素,添加完成之后,再将原容器的引用指向新的容器,这样做的好处是可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以 CopyOnWrite 容器是一种读写分离的思想,读和写不同的容器。

    源代码: 【CopyOnWriteArrayList】

    // Copy-On-Write 容器是一种读写分离的思想
    public class CopyOnWriteArrayList<E> {
        //Copy-On-Write 容器写操作时加锁,写操作结束后解锁
        public boolean add(E e) {
            final ReentrantLock lock = this.lock;
            lock.lock();//加锁
            try {
                //1. 获取原容器
                Object[] elements = getArray();
                int len = elements.length;
                //2. 原容器 -> Copy -> 新容器
                Object[] newElements = Arrays.copyOf(elements, len + 1);
                //3. 往新容器写入内容
                newElements[len] = e;
                //4. 指向新容器
                setArray(newElements);
                return true;
            } finally {
                lock.unlock();//解锁
            }
        }
    
        //读操作没有加锁,可以支持并发操作
        @SuppressWarnings("unchecked")
        private E get(Object[] a, int index) {
            return (E) a[index];
        }
    
        public E get(int index) {
            return get(getArray(), index);
        }
    }
    

    Copy-On-Write 容器下两个重要的实现:

    • CopyOnWriteArrayList
    • CopyOnWriteArraySet
    //CopyOnWriteArrayList <======> List 使用方法与List集合相同
    CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>();
    list.add("a");
    for (int i = 0; i < list.size(); i++) {
        System.out.println(list.get(i));
    }
    

    2.3 ConcurrentLinkedQueue 无阻塞队列

    适合高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常 ConcurrentLinkedQueue 性能好于 LinkedBlockingQueue。它是一个基于链接节点的无界线程安全队列。该队列的元素遵循先进先出的原则,且不允许 null 元素。更多实现原理

    ConcurrentLinkedQueue 重要方法:

    1. add()和offer():添加元素(ConcurrentLinkedQueue 下两个方法无区别)。

    2. poll()和peek():取头元素节点,区别在于前者删除元素,后者不会。注意: 没有元素时返回 null,不会阻塞队列。

    import java.util.Random;
    import java.util.concurrent.ConcurrentLinkedQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class ConcurrentLinkedQueueTest {
    
        private static ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        private static int count = 2; // 线程个数
        //CountDownLatch,一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。
        private static CountDownLatch latch = new CountDownLatch(count);
    
        public static void main(String[] args) throws InterruptedException {
            long timeStart = System.currentTimeMillis();
            ExecutorService pool = Executors.newFixedThreadPool(4);
            for (int i = 1; i <= 100; i++) {
                queue.offer(i);
            }
            for (int i = 0; i < count; i++) {
                pool.submit(new Runnable() {
                    @Override
                    public void run() {
                        Random random = new Random();
                        while (!queue.isEmpty()) {
                            try {
                                Thread.sleep(random.nextInt(10) * 50);
                            } catch (InterruptedException e) {
                                ;
                            }
                            // queue.poll() 可能为 null
                            System.out.println(Thread.currentThread().getName() + ":" + queue.poll());
                        }
                        latch.countDown();
                    }
                });
            }
            latch.await(); //使得主线程(main)阻塞直到latch.countDown()为零才继续执行
            System.out.println("cost time " + (System.currentTimeMillis() - timeStart) + "ms");
            pool.shutdown();
        }
    }
    

    2.4 BlockingQueue 阻塞队列

    与 ConcurrentLinkedQueue 相比,BlockingQueue 是阻塞的,即,put 方法在队列满的时候会阻塞直到有队列成员被消费,take 方法在队列空的时候也会阻塞,直到有队列成员被放进来。[自定义阻塞队列](./1.3 线程通信.md#自定义Queue)

    BlockingQueu API: BlockingQueue 方法以四种形式出现,对于不能立即满足但可能在将来某一时刻可以满足的操作,这四种形式的处理方式不同:

    • 第一种是抛出一个异常

    • 第二种是返回一个特殊值(null 或 false,具体取决于操作)

    • 第三种是在操作可以成功前,无限期地阻塞当前线程

    • 第四种是在放弃前只在给定的最大时间限制内阻塞。下表中总结了这些方法:

    操作 抛出异常 特殊值 阻塞 超时
    插入 add(e) offer(e) put(e) offer(e, time, unit)
    移除 remove() poll() take() poll(time, unit)
    检查 element() peek() 不可用 不可用

    (1) offer

    将指定元素插入此队列中(如果立即可行且不会违反容量限制),成功时返回 true,如果当前没有可用的空间,则返回 false,不会抛异常:

    java源代码:

    public boolean offer(E e) {  
       if (e == null) throw new NullPointerException();  
       final ReentrantLock lock = this.lock;  
       lock.lock();  
       try {  
           if (count == items.length)  
               return false;  
           else {  
               insert(e);  
               return true;  
           }  
       } finally {  
           lock.unlock();  
       }  
    }  
    

    (2) put

    将指定元素插入此队列中,将等待可用的空间.通俗点说就是>maxSize 时候,阻塞,直到能够有空间插入元素

    java源代码:

    public void put(E e) throws InterruptedException {  
        if (e == null) throw new NullPointerException();  
        final E[] items = this.items;  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();  
        try {  
            try {  
                while (count == items.length)  
                    notFull.await();  
            } catch (InterruptedException e) {  
                notFull.signal(); // propagate to non-interrupted thread  
                throw e;  
            }  
            insert(e);  
       } finally {  
            lock.unlock();  
       }  
    }  
    

    (3) take

    获取并移除此队列的头部,在元素变得可用之前一直等待 。queue的长度 == 0 的时候,一直阻塞

    java 源代码:

    public E take() throws InterruptedException {  
        final ReentrantLock lock = this.lock;  
        lock.lockInterruptibly();  
        try {  
            try {  
                while (count == 0)  
                    notEmpty.await();  
            } catch (InterruptedException ie) {  
                notEmpty.signal(); // propagate to non-interrupted thread  
                throw ie;  
            }  
            E x = extract();  
            return x;  
        } finally {  
            lock.unlock();  
        }  
    }  
    

    (4) add

    和 collection 的 add 一样,没什么可以说的。如果当前没有可用的空间,则抛出 IllegalStateException。

    (5) poll/peek

    和 collection 的 poll/peek 一样,队列为空是返回 null

    BlockingQueue<String> queue = new LinkedBlockingQueue<String>(3);
    queue.offer("1");   // offer方法不是阻塞的
    queue.put("2");     // put方法是阻塞的
    queue.poll();       // poll方法不是阻塞的,删除第一个元素
    queue.peek();       // peek方法不是阻塞的
    queue.take();       // take方法是阻塞的
    

    2.4.1 ArrayBlockingQueue

    基于数组的阻塞队列实现,其内部维护了一个定长数组,以便缓存队列中的数据对象。没有实现读写分离,也就意味着生产和消费不能完全并行,需要定义长度,可以指定先进先出或者先进后出,也叫__有界队列__。

    2.4.2 LinkedBlockingQueue

    基于链表的阻塞队列实现,其内部维护了一个数据缓冲队列(链表构成),以便缓存队列中的数据对象。实现了读写分离(读和写两个锁),从而实现生产和消费的完全并行,进而能够高效的处理并发数据,是一个__无界队列__。

    2.4.3 SynchronousQueue

    一种没有缓冲的队列,生产者产生的数据直接会被消费者获取并消费。

    2.4.4 PriorityBlockingQueue

    基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定,即传入队列的对象必须实现Comparable接口)实现。其内部控制线程同步的锁采用的是公平锁,也是一个无界队列。

    import java.util.concurrent.PriorityBlockingQueue;
    
    public class PriorityBlockingQueueTest {
    
        public static void main(String[] args) {
            PriorityBlockingQueue queue = new PriorityBlockingQueue();
            queue.add(new Task(1));
            queue.add(new Task(6));
            queue.add(new Task(5));
    
            while (true) {
                if (queue.size() == 0)
                    break;
                try {
                    System.out.println(queue.take()); // (1)
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
    
            /*for (Iterator it = queue.iterator(); it.hasNext();) {
                Task task = (Task) it.next();  // (2) 
                System.out.println(task);
            }*/
        }
    }
    
    class Task implements Comparable {
        private int id;
    
        @Override
        public int compareTo(Object o) {
            Task task = (Task) o;
            return this.id > task.id ? 1 : (this.id < task.id ? -1 : 0);
        }
    
        public void setId(int id) {
            this.id = id;
        }
        public Task(int id) {
            this.id = id;
        }
        public String toString() {
            return "Task{" + "id=" + id + '}';
        }
    }
    
    1. take/poll/peek 时,queue 队列按优先级顺序取出元素,程序执行结果如下:Task{id=1},Task{id=5},Task{id=6}

    2. 注意:queue.iterator() 时,queue 队列并__不是__按优先级顺序,结果如下:Task{id=1},Task{id=6},Task{id=5}

    2.4.5 DelayQueue

    带有延迟时间的队列,其中的元素只有当其指定的延迟时间到了,才能够从队列中获取元素(传入的元素必须实现Delayed接口),也是一个无界队列。应用场景比如缓存超时的数据进行移除、任务超时处理、空闲连接的关闭等等。

    import java.text.SimpleDateFormat;
    import java.util.Date;
    import java.util.concurrent.DelayQueue;
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /__
     * 延迟队列:模仿网吧上网场景
     */
    public class DelayQueueTest extends Thread {
    
        DelayQueue queue =  new DelayQueue();
        SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");
    
        public void shangji(String name, int money) {
            WangMing wm = new WangMing(name, System.currentTimeMillis() + money * 1000l);
            queue.add(wm);
            System.out.println(name + "开始上网,时间:" + format.format(new Date()) +
                    ",预计下机时间为:" + format.format(new Date(wm.getEndTime())));
        }
    
        public void xiaji(WangMing wm) {
            System.out.println(wm.getName() + "下机,时间:" + format.format(new Date(wm.getEndTime())));
        }
    
        public void run() {
            while (true) {
                try {
                    WangMing wm = (WangMing) queue.take();
                    xiaji(wm);
                } catch (InterruptedException e) {
                    ;
                }
            }
        }
    
        public static void main(String[] args) {
            DelayQueueTest wangba = new DelayQueueTest();
            wangba.start();
    
            wangba.shangji("A", 5);
            wangba.shangji("B", 2);
            wangba.shangji("C", 4);
        }
    }
    
    /__
     * 网民,必须实现 Delayed 接口
     */
    class WangMing implements Delayed {
        private String name;
        private long endTime;
        private TimeUnit timeUnit = TimeUnit.SECONDS;
    
        @Override
        public long getDelay(TimeUnit unit) {
            return endTime - System.currentTimeMillis();
        }
    
        @Override
        public int compareTo(Delayed o) {
            WangMing wm = (WangMing) o;
            return this.getDelay(timeUnit) - wm.getDelay(timeUnit) > 0 ? 1 :
                    (this.getDelay(timeUnit) - wm.getDelay(timeUnit) < 0 ? -1 : 0);
        }
    
        public WangMing(String name, long endTime) {
            this.name = name;
            this.endTime = endTime;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    
        public long getEndTime() {
            return endTime;
        }
    
        public void setEndTime(long endTime) {
            this.endTime = endTime;
        }
    }
    

    程序执行结果:

    A开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:57
    B开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:54
    C开始上网,时间:2017-12-07 09:37:52,预计下机时间为:2017-12-07 09:37:56
    B下机,时间:2017-12-07 09:37:54
    C下机,时间:2017-12-07 09:37:56
    A下机,时间:2017-12-07 09:37:57
    

    每天用心记录一点点。内容也许不重要,但习惯很重要!

  • 相关阅读:
    ENode框架Conference案例分析系列之
    ENode框架Conference案例分析系列之
    ENode框架Conference案例分析系列之
    ENode 2.6 架构与设计简介以及全新案例分享
    C#分布式消息队列 EQueue 2.0 发布啦
    EQueue 2.0 性能测试报告
    EQueue文件持久化消息关键点设计思路
    213.家庭账务管理信息系统
    212.基于DCT变换的水印算法模拟
    211.哈希表实现活期储蓄账目管理系统
  • 原文地址:https://www.cnblogs.com/binarylei/p/10024261.html
Copyright © 2011-2022 走看看