zoukankan      html  css  js  c++  java
  • 多线程并发编程总结(二)

    本文基于https://github.com/h2pl/Java-Tutorial的总结

    ReentrantReadWriteLock(读写锁)源码分析

    ReentrantReadWriteLock 分为读锁和写锁两个实例:
    	读锁是共享锁,可被多个线程同时使用,写锁是独占锁。
    	持有写锁的线程可以继续获取读锁(锁降级),反之不行(持有读锁必须先释放才能再次获取写锁)。
    
    
    AQS 的精髓在于内部的属性 state:
    
    	独占模式,通常就是 0 代表可获取锁,>=1 代表锁被别人获取了。
    	共享模式下,每个线程都可以对 state 进行加减操作。
    
    	独占模式和共享模式对于 state 的操作完全不一样,
    	读写锁 ReentrantReadWriteLock 中是将 state 这个 32 位的 int 值分为高 16 位和低 16位,
    	分别用于共享模式和独占模式。
    
    
    /**
     * 读锁
     */
    public class ReentrantReadWriteLock
            implements ReadWriteLock, java.io.Serializable {
    
        private final ReentrantReadWriteLock.ReadLock readerLock;
    
        private final ReentrantReadWriteLock.WriteLock writerLock;
    
        final Sync sync;
    
    
        abstract static class Sync extends AbstractQueuedSynchronizer {
        	static final int SHARED_SHIFT   = 16;
            static final int SHARED_UNIT    = (1 << SHARED_SHIFT);
            static final int MAX_COUNT      = (1 << SHARED_SHIFT) - 1;
            static final int EXCLUSIVE_MASK = (1 << SHARED_SHIFT) - 1;
    
            //分享锁(高16位)
            static int sharedCount(int c)    { return c >>> SHARED_SHIFT; }
            //独占锁(低16位)
            static int exclusiveCount(int c) { return c & EXCLUSIVE_MASK; }
        }
    
    
        public static class ReadLock implements Lock, java.io.Serializable {
    
        	private final Sync sync;
    
        	public void lock() {
                sync.acquireShared(1);
            }
        }
    }
    
    
    AbstractQueuedSynchronizer:
    
        public final void acquireShared(int arg) {
            if (tryAcquireShared(arg) < 0)	//尝试获取锁
                doAcquireShared(arg);	//获取锁失败,进入阻塞队列
        }
    
    
    ReentrantReadWriteLock:
    
    	abstract static class Sync extends AbstractQueuedSynchronizer {
    
            protected final int tryAcquireShared(int unused) {
                Thread current = Thread.currentThread();
                int c = getState();
                if (exclusiveCount(c) != 0 &&
                    getExclusiveOwnerThread() != current)	//非当前线程持有写锁,获取读锁失败(如果是当前线程,可以锁降级)
                    return -1;
                int r = sharedCount(c);	//当前持有读锁的次数
                //获取读锁
                //如果公平模式:判断阻塞队列是否有等待
                //如果非公平模式:判断阻塞队列中 head 的第一个后继节点是否是来获取写锁的,如果是的话,让这个写锁先来,避免写锁饥饿。
                if (!readerShouldBlock() &&
                    r < MAX_COUNT &&
                    compareAndSetState(c, c + SHARED_UNIT)) {
                    if (r == 0) {
                        firstReader = current;
                        firstReaderHoldCount = 1;
                    } else if (firstReader == current) {
                        firstReaderHoldCount++;
                    } else {
                        HoldCounter rh = cachedHoldCounter;
                        if (rh == null ||
                            rh.tid != LockSupport.getThreadId(current))
                            cachedHoldCounter = rh = readHolds.get();
                        else if (rh.count == 0)
                            readHolds.set(rh);
                        rh.count++;
                    }
                    return 1;
                }
                //处理可重入锁( readerShouldBlock()中阻塞队列有等待而使其获取重入锁失败,则在此方法中获取重入锁 )
                return fullTryAcquireShared(current);
            }
    	}
    
    
    /**
     * 写锁
     */
    写锁是独占锁:
    	如果有读锁被占用,写锁获取是要进入到阻塞队列中等待的。
    

    JUC 一 ReentrantReadWriteLock

    BlockingQueue(不接受 null 值的插入)

    public interface BlockingQueue<E> extends Queue<E> {}
    

    BlockingQueu 阻塞队列

    ArrayBlockingQueue(数组。有界)

    ArrayBlockingQueue 实现并发同步的原理是:根据 Condition 实现。
    
    	读操作和写操作都需要获取到 AQS 独占锁才能进行操作。
    
    	如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。
    
    	如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。
    
    
    public class ArrayBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        final Object[] items;
    
        int takeIndex;	//下一次读取操作的位置
    
        int putIndex;	//下一次写入操作的位置
    
        int count;
    
        final ReentrantLock lock;
    
        private final Condition notEmpty;
    
        private final Condition notFull;
    
        transient Itrs itrs;
    
    
        //写元素
        public void put(E e) throws InterruptedException {
            Objects.requireNonNull(e);
            final ReentrantLock lock = this.lock;	//获取锁
            lock.lockInterruptibly();
            try {
                while (count == items.length)
                    notFull.await();	//队列满了,写线程阻塞,等待被唤醒
                enqueue(e);	//队列不满,写线程被唤醒,写元素,并唤醒读元素线程
            } finally {
                lock.unlock();
            }
        }
    
    
        private void enqueue(E e) {
            final Object[] items = this.items;
            items[putIndex] = e;
            if (++putIndex == items.length) putIndex = 0;
            count++;
            notEmpty.signal();	//唤醒读线程
        }
    }
    

    LinkedBlockingQueue(链表。可以无界,也可以有界)

    底层基于单向链表实现的阻塞队列,可以当做无界队列也可以当做有界队列来使用。
    
    
    public class LinkedBlockingQueue<E> extends AbstractQueue<E>
            implements BlockingQueue<E>, java.io.Serializable {
    
        static class Node<E> {
            E item;
    
            Node<E> next;
    
            Node(E x) { item = x; }
        }
    
        private final int capacity;
    
        private final AtomicInteger count = new AtomicInteger();
    
        transient Node<E> head;	//对头
    
        private transient Node<E> last;	//队尾
    
        private final ReentrantLock takeLock = new ReentrantLock();
    
        private final Condition notEmpty = takeLock.newCondition();
    
        private final ReentrantLock putLock = new ReentrantLock();
    
        private final Condition notFull = putLock.newCondition();
    }
    
    
    takeLock 和 notEmpty 搭配:
    	如果要获取(take)一个元素,需要获取 takeLock 锁,
    	但是获取了锁还不够,如果队列此时为空,还需要队列不为空(notEmpty)这个条件(Condition)。
    
    putLock 需要和 notFull 搭配:
    	如果要插入(put)一个元素,需要获取 putLock 锁,
    	但是获取了锁还不够,如果队列此时已满,还需要队列不是满的(notFull)这个条件(Condition)。
    
    
    	//写元素
    	public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            final int c;
            final Node<E> node = new Node<E>(e);
            final ReentrantLock putLock = this.putLock;
            final AtomicInteger count = this.count;
            putLock.lockInterruptibly();	//获取写锁
            try {
                while (count.get() == capacity) {
                    notFull.await();	//队列满了,写线程阻塞
                }
                enqueue(node);	//被别人唤醒,队列未满,写元素
                c = count.getAndIncrement();
                if (c + 1 < capacity)	//写进一个元素后,如果还有空位,唤醒别的写线程
                    notFull.signal();
            } finally {
                putLock.unlock();
            }
            if (c == 0)
            	//如果 c == 0,那么代表队列在这个元素入队前是空的(不包括head空节点),
        		//那么所有的读线程都在等待 notEmpty 这个条件,等待唤醒,这里做一次唤醒操作
                signalNotEmpty();
        }
    
    
        private void enqueue(Node<E> node) {
            last = last.next = node;	//写元素(将元素放在链表队尾)
        }
    
    
        private void signalNotEmpty() {
            final ReentrantLock takeLock = this.takeLock;
            takeLock.lock();
            try {
                notEmpty.signal();
            } finally {
                takeLock.unlock();
            }
        }
    

    SynchronousQueue(读写匹配,不提供储存元素的空间。可以选择公平与非公平)

    Synchronous 指的就是读线程和写线程需要同步,一个读线程匹配一个写线程。
    
    不提供任何空间(一个都没有)来存储元素。数据必须从某个写线程交给某个读线程,而不是写到某个队列中等待被消费。
    
    
    transfer 的设计思路:
    	
    	当调用这个方法时,如果队列是空的,或者队列中的节点和当前的线程操作类型一致
    	(如当前操作是 put 操作,而队列中的元素也都是写线程)。
    	这种情况下,将当前线程加入到等待队列即可。
    
    	如果队列中有等待节点,而且与当前操作可以匹配(如队列中都是读操作线程,当前线程是写操作线程,反之亦然)。
    	这种情况下,匹配等待队列的队头,出队,返回相应数据。
    
    
    public class SynchronousQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
        public void put(E e) throws InterruptedException {
            if (e == null) throw new NullPointerException();
            if (transferer.transfer(e, false, 0) == null) {
                Thread.interrupted();
                throw new InterruptedException();
            }
        }
    
    
        public E take() throws InterruptedException {
            E e = transferer.transfer(null, false, 0);
            if (e != null)
                return e;
            Thread.interrupted();
            throw new InterruptedException();
        }
    
    
        static final class TransferQueue<E> extends Transferer<E> {	//公平模式
    
        	static final class QNode {	//单向链表等待队列
            	volatile QNode next;
            	volatile Object item;
            	volatile Thread waiter;
            	final boolean isData;
    
     		}
    
            transient volatile QNode head;
    
            transient volatile QNode tail;
    
            transient volatile QNode cleanMe;
    
        	E transfer(E e, boolean timed, long nanos) {}
        }
    }
    

    PriorityBlockingQueue(数组。无界。二叉堆)

    插入队列的对象必须是可比较大小的(comparable),否则报 ClassCastException 异常。
    它的插入操作 put 方法不会 block,因为它是无界队列(take 方法在队列为空的时候会阻塞)。
    
    迭代并遍历时,不能保证有序性:
    	如果你想要实现有序遍历,建议采用 Arrays.sort(queue.toArray()) 进行处理。
    
    public class PriorityBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    
        private transient Object[] queue;
    
        private transient int size;
    
        private transient Comparator<? super E> comparator;
    
        private final ReentrantLock lock = new ReentrantLock();	//并发控制采用的是 ReentrantLock
    
        private final Condition notEmpty = lock.newCondition();
    
        private transient volatile int allocationSpinLock;	//数组扩容的时候,需要先获取到这个锁,才能进行扩容操作
    
        private PriorityQueue<E> q;	//用于序列化和反序列化的时候用
    }
    
  • 相关阅读:
    3dmax安装、破解与插件安装--以2014为例
    两数求和java(字符串强转int型)
    《大道至简》第一章伪代码(四个小部分)
    咳咳,软工新手读《大道至简》读后感
    一:requests爬虫基础
    爬虫
    Django中ORM系统多表数据操作
    Django中ORM简介与单表数据操作
    Django初识
    jQuery基础
  • 原文地址:https://www.cnblogs.com/loveer/p/11829537.html
Copyright © 2011-2022 走看看