zoukankan      html  css  js  c++  java
  • Java并发编程(四)

    Java并发编程(四) - 原子类及阻塞队列

    1. 概述


    这里主要讲Java并发包中原子类以及阻塞队列

    2. 阻塞队列


    2.1 阻塞队列的概念
    阻塞队列是支持阻塞插入(队列满时,队列会阻塞插入元素的线程)以及阻塞移除(队列为空时,获取元素的队列会被阻塞)的队列。阻塞队列常用于生产者/消费者存放/获取元素的容器。

    2.2 阻塞队列的插入与移除方式

    方法/处理方式抛出异常返回特殊值一直阻塞超时抛出
    插入方法 add(e) offer(e) put(e) offer(e, time, unit)
    移除方法 remove() poll() take() poll(time, unit)
    检查方法 element peek() 不可用 不可用

    注:如果是无界阻塞队列,队列不可能出现满的情况,所以使用put或offer不会阻塞,offer永远放回true。

    2.3 Java中的阻塞队列

    队列名描述
    ArrayBlockingQueue 由数组结构组成的有界阻塞队列,可选公平策略,默认非公平队列
    LinkedBlockingQueue 由链表结构组成的有界阻塞队列,队列最大长度为Integer.MAX_VALUE(可以说无界)
    PriorityBlockingQueue 支持优先级排序的无界阻塞队列,元素必须实现Compareable接口或者构造时传入Comparator
    DelayQueue 支持延时获取元素的无界阻塞队列,元素必须实现Delayed接口
    SynchrinousQueue 不存储元素的阻塞队列,也就是说每个put操作必须等待一个take操作

    注:以上阻塞队列还没有写完,具体可见Java API。

    2.4 阻塞队列的实现机制
    其实之前的Java并发编程(三)已经讲过了,阻塞队列的实现有Lock+Condition的等待/通知机制来实现的,具体可见ArrayBlockingQueue的源码(JDK 1.8)如下所示,注:源码省略了一些内容:

    public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
        /** The queued items */
        final Object[] items;
    
        /** items index for next take, poll, peek or remove */
        int takeIndex;
    
        /** items index for next put, offer, or add */
        int putIndex;
    
        /** Number of elements in the queue */
        int count;
        
        /** Main lock guarding all access */
        final ReentrantLock lock;
    
        /** Condition for waiting takes */
        private final Condition notEmpty;
    
        /** Condition for waiting puts */
        private final Condition notFull;
    
        public ArrayBlockingQueue(int capacity, boolean fair) {
            if (capacity <= 0)
                throw new IllegalArgumentException();
            this.items = new Object[capacity];
            lock = new ReentrantLock(fair);
            notEmpty = lock.newCondition();
            notFull =  lock.newCondition();
        }
        /**
         * put操作
         */
        public void put(E e) throws InterruptedException {
            checkNotNull(e);
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                // 当队列中元素满了,阻塞插入元素的线程
                while (count == items.length)
                    notFull.await();
                // 插入元素到队列中
                enqueue(e);
            } finally {
                lock.unlock();
            }
        }
        private void enqueue(E x) {
            // assert lock.getHoldCount() == 1;
            // assert items[putIndex] == null;
            final Object[] items = this.items;
            items[putIndex] = x;
            if (++putIndex == items.length)
                putIndex = 0;
            count++;
            // 通知获取等待队列中的线程有元素可获取
            notEmpty.signal();
        }
        /**
         * take操作
         */
        public E take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                // 当队列中元素为空,阻塞获取元素的线程
                while (count == 0)
                    notEmpty.await();
                // 从队列中移除一个元素
                return dequeue();
            } finally {
                lock.unlock();
            }
        }
        private E dequeue() {
            // assert lock.getHoldCount() == 1;
            // assert items[takeIndex] != null;
            final Object[] items = this.items;
            @SuppressWarnings("unchecked")
            E x = (E) items[takeIndex];
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                itrs.elementDequeued();
            // 通知插入等待队列中的线程有空位可供插入元素
            notFull.signal();
            return x;
        }
    }
    

    阻塞/唤醒线程是通过Condition中的await()与signal()实现的,而锁依赖于AQS实现的,具体可以看AQS中的ConditionObject的await()与signal()方法。而AQS中的阻塞/唤醒线程使用了LockSupport类中park()与unPark()方法,再进一步就是调用本地方法进行阻塞/唤醒线程。

    3. Java中的原子类


    3.1 原子类的概述
    原子类具有内存可见性以及原子性的特征,我们知道volatile变量具有内存可见性以及单个读写操作的原子性,但对于复杂操作不具有原子性。然后,通过引进CAS操作来实现原子性。所以原子类是由volatile+CAS来共同实现的。

    3.2 原子类的实现
    原子类是由volatile+CAS来实现的,具体可见AtomicInteger的源码(JDK 1.8)如下,注:源码有删减

    public class AtomicInteger extends Number implements java.io.Serializable {
        private volatile int value;
        
        public AtomicInteger(int initialValue) {
            value = initialValue;
        }
        
        public AtomicInteger(int initialValue) {
            value = initialValue;
        }
    
        public final int get() {
            return value;
        }
        
        /**
         * 这里使用到了CAS操作,通过本地方法进行原子更新
         */
        public final int incrementAndGet() {
            return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
        }
    }
    

    4. References


    《Java并发编程的艺术》

  • 相关阅读:
    loj#2049. 「HNOI2016」网络(set 树剖 暴力)
    创建多个Oracle数据库及相应的实例
    [置顶] lua 进阶3--lua文件中调用C++函数
    android 设置Button或者ImageButton的背景透明 半透明 透明
    struts2 18拦截器详解(七)
    《Linux命令行与shell脚本编程大全》 第二十三章 学习笔记
    ios7下不能录音问题解决
    360 2013校园招聘笔试题(含参考答案)
    【MFC三天一个游戏】之 局域网黑白棋
    Remove Nth Node From End of List
  • 原文地址:https://www.cnblogs.com/maying3010/p/6932545.html
Copyright © 2011-2022 走看看