zoukankan      html  css  js  c++  java
  • Hikaricp源码解读(3)——ConcurrentBag介绍

    3、ConcurrentBag介绍

    本文以v2.7.2源码为主进行分析

    HikariCP连接池是基于自主实现的ConcurrentBag完成的数据连接的多线程共享交互,是HikariCP连接管理快速的其中一个关键点。

    ConcurrentBag是一个专门的并发包裹,在连接池(多线程数据交互)的实现上具有比LinkedBlockingQueue和LinkedTransferQueue更优越的性能。
    ConcurrentBag通过拆分 CopyOnWriteArrayList、ThreadLocal和SynchronousQueue
    进行并发数据交互。

    • CopyOnWriteArrayList:负责存放ConcurrentBag中全部用于出借的资源
    • ThreadLocal:用于加速线程本地化资源访问
    • SynchronousQueue:用于存在资源等待线程时的第一手资源交接
    private final CopyOnWriteArrayList<T> sharedList;
    private final ThreadLocal<List<Object>> threadList;
    private final SynchronousQueue<T> handoffQueue;
    

    ConcurrentBag中全部的资源均只能通过add方法进行添加,只能通过remove方法进行移出。

    public void add(final T bagEntry)
    {
       if (closed) {
          LOGGER.info("ConcurrentBag has been closed, ignoring add()");
          throw new IllegalStateException("ConcurrentBag has been closed, ignoring add()");
       }
    
       sharedList.add(bagEntry); //新添加的资源优先放入CopyOnWriteArrayList
    
       // 当有等待资源的线程时,将资源交到某个等待线程后才返回(SynchronousQueue)
       while (waiters.get() > 0 && !handoffQueue.offer(bagEntry)) {
          yield();
       }
    }
    
    public boolean remove(final T bagEntry)
    {
       // 如果资源正在使用且无法进行状态切换,则返回失败
       if (!bagEntry.compareAndSet(STATE_IN_USE, STATE_REMOVED) && !bagEntry.compareAndSet(STATE_RESERVED, STATE_REMOVED) && !closed) {
          LOGGER.warn("Attempt to remove an object from the bag that was not borrowed or reserved: {}", bagEntry);
          return false;
       }
    
       final boolean removed = sharedList.remove(bagEntry); // 从CopyOnWriteArrayList中移出
       if (!removed && !closed) {
          LOGGER.warn("Attempt to remove an object from the bag that does not exist: {}", bagEntry);
       }
    
       return removed;
    }
    

    ConcurrentBag中通过borrow方法进行数据资源借用,通过requite方法进行资源回收,注意其中borrow方法只提供对象引用,不移除对象,因此使用时通过borrow取出的对象必须通过requite方法进行放回,否则容易导致内存泄露!

    public T borrow(long timeout, final TimeUnit timeUnit) throws InterruptedException
    {
       // 优先查看有没有可用的本地化的资源
       final List<Object> list = threadList.get();
       for (int i = list.size() - 1; i >= 0; i--) {
          final Object entry = list.remove(i);
          @SuppressWarnings("unchecked")
          final T bagEntry = weakThreadLocals ? ((WeakReference<T>) entry).get() : (T) entry;
          if (bagEntry != null && bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
             return bagEntry;
          }
       }
    
       final int waiting = waiters.incrementAndGet();
       try {
          // 当无可用本地化资源时,遍历全部资源,查看是否存在可用资源
          // 因此被一个线程本地化的资源也可能被另一个线程“抢走”
          for (T bagEntry : sharedList) {
             if (bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                if (waiting > 1) {
                    // 因为可能“抢走”了其他线程的资源,因此提醒包裹进行资源添加
                   listener.addBagItem(waiting - 1);
                }
                return bagEntry;
             }
          }
    
          listener.addBagItem(waiting);
    
          timeout = timeUnit.toNanos(timeout);
          do {
             final long start = currentTime();
             // 当现有全部资源全部在使用中,等待一个被释放的资源或者一个新资源
             final T bagEntry = handoffQueue.poll(timeout, NANOSECONDS);
             if (bagEntry == null || bagEntry.compareAndSet(STATE_NOT_IN_USE, STATE_IN_USE)) {
                return bagEntry;
             }
    
             timeout -= elapsedNanos(start);
          } while (timeout > 10_000);
    
          return null;
       }
       finally {
          waiters.decrementAndGet();
       }
    }
    
    public void requite(final T bagEntry)
    {
       // 将状态转为未在使用
       bagEntry.setState(STATE_NOT_IN_USE);
    
       // 判断是否存在等待线程,若存在,则直接转手资源
       for (int i = 0; waiters.get() > 0; i++) {
          if (bagEntry.getState() != STATE_NOT_IN_USE || handoffQueue.offer(bagEntry)) {
             return;
          }
          else if ((i & 0xff) == 0xff) {
             parkNanos(MICROSECONDS.toNanos(10));
          }
          else {
             yield();
          }
       }
    
       // 否则,进行资源本地化
       final List<Object> threadLocalList = threadList.get();
       threadLocalList.add(weakThreadLocals ? new WeakReference<>(bagEntry) : bagEntry);
    }
    

    上述代码中的 weakThreadLocals 是用来判断是否使用弱引用,通过下述方法初始化:

    private boolean useWeakThreadLocals()
    {
       try {
          // 人工指定是否使用弱引用,但是官方不推荐进行自主设置。
          if (System.getProperty("com.dareway.concurrent.useWeakReferences") != null) { 
             return Boolean.getBoolean("com.dareway.concurrent.useWeakReferences");
          }
    
          // 默认通过判断初始化的ClassLoader是否是系统的ClassLoader来确定
          return getClass().getClassLoader() != ClassLoader.getSystemClassLoader();
       }
       catch (SecurityException se) {
          return true;
       }
    }
    
  • 相关阅读:
    AVR开发 Arduino方法(六) 内存子系统
    AVR开发 Arduino方法(五) 模数转换子系统
    AVR开发 Arduino方法(四) 串行通信子系统
    AVR开发 Arduino方法(三) 定时/计数器子系统
    AVR开发 Arduino方法(二) 中断子系统
    2014.5.17—所谓生活,就是让自己变得更好
    2014.5.10—做事分清时间地点
    2014.5.7—社交网络用户心理分析
    2014.5.7—20岁这几年
    2014.5.5—反向绑定域名,无需工信部备案
  • 原文地址:https://www.cnblogs.com/taisenki/p/7699667.html
Copyright © 2011-2022 走看看