zoukankan      html  css  js  c++  java
  • GenericObjectPool源码分析

    最近有需求为 Elasticsearch增加连接池,经过搜索资料决定采用GenericObjectPool进行实现,在网上查找的资料,记之如下:

    apache提供了三种对象池:GenericKeyedObjectPool,SoftReferenceObjectPool和GenericObjectPool,其中GenericObjectPool是我们最常用的对象池,内部实现也最复杂,本文记录其实现原理。

    GenericObjectPool实现了ObjectPool<T>接口,而ObjectPool<T>中有以下方法:

    // 从池中获得一个对象  
    Object borrowObject() 
    // 返回一个对象给池  
    void returnObject(Object obj)
    // 使对象实效,不再受池管辖(必须是已经从池中获得的对象)  
    void invalidateObject(Object obj) 
    // 生成一个对象(通过工程或其他实现方式),并将其放入空闲队列中  
    void addObject() 
    // 获得空闲对象的数量  
    int getNumIdle() 
    // 获得活动对象的数量  
    int getNumActive() 
    // 清空池中空闲对象,释放相关资源 
    void clear() 
    // 关闭池,释放所有与它相关资源  
    void close() 
     // 设置池对象工厂
    void setFactory(PoolableObjectFactory factory)

    其中,前四个方法比较重要,本文重点研究这四个方法的码源实现,记录这四个方法前,先了解下部分重要属性的含义

    CursorableLinkedList<ObjectTimestampPair<T>>  _pool: 队列,用于保存空闲object,ObjectTimestampPair的value值即为真实的object
    LinkedList<Latch<T>> _allocationQueue: 队列,用于保存线程borrow object的请求。
    PoolableObjectFactory<T> _factory:用于生产object的工厂类
    _maxActive: 链接池中最大连接数,默认为8.
    _whenExhaustedAction: 当“连接池”中active数量达到阀值时,即“链接”资源耗尽时,连接池需要采取的手段, 默认为1:
    -> 0 : 抛出异常,
    -> 1 : 阻塞,直到有可用链接资源,这里如果设置了maxWait值,则在阻塞了maxWait时间后抛出异常
    -> 2 : 强制创建新的链接资源
    _maxWait: 当连接池资源耗尽时,调用者最大阻塞的时间,超时将跑出异常。单位,毫秒数;默认为-1.表示永不超时.
    _maxIdle: 链接池中最大空闲的连接数,默认为8.该参数一般尽量与_maxActive相同,以提高并发数
    _minIdle: 连接池中最少空闲的连接数,默认为0.
    _testOnBorrow: 向调用者输出“链接”资源时,是否检测是有有效,如果无效则从连接池中移除,并尝试获取继续获取。默认为false。建议保持默认值.
    _testOnReturn:  向连接池“归还”链接时,是否检测“链接”对象的有效性。默认为false。建议保持默认值.
    _timeBetweenEvictionRunsMillis:  “空闲链接”检测线程,检测的周期,毫秒数。如果为负值,表示不运行“检测线程”。默认为-1.该值非-1时下面的参数才有效
    _numTestsPerEvictionRun:检测线程一次运行检查多少条“链接”
    _minEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲连接将可能会被移除。负值(-1)表示不移除
    _testWhileIdle:  向调用者输出“链接”对象时,是否检测它的空闲超时;默认为false。如果“链接”空闲超时,将会被移除。建议保持默认值.
    _softMinEvictableIdleTimeMillis: 连接空闲的最小时间,达到此值后空闲链接将会被移除,且保留“minIdle”个空闲连接数。默认为-1.
    lifo:false为队列,true为栈,表示object 的出借方式

    构造方法:工厂方法用于创建object,config主要配置pool的一些属性(上面属性中第四个到最后)。其他的构造方法基本一致,都会传factory,pool属性配置可以不传,GenericObjectPool有默认属性可设置

    public GenericObjectPool(PoolableObjectFactory<T> factory, GenericObjectPool.Config config) {
            this(factory, config.maxActive, config.whenExhaustedAction, config.maxWait, config.maxIdle, config.minIdle,
                    config.testOnBorrow, config.testOnReturn, config.timeBetweenEvictionRunsMillis,
                    config.numTestsPerEvictionRun, config.minEvictableIdleTimeMillis, config.testWhileIdle,
                    config.softMinEvictableIdleTimeMillis, config.lifo);
    
    }

    主要四个方法:

    1.borrowObject()

    public T borrowObject() throws Exception {
        第一步:创建请求latch放入分配队列,设置相关属性,并执行一次分配动作
            long starttime = System.currentTimeMillis();
            Latch<T> latch = new Latch<T>(); // 保存object的基本单位
            byte whenExhaustedAction;
            long maxWait;
            synchronized (this) {                        
                // Get local copy of current config. Can't sync when used later as
                // it can result in a deadlock. Has the added advantage that config
                // is consistent for entire method execution
                whenExhaustedAction = _whenExhaustedAction; //设置阻塞方式
                maxWait = _maxWait;//阻塞时最大等待时间
    
                // Add this request to the queue
                _allocationQueue.add(latch);  //将borrow请求加入到分配队列
            }
            // Work the allocation queue, allocating idle instances and
            // instance creation permits in request arrival order
            allocate();   //执行一次分配动作,尝试给上面的latch分配object

    第二步是一个大循环,里面又分为2步,第一步用于判断是否获取到object并根据阻塞方式操作,第二步是为latch分配或创建object。
        1. 第一步中如果没有分配到object,并且不能创建新的object时,switch pool设置的阻塞方式:
             a:whenExhaustedAction=2,强制创建一个object,此时同步pool,如果未获取且不能创建(说明未分配到object),则从分配队列移除latch,正在创建数量+1,break跳出switch,进入第2步
             b:whenExhaustedAction=0,直接抛出异常,如果已获取或允许创建,则break出switch,进入第2步,否则从分配队列移除latch,抛出异常,跳出borrowObject方法
             c:whenExhaustedAction=1,阻塞一定时间抛出异常,同步latch,如果已获取或允许创建,break跳出switch,进入第2步;否则,如果maxWait<0,则一直阻塞,maxWait>0,计算阻塞时间waitTime并阻塞。
                c2:这一段都是c中的异常逻辑,如果抛出中断异常,同步pool并判断latch状态,1、未获取且不能创建,则从分配队列移除latch 2、未获取但允许创建,正在创建数量减一,创建标志为true  3、已获取object,正在创建数量减一,已borrow数量加一,调用returnObject(object)归还object ; 然后如果创建标志为true,调用分配方法(这里异常可能占用一个创建的机会,需调用分配方法显式让其他线程获取得创建标志),然后中断当前线程,抛出异常,跳出borrowObject方法。  
                c3:然后,如果已超时, 继续判断latch,未创建且不能创建,则从分配队列中移除latch,然后抛出异常,跳出borrowObject;已创建或可以创建,则break,进入第2步。未超时的情况下,继续循环,从第1步开始。
            d:默认 阻塞方式 属性不能识别

        2.如果latch未获取object,则通过factory创建一个object赋给latch,设置新创建标志为true;有异常时,如果标志位不是新创建的,则将正在创建数量减一(这里减一对应分配方法创建新的 和a 中 给新创建数量加1的逻辑,因为只有强制创建a,或者分配方法中才有创建加1的逻辑),并再分配。
          然后激活object,检测object有效性,无效进入异常逻辑。有效使创建数量减1,已borrow数量加一,返回object,跳出borrowObject方法。 如果出现异常,工厂毁灭object,并将正在创建减一,如果不是新创建的Object(失效的空闲object),latch重置,并加入分配队列,再分配, 如果是新创建的,抛出异常,不是则继续循环。

     for(;;) {
                synchronized (this) {
                    assertOpen();//父类的方法,确认 pool没有被关闭,如果关闭了调用该方法会抛异常
                }
           第1步:
                // If no object was allocated from the pool above
                if(latch.getPair() == null) {    没有从pool中分配到object(没有空闲的)
                    // check if we were allowed to create one
                    if(latch.mayCreate()) {   如果设置了可以尝试创建新object
                        // allow new object to be created
                    } else {
                   pool设置了取不到object时的动作
                        // the pool is exhausted
                        switch(whenExhaustedAction) {
                            case WHEN_EXHAUSTED_GROW:   强制创造一个object分配的情况
                                // allow new object to be created   
                                synchronized (this) {
                    //防止其他
                                    // Make sure another thread didn't allocate us an object
                                    // or permit a new object to be created
                                    if (latch.getPair() == null && !latch.mayCreate()) {
                                        _allocationQueue.remove(latch); 确保未分配到object,分配队列中删除请求
                                        _numInternalProcessing++;
                                    }
                                }
                                break;
                            case WHEN_EXHAUSTED_FAIL:   直接抛异常的情况
                                synchronized (this) {
                                    // Make sure allocate hasn't already assigned an object
                                    // in a different thread or permitted a new object to be created
                                    if (latch.getPair() != null || latch.mayCreate()) {  如果分配到object或获取到创建object权限,则跳出switch,进入创建逻辑代码
                                        break;    
                                    }
                                    _allocationQueue.remove(latch);
                                }
                                throw new NoSuchElementException("Pool exhausted");
                            case WHEN_EXHAUSTED_BLOCK:   阻塞maxWait秒 抛异常  的情况
                                try {
                                    synchronized (latch) {
                                        // Before we wait, make sure another thread didn't allocate us an object
                                        // or permit a new object to be created
                                        if (latch.getPair() == null && !latch.mayCreate()) {  //未分配到object并且没有权限创建
                                            if(maxWait <= 0) {
                                                latch.wait(); maxWait小于0 就一直阻塞
                                            } else {
                                                // this code may be executed again after a notify then continue cycle
                                                // so, need to calculate the amount of time to wait
                                                final long elapsed = (System.currentTimeMillis() - starttime);
                                                final long waitTime = maxWait - elapsed; //计算需要阻塞的时间
                                                if (waitTime > 0)
                                                {
                                                    latch.wait(waitTime);
                                                }
                                            }
                                        } else {
                                            break;
                                        }
                                    }
                                    // see if we were awakened by a closing pool
                                    if(isClosed() == true) {
                                        throw new IllegalStateException("Pool closed");
                                    }
                                } catch(InterruptedException e) {
                                    boolean doAllocate = false;
                                    synchronized(this) {
                                        // Need to handle the all three possibilities
                                        if (latch.getPair() == null && !latch.mayCreate()) {
                        还是没有分配到object 在分配队列中,直接移除
                                            // Case 1: latch still in allocation queue      
                                            // Remove latch from the allocation queue
                                            _allocationQueue.remove(latch);
                                        } else if (latch.getPair() == null && latch.mayCreate()) {
                        可以创建一个object,需要将 正在创建的数量-1,设置允许创建标志位
                                            // Case 2: latch has been given permission to create
                                            //         a new object
                                            _numInternalProcessing--;
                                            doAllocate = true;   允许创建的标志
                                        } else {这种情况是 已分配得到object
                        被分配到对象   正创建数量 -1   已borrow数量+1,并且object归还给pool
                                            // Case 3: An object has been allocated
                                            _numInternalProcessing--;
                                            _numActive++;
                                            returnObject(latch.getPair().getValue());
                                        }
                                    }
                                    if (doAllocate) {
                                        allocate();//如果是可以创建,则显示调用分配方法,将该机会分配出去
                                    }
                                    Thread.currentThread().interrupt();
                                    throw e;
                                }
                                if(maxWait > 0 && ((System.currentTimeMillis() - starttime) >= maxWait)) {
                                    synchronized(this) {    如果阻塞超时,并且没有获得object、不能创建,则在分配队列中去除latch,否则跳出switch,可以进入创建逻辑
                                        // Make sure allocate hasn't already assigned an object
                                        // in a different thread or permitted a new object to be created
                                        if (latch.getPair() == null && !latch.mayCreate()) { 同上面
                                            // Remove latch from the allocation queue
                                            _allocationQueue.remove(latch);
                                        } else {
                                            break;
                                        }
                                    }
                                    超时并且没有分配到object并且没有创建权限,则抛异常
                                    throw new NoSuchElementException("Timeout waiting for idle object");
                                } else {
                                    continue; // keep looping
                                }
                            default:
                                throw new IllegalArgumentException("WhenExhaustedAction property " + whenExhaustedAction +
                                        " not recognized.");
                        }
                    }
                }
           //第2步,这一步是创建过程的逻辑,说明是允许创建或者已分配到object,  其他的阻塞超时之类的上面直接抛出异常跳出方法
                boolean newlyCreated = false;
                if(null == latch.getPair()) { 未分配到object时创建object
                    try {
                        T obj = _factory.makeObject();  工厂类创建object
                        latch.setPair(new ObjectTimestampPair<T>(obj));
                        newlyCreated = true;
                    } finally {
                        if (!newlyCreated) {  //  如果不是新创建的,说明创建出现异常,创建失败,需要把正在创建中数量-1
                            // object cannot be created
                            synchronized (this) {
                                _numInternalProcessing--;
                                // No need to reset latch - about to throw exception
                            }
                            allocate(); //分配 方法,将创建失败的机会尝试分配给其他线程
                        }
                    }
                }
                // activate & validate the object
                try {
                    _factory.activateObject(latch.getPair().value);  激活object
                    if(_testOnBorrow &&
                            !_factory.validateObject(latch.getPair().value)) {
                        throw new Exception("ValidateObject failed”);      如果设置了testOnBorrow参数并且对象失效抛异常
                    }
                    synchronized(this) {
                        _numInternalProcessing--;     将正在创建数量转为已borrow数量
                        _numActive++;
                    }
                    return latch.getPair().value;
                }
                catch (Throwable e) {
                    PoolUtils.checkRethrow(e);
                    // object cannot be activated or is invalid
                    try {
                        _factory.destroyObject(latch.getPair().value);   将已经该失效的object毁灭
                    } catch (Throwable e2) {
                        PoolUtils.checkRethrow(e2);
                        // cannot destroy broken object
                    }
                    synchronized (this) {
                        _numInternalProcessing--; 抛出异常说明创建失败,将正在创建减一
                        if (!newlyCreated) {  如果是object是从pool中获取的空闲object,失效后需要reset,将请求命令重新放入分配队列中,再次尝试获取新的object
                            latch.reset();
                            _allocationQueue.add(0, latch);   插入到队列中第一位
                        }
                    }
                    allocate();   // 显示为创建失败的请求 分配object
                    如果object是新创建的,则抛异常,说明再创建也有问题,直接抛异常
                    if(newlyCreated) {
                        throw new NoSuchElementException("Could not create a validated object, cause: " + e.getMessage());
                    }
                    else {
                        continue; // keep looping
                    }
                }
            }
        }
    
    
    对分配队列中的请求分配object方法
    private synchronized void allocate() {
            if (isClosed()) return;
    
            // First use any objects in the pool to clear the queue
         1.尝试从pool中向需要分配的队列分配  空闲的对象! 直对象池空或者分配队列空才break
            for (;;) {
                if (!_pool.isEmpty() && !_allocationQueue.isEmpty()) {
                    Latch<T> latch = _allocationQueue.removeFirst();  //Latch 代表borrowObject的一个命令请求
                    latch.setPair( _pool.removeFirst()); //将池中的pair分配出去
                    _numInternalProcessing++;  //指还在分配过程中的数量,不属于已经borrow、空闲的对象的数
                    synchronized (latch) {
                        latch.notify();//唤醒borrow中的阻塞的线程
                    }
                } else {
                    break;
                }
            }
        
            // Second utilise any spare capacity to create new objects
            for(;;) {
            2.如果分配队列还有需要分配对象的请求,并且 pool大小小于0(这个逻辑不理解,有懂的人麻烦说下)  或者  已borrow的数量+正在分配的数量 小于pool的大小  ,这个时候可以让pool创建新的object,否则break
                if((!_allocationQueue.isEmpty()) && (_maxActive < 0 || (_numActive + _numInternalProcessing) < _maxActive)) {
                    Latch<T> latch = _allocationQueue.removeFirst();
                    latch.setMayCreate(true);  //设置为创建新的对象
                    _numInternalProcessing++;
                    synchronized (latch) {
                        latch.notify();
                    }
                } else {
                    break;
                }
            }
        }

    2.returnObject(T obj)

    将对象返回给pool,这时object已经不受pool管理,如果有异常,让其失效,jvm会处理

    public void returnObject(T obj) throws Exception {
            try {
                addObjectToPool(obj, true); //将对象添加到pool true 表示归还的对象,false表示新创建的
            } catch (Exception e) {
                if (_factory != null) {
                    try {
                        _factory.destroyObject(obj);   添加到pool时异常,需将对象毁灭,将borrow数量减一
                    } catch (Exception e2) {
                        // swallowed
                    }
                    // TODO: Correctness here depends on control in addObjectToPool.
                    // These two methods should be refactored, removing the
                    // "behavior flag", decrementNumActive, from addObjectToPool.
                    synchronized(this) {
                        _numActive--;
                    }
                    allocate();//将异常时归还object的机会分配 给其他线程
                }
            }
        }
    
    private void addObjectToPool(T obj, boolean decrementNumActive) throws Exception {
            boolean success = true;
            if(_testOnReturn && !(_factory.validateObject(obj))) { //检测对象是否有效
                success = false;   对象失效
            } else {
                _factory.passivateObject(obj);   //对象钝化
            }
    
            boolean shouldDestroy = !success;  是否应该毁来
    
            // Add instance to pool if there is room and it has passed validation
            // (if testOnreturn is set)
            boolean doAllocate = false;
            synchronized (this) {
                if (isClosed()) {   //pool关闭,所有object应该被 毁灭
                    shouldDestroy = true;
                } else {
                    if((_maxIdle >= 0) && (_pool.size() >= _maxIdle)) {
                        shouldDestroy = true;    最大空闲数量大于0      并且 pool的空闲数量大于最大空闲数量时,需将其毁灭
                    } else if(success) {
                        // borrowObject always takes the first element from the queue,
                        // so for LIFO, push on top, FIFO add to end   按照设定的模式将对象加入pool
                        if (_lifo) {
                            _pool.addFirst(new ObjectTimestampPair<T>(obj));
                        } else {
                            _pool.addLast(new ObjectTimestampPair<T>(obj));
                        }
                        if (decrementNumActive) { 如果是旧对象归还的操作
                            _numActive--;    
                        }
                        doAllocate = true; 再分配
                    }
                }
            }
            if (doAllocate) {
                allocate(); //有可分配的object,所以执行一次
            }
    
            // Destroy the instance if necessary
            if(shouldDestroy) {
                try {
                    _factory.destroyObject(obj); 毁灭
                } catch(Exception e) {
                    // ignored
                }
                // Decrement active count *after* destroy if applicable
                if (decrementNumActive) {  如果是归还的对象
                    synchronized(this) {
                        _numActive--;
                    }
                    allocate();
                }
            }
    
        }

    3.invalidateObject(T obj)

    使对象实效,不再受池管辖(必须是已经从池中获得的对象)提供给开发者调用,在其抛出异常时可使用     由jvm自动清理

    public void invalidateObject(T obj) throws Exception {
            try {
                if (_factory != null) {
                    _factory.destroyObject(obj);   毁灭object ,将borrow数减一
                }
            } finally {
                synchronized (this) {
                    _numActive--;   不管毁灭成功失败,都要将borrow数量减一
                }
                allocate();  将返回的可borrow 分配出去
            }
        }

    4.addObject()

    添加一个object到pool中,一般开发者不会调用,用于pool维护 最小空闲object数量

    public void addObject() throws Exception {
            assertOpen(); pool是否打开
            if (_factory == null) {
                throw new IllegalStateException("Cannot add objects without a factory.");
            }
            T obj = _factory.makeObject();  工厂创造object
            try {
                assertOpen();    pool是否打开
                addObjectToPool(obj, false);    将object添加到pool,设置false为表示新创建的object
            } catch (IllegalStateException ex) { // Pool closed
                try {
                    _factory.destroyObject(obj);添加失败就毁灭object
                } catch (Exception ex2) {
                    // swallow
                }
                throw ex;
            }
        }

    原文连接

  • 相关阅读:
    01-Django 简介
    函数及函数的嵌套等
    循环及循环嵌套
    运算符
    if, elif, else及if嵌套
    变量及变量计算和引用
    Python的3种执行方式
    Jmeter设置默认中文启动
    Vysor
    python基础学习(二)
  • 原文地址:https://www.cnblogs.com/guoxiangyue/p/15229022.html
Copyright © 2011-2022 走看看