锁是最常见的同步方法之一,在高并发环境下,激烈的的锁竞争导致程序的性能下降,因此我们有必要讨论一些有关锁的性能问题,以及一些注意事项,比如避免死锁等。为了降低锁的竞争导致程序性能下降的话,可以用以下建议提高一下性能。
1. 减少锁持有时间
对于使用锁进行并发控制的应用程序而言,在锁竞争过程中,单个线程对锁的持有时间与系统性能有着直接关系,如果线程持有锁的时间越长,那么锁的竞争程度越激烈。以下面的代码片段为例进行说明:
public synchronized void syncMethod(){ othercode1(); mutexMethod(); othercode2(); }
在syncMethod()方法中,假设只有mutexMethod()方法是有同步需要的,而othercode1()方法和othercode2()方法并不需要做同步控制。如果这俩方法分别是重量级的方法的话,就会花费长时间的CPU,高并发情况下,同步整个方法会导致等待线程大量增加。因为线程进入该方法时获得内部锁,只有在所有任务执行完后才会释放锁,优化方案就是在只在必要是进行同步,这样就能明显减少线程持有锁的时间,提高系统的吞吐量。
public void syncMethod2(){ othercode1(); synchronized(this){ mutexMethod(); } othercode2(); }
2. 减小锁粒度
是指缩小锁定对象的范围,从而降低锁冲突的可能性,进而提高系统的并发能力。在JDK中典型的使用场景就是ConcurrentHashMap,这个Map内部细分成了若干个小的HashMap,称之为段(SEGMENT),默认情况下是16段。
如果需要在ConcurrentHashMap中增加一个新的表项,并不是将整个HashMap加锁,而是首先根据hashcode得到该表项应该被存放在哪个段中,然后对该段加锁,并完成put()方法操作。在多线程中,如果多个线程同时进行put()操作,只要被加入的表项不存放在同一个段中,线程间便可以做到真正的并行。默认是16段,幸运的话,在可以接受16个线程同时插入,大大提升吞吐量。下面的代码就是其put()方法操作过程。第5~6行代码根据key获得对应段的序号 j,然后得到段 s,然后将数据插入给定的段中。
public V put(K key, V value) { Segment<K,V> s; if (value == null) throw new NullPointerException(); int hash = hash(key); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject // nonvolatile; recheck (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment s = ensureSegment(j); return s.put(key, hash, value, false); }
但是减小粒度会带来一个新问题,当系统需要全局锁的时候,其消耗的资源就很多了。比如要获得ConcurrentHashMap的全局信息,就需要同时取得所有段的锁方能顺利实施。比如这个map的size()方法,返回ConcurrentHashMap全部有效表项之和,要获取这个信息需要取得所有子段的锁,因此size()方法的代码如下:
public int size() { // 尝试几次以获得准确的计数。如果由于表中的连续异步更改而失败,请使用锁定。 final Segment<K,V>[] segments = this.segments; int size; boolean overflow; long sum; // 总数 long last = 0L; // 上一个总数 int retries = -1; try { for (;;) { if (retries++ == RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) ensureSegment(j).lock(); // 给所有段加锁 } sum = 0L; size = 0; overflow = false; for (int j = 0; j < segments.length; ++j) { Segment<K,V> seg = segmentAt(segments, j); if (seg != null) { sum += seg.modCount; // 统计总数 int c = seg.count; if (c < 0 || (size += c) < 0) overflow = true; } } if (sum == last) break; last = sum; } } finally { if (retries > RETRIES_BEFORE_LOCK) { for (int j = 0; j < segments.length; ++j) segmentAt(segments, j).unlock(); // 释放所有锁 } } return overflow ? Integer.MAX_VALUE : size; }
从上面代码中可以看到size()首先尝试无锁求和,如果失败了会尝试加锁的方法。只有类似于size()方法获取全局信息的方法调用使用不频繁时候,这种减小锁粒度的方法才能在真正意义上提高系统的吞吐量。
3. 用读写分离锁替换独占锁
使用读写分离锁ReadWriteLock可以提高系统的性能,其实这是减小粒度的一种特殊情况,读写分离锁是对系统功能点分割。因为读操作本身不会影响数据的完整性和一致性,因此在理论上讲,可以允许多线程之间同时读,读写锁正是实现了这个功能。所以在读多写少的场合使用读写锁可以有效提升系统的并发能力。
4. 锁分离
将读写锁的思想进一步延伸就是锁分离了。读写锁根据读写操作功能上的不同,进行有效的锁分离。我们可以依据应用程序的功能特点,使用类似的分离思想,也可以对独占锁进行分离。LinkedBlockingQueue的实现中,take()和put()分别实现了从队列中取数据和增加数据的功能,虽然俩函数对队列都有修改,但是LinkedBlockingQueue是基于链表的,俩操作一个作用与链表头,一个作用于链表尾部,从理论上讲两者不冲突的。使用独占锁的话,这俩方法不可能实现真正的并发,他们彼此等待对方释放锁资源。JDK中分别用两把锁分离了take()和put()。
/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();
以上代码片段定义了takeLock和putLock,它们分别在take()和put()方法中使用,因此这俩方法就此相互独立,这俩方法之间不存在锁竞争关系,只需要take()和take()方法间、put()和put()方法间分别对takeLock和putLock进行竞争,从而削弱了锁竞争。take()方法实现如下:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); // 不能有两个线程同时取数据 try { while (count.get() == 0) { // 如果当前没有可用数据,一直等待 notEmpty.await(); // 等待put()方法操作的通知 } x = dequeue(); // 取得第一个数据 c = count.getAndDecrement();// 数量减一,原子操作,因此会和put()同时访问count if (c > 1) notEmpty.signal(); // 通知其他take()方法操作 } finally { takeLock.unlock(); // 释放锁 } if (c == capacity) signalNotFull(); // 通知put()方法操作,已有空余空间 return x; }
put()方法实现如下:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); int c = -1; Node<E> node = new Node(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly(); // 不能有两个线程同时进行put()方法 try { while (count.get() == capacity) { // 队列满了 notFull.await(); // 等待 } enqueue(node); // 插入数据 c = count.getAndIncrement(); // 更新总数,变量c是count加1前的值 if (c + 1 < capacity) notFull.signal(); // 有足够空间,通知其他线程 } finally { putLock.unlock(); // 释放锁 } if (c == 0) signalNotEmpty(); // 插入成功后,通知take()方法取数据 }
5. 锁粗化
通常情况下,为了保证多线程间的有效并发,会要求每个线程持有锁的时间尽量短,使用资源后立即释放锁。只有这样等待在这个锁上的其他线程才能尽早的获得资源执行任务,但是,如果对同一个锁不停进行请求、同步、释放,其本身也会消耗系统资源,反而不利于性能优化。为此,虚拟机在遇到一连串连续对同一个锁不断进行请求和释放的操作时,便会把所有的锁操作整合成对锁的一次请求,从而减少对锁的请求同步次数,这个操作叫锁的粗化。
在开发过程中,可以在合理的场合进行锁粗化,尤其是在循环内部请求锁时,因为每次循环都有申请锁和释放锁的操作,其实这完全没必要的,在循环外加锁就可以了。