zoukankan      html  css  js  c++  java
  • common-pool2 学习:thrift连接池的另一种实现

    对象池是一种很实用的技术,经典的例子就是数据库连接池。去年曾经从零开始写过一个thrift客户端连接池如果不想重造轮子,可以直接在apache开源项目commons-pool的基础上开发。 

    步骤:

    一、定义对象工厂

    package test.cn.mwee.service.paidui.pool;
    
    import org.apache.commons.pool2.BasePooledObjectFactory;
    import org.apache.commons.pool2.PooledObject;
    import org.apache.commons.pool2.impl.DefaultPooledObject;
    import org.apache.thrift.protocol.TCompactProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;
    
    public class TProtocolFactory
            extends BasePooledObjectFactory<TProtocol> {
    
    
        private String host;
        private int port;
        private boolean keepAlive = true;
    
        public TProtocolFactory(String host, int port, boolean keepAlive) {
            this.host = host;
            this.port = port;
            this.keepAlive = keepAlive;
        }
    
        @Override
        public TProtocol create() throws TTransportException {
            TSocket tSocket = new TSocket(host, port);
            TTransport tTransport = new TFramedTransport(tSocket);
            tTransport.open();
            return new TCompactProtocol(tTransport);
        }
    
    
        @Override
        public PooledObject<TProtocol> wrap(TProtocol protocol) {
            return new DefaultPooledObject<>(protocol);
        }
    
        /**
         * 对象钝化(即:从激活状态转入非激活状态,returnObject时触发)
         *
         * @param pooledObject
         * @throws TTransportException
         */
        @Override
        public void passivateObject(PooledObject<TProtocol> pooledObject) throws TTransportException {
            if (!keepAlive) {
                pooledObject.getObject().getTransport().flush();
                pooledObject.getObject().getTransport().close();
            }
        }
    
    
        /**
         * 对象激活(borrowObject时触发)
         *
         * @param pooledObject
         * @throws TTransportException
         */
        @Override
        public void activateObject(PooledObject<TProtocol> pooledObject) throws TTransportException {
            if (!pooledObject.getObject().getTransport().isOpen()) {
                pooledObject.getObject().getTransport().open();
            }
        }
    
    
        /**
         * 对象销毁(clear时会触发)
         * @param pooledObject
         * @throws TTransportException
         */
        @Override
        public void destroyObject(PooledObject<TProtocol> pooledObject) throws TTransportException {
            passivateObject(pooledObject);
            pooledObject.markAbandoned();
        }
    
    
        /**
         * 验证对象有效性
         *
         * @param p
         * @return
         */
        @Override
        public boolean validateObject(PooledObject<TProtocol> p) {
            if (p.getObject() != null) {
                if (p.getObject().getTransport().isOpen()) {
                    return true;
                }
                try {
                    p.getObject().getTransport().open();
                    return true;
                } catch (TTransportException e) {
                    e.printStackTrace();
                }
            }
            return false;
        }
    }
    

    有二个关键的方法,需要重写:activateObject(对象激活) 及 passivateObject(对象钝化)

    二、定义对象池

    package test.cn.mwee.service.paidui.pool;
    
    import org.apache.commons.pool2.PooledObjectFactory;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    /**
     * Created by yangjunming on 6/7/16.
     */
    public class AutoClearGenericObjectPool<T> extends GenericObjectPool<T> {
    
        public AutoClearGenericObjectPool(PooledObjectFactory<T> factory) {
            super(factory);
        }
    
        public AutoClearGenericObjectPool(PooledObjectFactory<T> factory, GenericObjectPoolConfig config) {
            super(factory, config);
        }
    
        @Override
        public void returnObject(T obj) {
            super.returnObject(obj);
            //空闲数>=激活数时,清理掉空闲连接
            if (getNumIdle() >= getNumActive()) {
                clear();
            }
        }
    
    }
    

    common-pools提供了对象池的默认实现:GenericObjectPool 但是该对象池中,对于处于空闲的对象,需要手动调用clear来释放空闲对象,如果希望改变这一行为,可以自己派生自己的子类,重写returnObject方法,上面的代码中,每次归还对象时,如果空闲的对象比激活的对象还要多(即:一半以上的对象都在打酱油),则调用clear方法。

    三、使用示例:

    package test.cn.mwee.service.paidui.pool;
    
    import org.apache.commons.pool2.ObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    import org.apache.thrift.protocol.TProtocol;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Random;
    
    /**
     * thrift 连接池测试
     */
    public class ProtocolPoolTest {
    
    
        public static void main(String[] args) throws Exception {
    
            GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
            poolConfig.setMaxTotal(10);
            poolConfig.setMinIdle(1);
            poolConfig.setTestOnBorrow(true);
    
            ObjectPool<TProtocol> pool = new AutoClearGenericObjectPool<>(
                    new TProtocolFactory("127.0.0.1", 13041, true), poolConfig);
    
            List<TProtocol> list = new ArrayList<>();
            for (int i = 1; i <= 10; i++) {
                TProtocol protocol = pool.borrowObject();
                System.out.println(protocol.toString());
                if (i % 2 == 0) {
                    //10个连接中,将偶数归还
                    pool.returnObject(protocol);
                } else {
                    list.add(protocol);
                }
            }
    
            Random rnd = new Random();
            while (true) {
                System.out.println(String.format("active:%d,idea:%d", pool.getNumActive(), pool.getNumIdle()));
                Thread.sleep(5000);
                //每次还一个
                if (list.size() > 0) {
                    int i = rnd.nextInt(list.size());
                    pool.returnObject(list.get(i));
                    list.remove(i);
                }
    
                //直到全部还完
                if (pool.getNumActive() <= 0) {
                    break;
                }
            }
    
            System.out.println("------------------------");
    
    
            list.clear();
            //连接池为空,测试是否能重新创建新连接
            for (int i = 1; i <= 10; i++) {
                TProtocol protocol = pool.borrowObject();
                System.out.println(protocol.toString());
                if (i % 2 == 0) {
                    pool.returnObject(protocol);
                } else {
                    list.add(protocol);
                }
            }
    
            while (true) {
                System.out.println(String.format("active:%d,idea:%d", pool.getNumActive(), pool.getNumIdle()));
                Thread.sleep(5000);
                if (list.size() > 0) {
                    int i = rnd.nextInt(list.size());
                    pool.returnObject(list.get(i));
                    list.remove(i);
                }
    
                if (pool.getNumActive() <= 0) {
                    pool.close();
                    break;
                }
            }
    
        }
    }
    

    注:需要从对象池取一个对象时,调用borrowObject(背后会调用activeObject激活对象),类似的,对象使用完之后,需要调用returnObject将对象放回对象池(背后会调用passivateObject使对象钝化)

    输出:

    org.apache.thrift.protocol.TCompactProtocol@146044d7
    org.apache.thrift.protocol.TCompactProtocol@1e9e725a
    org.apache.thrift.protocol.TCompactProtocol@516be40f
    org.apache.thrift.protocol.TCompactProtocol@3c0a50da
    org.apache.thrift.protocol.TCompactProtocol@3c0a50da
    org.apache.thrift.protocol.TCompactProtocol@646be2c3
    org.apache.thrift.protocol.TCompactProtocol@646be2c3
    org.apache.thrift.protocol.TCompactProtocol@797badd3
    org.apache.thrift.protocol.TCompactProtocol@797badd3
    org.apache.thrift.protocol.TCompactProtocol@77be656f
    active:5,idea:1
    active:4,idea:2
    active:3,idea:0
    active:2,idea:1
    active:1,idea:0
    ------------------------
    org.apache.thrift.protocol.TCompactProtocol@221af3c0
    org.apache.thrift.protocol.TCompactProtocol@62bd765
    org.apache.thrift.protocol.TCompactProtocol@23a5fd2
    org.apache.thrift.protocol.TCompactProtocol@78a2da20
    org.apache.thrift.protocol.TCompactProtocol@78a2da20
    org.apache.thrift.protocol.TCompactProtocol@dd3b207
    org.apache.thrift.protocol.TCompactProtocol@dd3b207
    org.apache.thrift.protocol.TCompactProtocol@551bdc27
    org.apache.thrift.protocol.TCompactProtocol@551bdc27
    org.apache.thrift.protocol.TCompactProtocol@58fdd99
    active:5,idea:1
    active:4,idea:2
    active:3,idea:0
    active:2,idea:1
    active:1,idea:0

    Process finished with exit code 0

    从输出上看,归还对象后,再次取出时,并没有创建新对象,而是直接使用了对象池中已经空闲的对象。当对象池中的所有对象都归还变成空闲并被clear后,再次从对象池中借对象时,会重新创建对象。

  • 相关阅读:
    MS CRM 2011 RC中的新特性(4)——活动方面之批量编辑、自定义活动
    最近的一些有关MS CRM 2011的更新
    MS CRM 2011 RC中的新特性(6)——连接
    MS CRM 2011 RC中的新特性(7)—仪表板
    参加MS CRM2011深度培训课程——第一天
    MS CRM 2011插件调试工具
    MS CRM2011实体介绍(四)——目标管理方面的实体
    MS CRM 2011 RC中的新特性(3)——客户服务管理方面
    MS CRM 2011 RC中的新特性(8)—数据管理
    ExtAspNet 登陆
  • 原文地址:https://www.cnblogs.com/yjmyzz/p/the-implementation-of-thrift-pool-using-common-pool2.html
Copyright © 2011-2022 走看看