zoukankan      html  css  js  c++  java
  • Thrift连接池实现

    简介

    Thrift是Facebook的核心框架之一,使不同的开发语言开发的系统可以通过该框架实现彼此的通信,类似于webservice,但是Thrift提供了近乎变态的效率和开发的方便性,是webservice所不能比拟的。给分布式开发带来了极大的方便。但是这柄利器也有一些不完美。

     

    问题

    首先文档相当的少,只有一个wiki网站提供相应的帮助。这对于Thrift的推广极为不利。

    其次框架本身实现有一些缺陷,就Thrift的Java部分来说,没有提供连接池的支持,对RPC的调用效率有所影响。

    对于文档稀少的问题,只能是通过一些Thrift的开发者和使用者多贡献一些自己的心得来解决。这需要一个过程。而连接池的问题的解决则可以快速一些。

             提到池,一般做过Java开发的肯定会想到ObjectPool。Apache Commons项目确实给我们的开发带来了很大的便利性,其中的pool项目正是我们实现thrift连接池的基础,当然也少不了神器spring framework。

     

    实现

    一,定义thrift连接池接口

    ConnectionProvider
    /*
     * @(#)ConnectionProvider.java    0.1 05/11/17
     *
     * Copyright 2010 QISI, Inc. All rights reserved.
     * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     
    */
    package com.qidea.thrift.pool;
    import org.apache.thrift.transport.TSocket;
    /**
     * 
     * 
    @author sunwei
     * 
    @version 2010-8-6
     * 
    @since JDK1.5
     
    */
    public interface ConnectionProvider
    {
        
    /**
         * 取链接池中的一个链接
         * 
         * 
    @return
         
    */
        
    public TSocket getConnection();
        
    /**
         * 返回链接
         * 
         * 
    @param socket
         
    */
        
    public void returnCon(TSocket socket);
    }

    二,实现连接池

    GenericConnectionProvider
    /*
     * @(#)GenericConnectionProvider.java    0.1 05/11/17
     *
     * Copyright 2010 QISI, Inc. All rights reserved.
     * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     
    */
    package com.qidea.thrift.pool;
    import org.apache.commons.pool.ObjectPool;
    import org.apache.commons.pool.impl.GenericObjectPool;
    import org.apache.thrift.transport.TSocket;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    /**
     * 
     * 
    @author sunwei
     * 
    @version 2010-8-10
     * 
    @since JDK1.5
     
    */
    public class GenericConnectionProvider implements ConnectionProvider,
            InitializingBean, DisposableBean
    {
        
    public static final Logger logger = LoggerFactory
                .getLogger(GenericConnectionProvider.
    class);
        
    /** 服务的IP地址 */
        
    private String serviceIP;
        
    /** 服务的端口 */
        
    private int servicePort;
        
    /** 连接超时配置 */
        
    private int conTimeOut;
        
    /** 可以从缓存池中分配对象的最大数量 */
        
    private int maxActive = GenericObjectPool.DEFAULT_MAX_ACTIVE;
        
    /** 缓存池中最大空闲对象数量 */
        
    private int maxIdle = GenericObjectPool.DEFAULT_MAX_IDLE;
        
    /** 缓存池中最小空闲对象数量 */
        
    private int minIdle = GenericObjectPool.DEFAULT_MIN_IDLE;
        
    /** 阻塞的最大数量 */
        
    private long maxWait = GenericObjectPool.DEFAULT_MAX_WAIT;
        
    /** 从缓存池中分配对象,是否执行PoolableObjectFactory.validateObject方法 */
        
    private boolean testOnBorrow = GenericObjectPool.DEFAULT_TEST_ON_BORROW;
        
    private boolean testOnReturn = GenericObjectPool.DEFAULT_TEST_ON_RETURN;
        
    private boolean testWhileIdle = GenericObjectPool.DEFAULT_TEST_WHILE_IDLE;
        
    /** 对象缓存池 */
        
    private ObjectPool objectPool = null;
        
    /**
         * 
         
    */
        @Override
        
    public void afterPropertiesSet() throws Exception
        {
            
    // 对象池
            objectPool = new GenericObjectPool();
            
    //
            ((GenericObjectPool) objectPool).setMaxActive(maxActive);
            ((GenericObjectPool) objectPool).setMaxIdle(maxIdle);
            ((GenericObjectPool) objectPool).setMinIdle(minIdle);
            ((GenericObjectPool) objectPool).setMaxWait(maxWait);
            ((GenericObjectPool) objectPool).setTestOnBorrow(testOnBorrow);
            ((GenericObjectPool) objectPool).setTestOnReturn(testOnReturn);
            ((GenericObjectPool) objectPool).setTestWhileIdle(testWhileIdle);
            ((GenericObjectPool) objectPool)
                    .setWhenExhaustedAction(GenericObjectPool.WHEN_EXHAUSTED_BLOCK);
            
    // 设置factory
            ThriftPoolableObjectFactory thriftPoolableObjectFactory = new ThriftPoolableObjectFactory(
                    serviceIP, servicePort, conTimeOut);
            objectPool.setFactory(thriftPoolableObjectFactory);
        }
        @Override
        
    public void destroy()
        {
            
    try
            {
                objectPool.close();
            }
            
    catch (Exception e)
            {
                
    throw new RuntimeException("erorr destroy()", e);
            }
        }
        @Override
        
    public TSocket getConnection()
        {
            
    try
            {
                TSocket socket 
    = (TSocket) objectPool.borrowObject();
                
    return socket;
            }
            
    catch (Exception e)
            {
                
    throw new RuntimeException("error getConnection()", e);
            }
        }
        @Override
        
    public void returnCon(TSocket socket)
        {
            
    try
            {
                objectPool.returnObject(socket);
            }
            
    catch (Exception e)
            {
                
    throw new RuntimeException("error returnCon()", e);
            }
        }
        
    public String getServiceIP()
        {
            
    return serviceIP;
        }
        
    public void setServiceIP(String serviceIP)
        {
            
    this.serviceIP = serviceIP;
        }
        
    public int getServicePort()
        {
            
    return servicePort;
        }
        
    public void setServicePort(int servicePort)
        {
            
    this.servicePort = servicePort;
        }
        
    public int getConTimeOut()
        {
            
    return conTimeOut;
        }
        
    public void setConTimeOut(int conTimeOut)
        {
            
    this.conTimeOut = conTimeOut;
        }
        
    public int getMaxActive()
        {
            
    return maxActive;
        }
        
    public void setMaxActive(int maxActive)
        {
            
    this.maxActive = maxActive;
        }
        
    public int getMaxIdle()
        {
            
    return maxIdle;
        }
        
    public void setMaxIdle(int maxIdle)
        {
            
    this.maxIdle = maxIdle;
        }
        
    public int getMinIdle()
        {
            
    return minIdle;
        }
        
    public void setMinIdle(int minIdle)
        {
            
    this.minIdle = minIdle;
        }
        
    public long getMaxWait()
        {
            
    return maxWait;
        }
        
    public void setMaxWait(long maxWait)
        {
            
    this.maxWait = maxWait;
        }
        
    public boolean isTestOnBorrow()
        {
            
    return testOnBorrow;
        }
        
    public void setTestOnBorrow(boolean testOnBorrow)
        {
            
    this.testOnBorrow = testOnBorrow;
        }
        
    public boolean isTestOnReturn()
        {
            
    return testOnReturn;
        }
        
    public void setTestOnReturn(boolean testOnReturn)
        {
            
    this.testOnReturn = testOnReturn;
        }
        
    public boolean isTestWhileIdle()
        {
            
    return testWhileIdle;
        }
        
    public void setTestWhileIdle(boolean testWhileIdle)
        {
            
    this.testWhileIdle = testWhileIdle;
        }
        
    public ObjectPool getObjectPool()
        {
            
    return objectPool;
        }
        
    public void setObjectPool(ObjectPool objectPool)
        {
            
    this.objectPool = objectPool;
        }
    }
    ThriftPoolableObjectFactory
    /*
     * @(#)ThriftPoolableObjectFactory.java    0.1 05/11/17
     *
     * Copyright 2010 QISI, Inc. All rights reserved.
     * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     
    */
    package com.qidea.thrift.pool;
    import org.apache.commons.pool.PoolableObjectFactory;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * 
     * 
    @author sunwei
     * 
    @version 2010-8-10
     * 
    @since JDK1.5
     
    */
    public class ThriftPoolableObjectFactory implements PoolableObjectFactory
    {
        
    /** 日志记录器 */
        
    public static final Logger logger = LoggerFactory
                .getLogger(ThriftPoolableObjectFactory.
    class);
        
    /** 服务的IP */
        
    private String serviceIP;
        
    /** 服务的端口 */
        
    private int servicePort;
        
    /** 超时设置 */
        
    private int timeOut;
        
    /**
         * 
         * 
    @param serviceIP
         * 
    @param servicePort
         * 
    @param timeOut
         
    */
        
    public ThriftPoolableObjectFactory(String serviceIP, int servicePort,
                
    int timeOut)
        {
            
    this.serviceIP = serviceIP;
            
    this.servicePort = servicePort;
            
    this.timeOut = timeOut;
        }
        @Override
        
    public void destroyObject(Object arg0) throws Exception
        {
            
    if (arg0 instanceof TSocket)
            {
                TSocket socket 
    = (TSocket) arg0;
                
    if (socket.isOpen())
                {
                    socket.close();
                }
            }
        }
        
    /**
         * 
         
    */
        @Override
        
    public Object makeObject() throws Exception
        {
            
    try
            {
                TTransport transport 
    = new TSocket(this.serviceIP,
                        
    this.servicePort, this.timeOut);
                transport.open();
                
    return transport;
            }
            
    catch (Exception e)
            {
                logger.error(
    "error ThriftPoolableObjectFactory()", e);
                
    throw new RuntimeException(e);
            }
        }
        @Override
        
    public boolean validateObject(Object arg0)
        {
            
    try
            {
                
    if (arg0 instanceof TSocket)
                {
                    TSocket thriftSocket 
    = (TSocket) arg0;
                    
    if (thriftSocket.isOpen())
                    {
                        
    return true;
                    }
                    
    else
                    {
                        
    return false;
                    }
                }
                
    else
                {
                    
    return false;
                }
            }
            
    catch (Exception e)
            {
                
    return false;
            }
        }
        @Override
        
    public void passivateObject(Object arg0) throws Exception
        {
            
    // DO NOTHING
        }
        @Override
        
    public void activateObject(Object arg0) throws Exception
        {
            
    // DO NOTHING
        }
        
    public String getServiceIP()
        {
            
    return serviceIP;
        }
        
    public void setServiceIP(String serviceIP)
        {
            
    this.serviceIP = serviceIP;
        }
        
    public int getServicePort()
        {
            
    return servicePort;
        }
        
    public void setServicePort(int servicePort)
        {
            
    this.servicePort = servicePort;
        }
        
    public int getTimeOut()
        {
            
    return timeOut;
        }
        
    public void setTimeOut(int timeOut)
        {
            
    this.timeOut = timeOut;
        }
    }

      三,定义连接的管理类 

    ConnectionManager
    /*
     * @(#)ConnectionManager.java    0.1 05/11/17
     *
     * Copyright 2010 QISI, Inc. All rights reserved.
     * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     
    */
    package com.qidea.thrift.pool;
    import org.aopalliance.intercept.MethodInterceptor;
    import org.aopalliance.intercept.MethodInvocation;
    import org.apache.thrift.transport.TSocket;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    /**
     * 
     * 
    @author sunwei
     * 
    @version 2010-8-10
     * 
    @since JDK1.5
     
    */
    public class ConnectionManager implements MethodInterceptor
    {
        
    /** 日志记录器 */
        
    public Logger logger = LoggerFactory.getLogger(ConnectionManager.class);
        
    /** 保存local对象 */
        ThreadLocal
    <TSocket> socketThreadSafe = new ThreadLocal<TSocket>();
        
    /** 连接提供池 */
        
    public ConnectionProvider connectionProvider;
        @Override
        
    public Object invoke(MethodInvocation arg0) throws Throwable
        {
            TSocket socket 
    = null;
            
    try
            {
                socket 
    = connectionProvider.getConnection();
                socketThreadSafe.set(socket);
                Object ret 
    = arg0.proceed();
                
    return ret;
            }
            
    catch (Exception e)
            {
                logger.error(
    "error ConnectionManager.invoke()", e);
                
    throw new Exception(e);
            }
            
    finally
            {
                connectionProvider.returnCon(socket);
                socketThreadSafe.remove();
            }
        }
        
    /**
         * 取socket
         * 
         * 
    @return
         
    */
        
    public TSocket getSocket()
        {
            
    return socketThreadSafe.get();
        }
        
    public ConnectionProvider getConnectionProvider()
        {
            
    return connectionProvider;
        }
        
    public void setConnectionProvider(ConnectionProvider connectionProvider)
        {
            
    this.connectionProvider = connectionProvider;
        }
    }

    四,定义spring配置,对受管的bean提供thrift连接 

    Thrift连接池spring配置
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns:jee="http://www.springframework.org/schema/jee"
           xmlns:aop="http://www.springframework.org/schema/aop"
           xmlns:tx="http://www.springframework.org/schema/tx"
           xmlns:flex="http://www.springframework.org/schema/flex"
           xmlns:context="http://www.springframework.org/schema/context"
           xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
        http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-2.5.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-2.5.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.0.xsd
          http://www.springframework.org/schema/flex http://www.springframework.org/schema/flex/spring-flex-1.0.xsd">

        <!-- Thrift connection pool settings -->
        <bean id="connectionProvider" class="com.qidea.thrift.pool.GenericConnectionProvider">
            <property name="serviceIP" value="localhost" />
            <property name="servicePort" value="9090" />
            <property name="maxActive" value="10" />
            <property name="maxIdle" value="10" />
            <property name="testOnBorrow" value="true" />
            <property name="testOnReturn" value="true" />
            <property name="testWhileIdle" value="true" />
            <property name="conTimeOut" value="2000" />
        </bean>

        <!-- Thrift connection manager (the AOP advice) -->
        <bean id="connectionManager" class="com.qidea.thrift.pool.ConnectionManager">
            <property name="connectionProvider" ref="connectionProvider" />
        </bean>

        <!-- Client bean that receives the connection manager -->
        <!-- NOTE(review): the client listing in this article is
             com.qidea.pushserver.rpc.PushRPCClient; confirm that this class
             name and the pointcut package below match the real client. -->
        <bean class="com.qidea.pushserver.rpc.client.PushServiceClient">
            <property name="connectionManager" ref="connectionManager" />
        </bean>

        <!-- AOP wiring: apply the connection manager to the client methods -->
        <aop:config proxy-target-class="true">
            <aop:pointcut id="clientMethods"
                          expression="execution(* com.qidea.pushserver.rpc.client.*.*(..))" />
            <aop:advisor advice-ref="connectionManager" pointcut-ref="clientMethods" />
        </aop:config>
    </beans>

    五,使用连接池

    PushRPCClient
    /*
     * @(#)PushRPCClient.java    0.1 05/11/17
     *
     * Copyright 2010 QISI, Inc. All rights reserved.
     * QISI PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     
    */
    package com.qidea.pushserver.rpc;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.thrift.TException;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import com.qidea.pushserver.ServiceException;
    import com.qidea.thrift.pool.ConnectionManager;
    /**
     * 
     * 
    @author sunwei
     * 
    @version 2010-8-11
     * 
    @since JDK1.5
     
    */
    public class PushRPCClient
    {
        
    public static Logger logger = LoggerFactory.getLogger(PushRPCClient.class);
        
    private ConnectionManager connectionManager;
        
    /**
         * 取在线玩家列表
         * 
         * 
    @param roleIdList
         * 
    @return
         * 
    @throws ServiceException
         
    */
        
    public List<Long> getOnLineRoleIdList(List<Long> roleIdList)
        {
            TProtocol protocol 
    = new TBinaryProtocol(connectionManager.getSocket());
            PushRPCService.Client client 
    = new PushRPCService.Client(protocol);
            
    try
            {
                List
    <Long> onLineIdList = client.getOnLineRoleIdList(roleIdList);
                
    return onLineIdList;
            }
            
    catch (TException e)
            {
                logger.error(
    "error getOnLineRoleIdList()", e);
            }
            
    return new ArrayList<Long>();
        }
        
    /**
         * 解散联盟
         * 
         * 
    @param allianceId
         
    */
        
    public void dismissAlliance(long allianceId)
        {
            TProtocol protocol 
    = new TBinaryProtocol(connectionManager.getSocket());
            PushRPCService.Client client 
    = new PushRPCService.Client(protocol);
            
    try
            {
                client.dismissAlliance(allianceId);
            }
            
    catch (TException e)
            {
                logger.error(
    "error dismissAlliance()", e);
            }
        }
        
    /**
         * 加入联盟
         * 
         * 
    @param roleId
         * 
    @param allianceId
         
    */
        
    public void joinAlliance(long roleId, long allianceId)
        {
            TProtocol protocol 
    = new TBinaryProtocol(connectionManager.getSocket());
            PushRPCService.Client client 
    = new PushRPCService.Client(protocol);
            
    try
            {
                client.joinAlliance(roleId, allianceId);
            }
            
    catch (TException e)
            {
                logger.error(
    "error joinAlliance()", e);
            }
        }
        
    /**
         * 解散联盟
         * 
         * 
    @param roleId
         * 
    @param allianceId
         
    */
        
    public void getOutAlliance(long roleId, long allianceId)
        {
            TProtocol protocol 
    = new TBinaryProtocol(connectionManager.getSocket());
            PushRPCService.Client client 
    = new PushRPCService.Client(protocol);
            
    try
            {
                client.getOutAlliance(roleId, allianceId);
            }
            
    catch (Exception e)
            {
                logger.error(
    "error getOutAlliance()", e);
            }
        }
        
    public ConnectionManager getConnectionManager()
        {
            
    return connectionManager;
        }
        
    public void setConnectionManager(ConnectionManager connectionManager)
        {
            
    this.connectionManager = connectionManager;
        }
    }
  • 相关阅读:
    Atitit 华为基本法 attilax读后感
    Atitit 华为管理者内训书系 以奋斗者为本 华为公司人力资源管理纲要 attilax读后感
    Atitit 项目版本管理gitflow 与 Forking的对比与使用
    Atitit 管理的模式扁平化管理 金字塔 直线型管理 垂直管理 水平管理 矩阵式管理 网状式样管理 多头管理 双头管理
    Atitit 乌合之众读后感attilax总结 与读后感结构规范总结
    深入理解 JavaScript 异步系列(4)—— Generator
    深入理解 JavaScript 异步系列(3)—— ES6 中的 Promise
    深入理解 JavaScript 异步系列(2)—— jquery的解决方案
    深入理解 JavaScript 异步系列(1)——基础
    使用 github + jekyll 搭建个人博客
  • 原文地址:https://www.cnblogs.com/51cto/p/Thrift_Connection_pool.html
Copyright © 2011-2022 走看看