zoukankan      html  css  js  c++  java
  • Java 并发集合类

    集合

     1 ConcurrentHashMap

      基于散列链表+红黑树实现,类似于 HashMap,JDK 8 进行了优化,利用 volatile + CAS 实现无锁化操作,保证线程安全的同时,提高性能。默认容量16,默认加载因子0.75;
      散列链表和红黑树的内部类定义如下:

    // 基本结构
    static class Node<K,V> implements Map.Entry<K,V> {
    	final int hash;
    	final K key;
    	volatile V val;
    	volatile Node<K,V> next;
    }
    // 红黑树结构,链表长度大于8时转换
    static final class TreeNode<K,V> extends Node<K,V> {
    	TreeNode<K,V> parent;  // red-black tree links
    	TreeNode<K,V> left;
    	TreeNode<K,V> right;
    	TreeNode<K,V> prev;    // needed to unlink next upon deletion
    	boolean red;
    }
    

      和 HashMap 比较,多了内部类 TreeBin,它不存储键值,而是指向 TreeNode 列表和它们的根节点,而 ConcurrentHashMap 也是操作 TreeBin。此外,TreeBin 还维护了读写锁状态,这会使得在树重组之前,持有锁的写操作会等待未持锁的读操作完成。

    // 指向TreeNode列表和它们的根节点,
    static final class TreeBin<K,V> extends Node<K,V> {
    	TreeNode<K,V> root;
    	volatile TreeNode<K,V> first;
    	volatile Thread waiter;
    	volatile int lockState;
    	static final int WRITER = 1; // 持有写锁时
    	static final int WAITER = 2; // 等待写锁时
    	static final int READER = 4; // 用来设置读锁的增量值
    }
    

      如何做到线程安全的呢?
      1. sizeCtl:控制表的初始化和重建。负数表示表正在初始化或者重建:
        -1表示在初始化;
        -(1+N)表示有N个正在进行重建的线程;
      2. 节点哈希值表示的状态
        MOVED=-1 表示 forward 节点;
        TREEBIN=-2 表示 treeBin 的根节点;
      3. V put(K key, V value) 操作
        如果表为空,则初始化表;
        如果hash位置为空,则通过CAS设置值;
        如果hash=-1,则帮组扩容;
        如果节点既不为空,也不等于-1,那么锁住该节点,在链表或者红黑树上添加值;
      4. void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) 扩容
        初始化新表,容量是原表的2倍;
        复制元素到新表,处理一个节点就把节点设置为forward;
        当这个节点为空时,通过CAS来设置forward;
        当节点值为-1时,表示forward已经处理过了;
        当节点不为空且不为-1时,锁住节点进行处理;
        其他线程看到节点为forward时,向后遍历来完成;
      5. V get(Object key) 操作
        如果hash值匹配,则直接获取;
        如果hash值小于0,则从树上查找;
        否则,遍历链表寻找;
      6. mappingCount()(推荐,因为其返回值时long) 和 size(),都是调用 sumCount() 来计算
        定义了内部类 CounterCell 来计数,计算总数时,就是把 CounterCell[] 数组的值累加起来即可;

    // put 源码
    Node<K,V> f; int n, i, fh;
    if (tab == null || (n = tab.length) == 0)
    	tab = initTable(); // 表长度为空时,初始化表
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    	if (casTabAt(tab, i, null, new Node<K,V>(hash, key, value, null)))
    		break;                   // hash的位置为空时,通过CAS设置值
    }
    else if ((fh = f.hash) == MOVED)
    	tab = helpTransfer(tab, f); // 如果节点是 forward 节点,帮助扩容
    else {
    	synchronized (f) { // 不为空,不是 forward 节点时,synchronized 锁住节点
    		if (tabAt(tab, i) == f) { // 锁住后再次判断节点有没有变化
    			if (fh >= 0) { 
    				... // 表示要操作链表节点
    			}
    			else if (f instanceof TreeBin) {
    				... // 表示操作的是TreeBin节点
    			}
    		}
    	}
    	if (binCount != 0) {
    		if (binCount >= TREEIFY_THRESHOLD)
    			treeifyBin(tab, i); // 链表长度大于8,转成红黑树
    	}
    }
    
    // 并发扩容的方法
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    	int n = tab.length, stride;
    	if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
    		stride = MIN_TRANSFER_STRIDE; // subdivide range
    	if (nextTab == null) {            // 初始化新的表,容量是原表的2倍
    		Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
    		nextTab = nt;
    		nextTable = nextTab; // nextTable 是定义的临时表,仅在表重建时不为空
    		transferIndex = n; // 表重建时的下一个表的索引
    	}
    	int nextn = nextTab.length; // 扩容后的表长度
    	ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
    	boolean advance = true; // true 表示该节点已处理
    	boolean finishing = false; // 确保已经完成了
    	for (int i = 0, bound = 0;;) {
    		if (i < 0 || i >= n || i + n >= nextn) {
    			int sc;
    			if (finishing) {
    				... // 完成了
    				return;
    			}
    			if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) { // sizeCtl-1,表示多了一个线程来扩容
    				if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
    					return;
    				finishing = advance = true;
    				i = n; // recheck before commit
    			}
    		}
    		else if ((f = tabAt(tab, i)) == null)
    			advance = casTabAt(tab, i, null, fwd); // 节点位置是空的,通过CAS设置值为forward
    		else if ((fh = f.hash) == MOVED)
    			advance = true; // 这个位置是forward节点,表示已经处理了
    		else {
    			synchronized (f) { // 节点不为空,且不是forward节点,锁住该节点再处理
    				... // 类似put的操作
    			}
    		}
    	}
    }
    
    // get 源码
    if ((eh = e.hash) == h) {
    	if ((ek = e.key) == key || (ek != null && key.equals(ek)))
    		return e.val; // 直接获得值
    }
    else if (eh < 0)
    	return (p = e.find(h, key)) != null ? p.val : null; // 在树上查找
    while ((e = e.next) != null) {
    	if (e.hash == h && ((ek = e.key) == key || (ek != null && key.equals(ek))))
    		return e.val; // 遍历链表查找
    }
    
    // 计数方法
    private transient volatile CounterCell[] counterCells; // 数组,存储统计值
    @sun.misc.Contended static final class CounterCell {
    	volatile long value;
    	CounterCell(long x) { value = x; }
    }
    final long sumCount() {
    	CounterCell[] as = counterCells; CounterCell a;
    	long sum = baseCount;
    	if (as != null) {
    		for (int i = 0; i < as.length; ++i) {
    			if ((a = as[i]) != null)
    				sum += a.value; // 统计值累加
    		}
    	}
    	return sum;
    }
    

     2 ConcurrentSkipListMap

      基于跳表实现,按照 key 自然排序,key 不能为 null,类似 TreeMap。
      利用 volatile+CAS 来保证线程安全。

    static final class Node<K,V> {
        final K key;
        volatile Object value;
        volatile Node<K,V> next;
    }
    boolean casValue(Object cmp, Object val) {
        return UNSAFE.compareAndSwapObject(this, valueOffset, cmp, val);
    }
    

     3 ConcurrentSkipListSet

      使用 ConcurrentSkipListMap 实现。

     4 CopyOnWriteArrayList

      基于数组实现,相当于支持并发的 ArrayList,刚创建时初始化为长度0的数组。
      利用写时复制来保证线程安全。
      写时复制:数组是 volatile 类型的,修改数组时,首先 ReentrantLock 加锁,然后复制一个副本数组,对副本进行修改完成后,把原来的数组指向这个新的数组完成赋值。读时不用加锁。

    private transient volatile Object[] array;
    public boolean add(E e) {
    // 加锁进行写时复制
    final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            // 拷贝新数组,长度+1
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e; 
            // set给volatile的array
            setArray(newElements);
            return true;
      } finally {
            lock.unlock();
        }
    }
    

     5 CopyOnWriteArraySet

      使用 CopyOnWriteArrayList 实现。去重的,但是按照插入顺序排序的。

    非阻塞队列

     1 ConcurrentLinkedQueue

      基于链表实现的无界的线程安全的非阻塞队列,遵循 FIFO,利用 volatile+CAS 来保证线程安全。

    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;
    }
    // 初始化 head 和 tail
    private transient volatile Node<E> head;
    private transient volatile Node<E> tail;
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }
    // 利用 CAS 修改链表
    private boolean casTail(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }
    

     2 ConcurrentLinkedDeque

      基于双向链表实现的无界的线程安全的非阻塞队列,实现方式类似 ConcurrentLinkedQueue。

    // 双向链表
    static final class Node<E> {
        volatile Node<E> prev;
        volatile E item;
        volatile Node<E> next;
    }
    

    阻塞队列

      实现:通过 ReentrantLock 和 Condition 实现的等待通知模型来实现阻塞队列。

     1 ArrayBlockingQueue

      基于数组实现的阻塞队列,需要指定容量。

    // poll 类似
    public boolean offer(E e) {
    	final ReentrantLock lock = this.lock;
    	lock.lock(); // 加锁
    	try {
    		if (count == items.length)
    			return false; // 超过长度,返回false,数据丢失
    		final Object[] items = this.items;
    		items[putIndex] = x; // putIndex表示下一次加元素的索引
    		if (++putIndex == items.length)
    			putIndex = 0; // 达到长度后,索引位归零
    		count++; // 计数+1
    		notEmpty.signal(); // 通知可以取值了
    		return true;
    	} finally {
    		lock.unlock(); // 解锁
    	}
    }
    

     2 LinkedBlockingQueue

      基于链表实现的阻塞队列,默认容量为 Integer.MAX_VALUE。
      实现类似 ArrayBlockingQueue,计数用的原子类 AtomicInteger。

     3 PriorityBlockingQueue

      基于二叉小顶堆实现的阻塞队列,保证取出的元素是最小的,默认初始化容量11。

     4 DelayQueue

      基于数组实现的延迟阻塞队列。使用时必须实现 Delayed。

    原子操作类

      以 AtomicInteger 为例,利用 volatile+CAS 来保证原子操作,直接看源码注释

    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long valueOffset;
    
    private volatile int value;
    
    // 直接获取 volatile 变量
    public final int get() {
        return value;
    }
    // 通过 Unsafe 的 CAS 原子操作 volatile 变量
    public final boolean compareAndSet(int expect, int update) {
        return unsafe.compareAndSwapInt(this, valueOffset, expect, update);
    }
    // 通过 Unsafe 的 CAS 原子操作 + 1
    public final int incrementAndGet() {
        return unsafe.getAndAddInt(this, valueOffset, 1) + 1;
    }
    

    并发工具类

     1 CountDownLatch

      功能:指定 N 个线程等待全部完成后,继续执行。
      实现:内部类 Sync 实现了 AQS 同步器,初始化时设置 AQS 的同步状态来表示 countDown 的数量,await() 方法把当前线程加入到 AQS 等待队列,让当前线程阻塞住,执行 countDown() 方法会把同步状态减1,当减到0时,唤醒等待队列中的线程。

     2 CyclicBarrier

      功能:类似 CountDownLatch,但是支持 reset() 重置状态,能指定到达数量时执行的动作。
      实现:基于 ReentrantLock 和 Condition 实现,核心源码如下

    private int dowait(boolean timed, long nanos) {
        final ReentrantLock lock = this.lock;
        lock.lock(); // 加锁,保护 count
        try {
            
            if (Thread.interrupted()) {
                breakBarrier(); // 使用 signalAll 唤醒所有线程
                throw new InterruptedException();
            }
    
            int index = --count; // 线程数量递减
            if (index == 0) {  // 如果线程数量到达 0
                final Runnable command = barrierCommand;
                if (command != null)
                    command.run(); // 执行 barrierAction
                return 0;
            }
    
            // 此时线程数量还没到 0
            for (;;) {
                try {
                    if (!timed)
                        trip.await(); // 调用 Condition 的 await 进行等待
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos); // 待超时的等待
                }
            }
        } finally {
            lock.unlock(); // 释放锁
        }
    }
    

    线程池

     ThreadPoolExecutor 参数说明:
      1. 核心线程池
      2. 最大线程池
      3. 线程空闲时间
      4. 线程空闲时间单位
      5. 阻塞队列
      6. 线程工厂:创建具有相同特性的一组线程。
      7. 拒绝策略
       CallerRunsPolicy:重试添加当前的任务,会自动重复调用 execute() 方法,直到成功。
       AbortPolicy:对拒绝任务抛弃处理,并且抛出异常。
       DiscardPolicy:对拒绝任务直接无声抛弃,没有异常信息。
       DiscardOldestPolicy:对拒绝任务不抛弃,而是抛弃队列里面等待最久的一个线程,然后把拒绝任务加到队列。

       线程池数量理论值 -> CPU 密集型:N+1;IO 密集型:2N+1

       线程的提交方式:
       1. execute():用于提交不需要返回值的任务
       2. submit():用于提交需要返回值的任务,返回future对象。

       线程池线程的执行流程:核心 -> 队列 -> 最大 -> 拒绝策略
       1. 如果当前运行的线程少于核心线程池时,则创建新的线程执行任务;
       2. 如果当前运行的线程大于等于核心线程池时,则把任务加入阻塞队列;
       3. 如果阻塞队列已经满了,则创建新的线程执行任务;
       4. 如果线程数超过了最大线程数,则调用拒绝策略

  • 相关阅读:
    minixml3.1库的使用
    linux coredump及函数栈空间大小分析
    linx 设备名字来由 sd sr sg st
    gcc 遇到过的语法问题
    I帧、B帧、P帧、NALU类型
    linux grub 使用
    结构体sockadrr、sockaddr_in、in_addr的定义
    linux c log 日志接口
    关于32位/64位版本头文件的重要
    汇编指令缩写
  • 原文地址:https://www.cnblogs.com/bigshark/p/11180277.html
Copyright © 2011-2022 走看看