zoukankan      html  css  js  c++  java
  • [转载] Thrift-client与spring集成

    转载自http://shift-alt-ctrl.iteye.com/blog/1990030?utm_source=tuicool&utm_medium=referral

    Thrift-client作为服务消费端,由于thrift使用socket通讯,因此它需要面对几个问题:

        1) client端需要知道server端的IP + port,如果是分布式部署,还需要知道所有server的IP + port列表.  

        2) client为了提升性能,不可能只使用一个socket来处理并发请求,当然也不能每个请求都创建一个socket;我们需要使用连接池方案.

        3) 对于java开发工程师而言,基于spring配置thrift服务,可以提供很多的便利.

        4) 基于zookeeper配置管理,那么client端就不需要"硬编码"的配置server的ip + port,可以使用zookeeper来推送每个service的服务地址.

        5) 因为thrift-client端不使用连接池的话,将不能有效的提高并发能力,本文重点描述看如何使用thrift-client连接池。

    1. pom.xml

    Java代码  收藏代码
    1. <dependencies>  
    2.     <dependency>  
    3.         <groupId>org.springframework</groupId>  
    4.         <artifactId>spring-context</artifactId>  
    5.         <version>3.0.7.RELEASE</version>  
    6.     </dependency>  
    7.     <dependency>  
    8.         <groupId>org.apache.zookeeper</groupId>  
    9.         <artifactId>zookeeper</artifactId>  
    10.         <version>3.4.5</version>  
    11.         <!--<exclusions>-->  
    12.             <!--<exclusion>-->  
    13.                 <!--<groupId>log4j</groupId>-->  
    14.                 <!--<artifactId>log4j</artifactId>-->  
    15.             <!--</exclusion>-->  
    16.         <!--</exclusions>-->  
    17.     </dependency>  
    18.     <!--  
    19.     <dependency>  
    20.         <groupId>com.101tec</groupId>  
    21.         <artifactId>zkclient</artifactId>  
    22.         <version>0.4</version>  
    23.     </dependency>  
    24.     -->  
    25.     <dependency>  
    26.         <groupId>org.apache.thrift</groupId>  
    27.         <artifactId>libthrift</artifactId>  
    28.         <version>0.9.1</version>  
    29.     </dependency>  
    30.     <dependency>  
    31.         <groupId>org.apache.curator</groupId>  
    32.         <artifactId>curator-recipes</artifactId>  
    33.         <version>2.3.0</version>  
    34.     </dependency>  
    35.     <dependency>  
    36.         <groupId>commons-pool</groupId>  
    37.         <artifactId>commons-pool</artifactId>  
    38.         <version>1.6</version>  
    39.     </dependency>  
    40.   
    41. </dependencies>  

    2. spring-thrift-client.xml

        其中zookeeper作为可选项,开发者也可以通过制定serverAddress的方式指定server的地址.

    Java代码  收藏代码
    1. <!-- fixedAddress -->  
    2. <!--    
    3. <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory">  
    4.     <property name="service" value="com.demo.service.UserService"></property>  
    5.     <property name="serverAddress" value="127.0.0.1:9090:2"></property>  
    6.     <property name="maxActive" value="5"></property>  
    7.     <property name="idleTime" value="10000"></property>  
    8. </bean>  
    9. -->  
    10. <!-- zookeeper -->  
    11. <bean id="thriftZookeeper" class="com.demo.thrift.zookeeper.ZookeeperFactory" destroy-method="close">  
    12.     <property name="connectString" value="127.0.0.1:2181"></property>  
    13.     <property name="namespace" value="demo/thrift-service"></property>  
    14. </bean>  
    15. <bean id="userService" class="com.demo.thrift.ThriftServiceClientProxyFactory" destroy-method="close">  
    16.     <property name="service" value="com.demo.service.UserService"></property>  
    17.     <property name="maxActive" value="5"></property>  
    18.     <property name="idleTime" value="1800000"></property>  
    19.     <property name="addressProvider">  
    20.         <bean class="com.demo.thrift.support.impl.DynamicAddressProvider">  
    21.             <property name="configPath" value="UserServiceImpl"></property>  
    22.             <property name="zookeeper" ref="thriftZookeeper"></property>  
    23.         </bean>  
    24.     </property>  
    25. </bean>  

    3. ThriftServiceClientProxyFactory.java

        因为我们要在client端使用连接池方案,那么就需要对client的方法调用过程,进行代理,这个类,就是维护了一个"Client"代理类,并在方法调用时,从"对象池"中取出一个"Client"对象,并在方法实际调用结束后归还给"对象池".  

    Java代码  收藏代码
    1. @SuppressWarnings("rawtypes")  
    2. public class ThriftServiceClientProxyFactory implements FactoryBean,InitializingBean {  
    3.   
    4.     private String service;  
    5.   
    6.     private String serverAddress;  
    7.       
    8.     private Integer maxActive = 32;//最大活跃连接数  
    9.       
    10.     ////ms,default 3 min,链接空闲时间  
    11.     //-1,关闭空闲检测  
    12.     private Integer idleTime = 180000;  
    13.     private ThriftServerAddressProvider addressProvider;  
    14.   
    15.     private Object proxyClient;  
    16.       
    17.   
    18.     public void setMaxActive(Integer maxActive) {  
    19.         this.maxActive = maxActive;  
    20.     }  
    21.   
    22.   
    23.     public void setIdleTime(Integer idleTime) {  
    24.         this.idleTime = idleTime;  
    25.     }  
    26.   
    27.   
    28.     public void setService(String service) {  
    29.         this.service = service;  
    30.     }  
    31.   
    32.   
    33.     public void setServerAddress(String serverAddress) {  
    34.         this.serverAddress = serverAddress;  
    35.     }  
    36.   
    37.   
    38.     public void setAddressProvider(ThriftServerAddressProvider addressProvider) {  
    39.         this.addressProvider = addressProvider;  
    40.     }  
    41.   
    42.     private Class objectClass;  
    43.       
    44.     private GenericObjectPool<TServiceClient> pool;  
    45.       
    46.     private PoolOperationCallBack callback = new PoolOperationCallBack() {  
    47.           
    48.         @Override  
    49.         public void make(TServiceClient client) {  
    50.             System.out.println("create");  
    51.               
    52.         }  
    53.           
    54.         @Override  
    55.         public void destroy(TServiceClient client) {  
    56.             System.out.println("destroy");  
    57.               
    58.         }  
    59.     };  
    60.   
    61.     @Override  
    62.     public void afterPropertiesSet() throws Exception {  
    63.         if(serverAddress != null){  
    64.             addressProvider = new FixedAddressProvider(serverAddress);  
    65.         }  
    66.         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  
    67.         //加载Iface接口  
    68.         objectClass = classLoader.loadClass(service + "$Iface");  
    69.         //加载Client.Factory类  
    70.         Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>)classLoader.loadClass(service + "$Client$Factory");  
    71.         TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();  
    72.         ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(addressProvider, clientFactory,callback);  
    73.         GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();  
    74.         poolConfig.maxActive = maxActive;  
    75.         poolConfig.minIdle = 0;  
    76.         poolConfig.minEvictableIdleTimeMillis = idleTime;  
    77.         poolConfig.timeBetweenEvictionRunsMillis = idleTime/2L;  
    78.         pool = new GenericObjectPool<TServiceClient>(clientPool,poolConfig);  
    79.         proxyClient = Proxy.newProxyInstance(classLoader,new Class[]{objectClass},new InvocationHandler() {  
    80.             @Override  
    81.             public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
    82.                 //  
    83.                 TServiceClient client = pool.borrowObject();  
    84.                 try{  
    85.                     return method.invoke(client, args);  
    86.                 }catch(Exception e){  
    87.                     throw e;  
    88.                 }finally{  
    89.                     pool.returnObject(client);  
    90.                 }  
    91.             }  
    92.         });  
    93.     }  
    94.   
    95.     @Override  
    96.     public Object getObject() throws Exception {  
    97.         return proxyClient;  
    98.     }  
    99.   
    100.     @Override  
    101.     public Class<?> getObjectType() {  
    102.         return objectClass;  
    103.     }  
    104.   
    105.     @Override  
    106.     public boolean isSingleton() {  
    107.         return true;  //To change body of implemented methods use File | Settings | File Templates.  
    108.     }  
    109.       
    110.     public void close(){  
    111.         if(addressProvider != null){  
    112.             addressProvider.close();  
    113.         }  
    114.     }  
    115. }  

    4. ThriftClientPoolFactory.java

        "Client"对象池,对象池中是已经实例化的Client对象,Client对象负责与Thrift server通信.

    Java代码  收藏代码
    1. /** 
    2.  * 连接池,thrift-client for spring 
    3.  */  
    4. public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient>{  
    5.   
    6.     private final ThriftServerAddressProvider addressProvider;  
    7.       
    8.     private final TServiceClientFactory<TServiceClient> clientFactory;  
    9.       
    10.     private PoolOperationCallBack callback;  
    11.   
    12.     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory) throws Exception {  
    13.         this.addressProvider = addressProvider;  
    14.         this.clientFactory = clientFactory;  
    15.     }  
    16.       
    17.     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider,TServiceClientFactory<TServiceClient> clientFactory,PoolOperationCallBack callback) throws Exception {  
    18.         this.addressProvider = addressProvider;  
    19.         this.clientFactory = clientFactory;  
    20.         this.callback = callback;  
    21.     }  
    22.   
    23.   
    24.   
    25.     @Override  
    26.     public TServiceClient makeObject() throws Exception {  
    27.         InetSocketAddress address = addressProvider.selector();  
    28.         TSocket tsocket = new TSocket(address.getHostName(),address.getPort());  
    29.         TProtocol protocol = new TBinaryProtocol(tsocket);  
    30.         TServiceClient client = this.clientFactory.getClient(protocol);  
    31.         tsocket.open();  
    32.         if(callback != null){  
    33.             try{  
    34.                 callback.make(client);  
    35.             }catch(Exception e){  
    36.                 //  
    37.             }  
    38.         }  
    39.         return client;  
    40.     }  
    41.   
    42.     public void destroyObject(TServiceClient client) throws Exception {  
    43.         if(callback != null){  
    44.             try{  
    45.                 callback.destroy(client);  
    46.             }catch(Exception e){  
    47.                 //  
    48.             }  
    49.         }  
    50.         TTransport pin = client.getInputProtocol().getTransport();  
    51.         pin.close();  
    52.     }  
    53.   
    54.     public boolean validateObject(TServiceClient client) {  
    55.         TTransport pin = client.getInputProtocol().getTransport();  
    56.         return pin.isOpen();  
    57.     }  
    58.       
    59.     static interface PoolOperationCallBack {  
    60.         //销毁client之前执行  
    61.         void destroy(TServiceClient client);  
    62.         //创建成功是执行  
    63.         void make(TServiceClient client);  
    64.     }  
    65.   
    66. }  

    5. DynamicAddressProvider.java

        将zookeeper作为server地址的提供者,这样客户端就不需要再配置文件中指定一堆ip + port,而且当server服务有更新时,也不需要client端重新配置.

    Java代码  收藏代码
    1. /** 
    2.  * 可以动态获取address地址,方案设计参考 
    3.  * 1) 可以间歇性的调用一个web-service来获取地址 
    4.  * 2) 可以使用事件监听机制,被动的接收消息,来获取最新的地址(比如基于MQ,nio等) 
    5.  * 3) 可以基于zookeeper-watcher机制,获取最新地址 
    6.  * <p/> 
    7.  * 本实例,使用zookeeper作为"config"中心,使用apache-curator方法库来简化zookeeper开发 
    8.  * 如下实现,仅供参考 
    9.  */  
    10. public class DynamicAddressProvider implements ThriftServerAddressProvider, InitializingBean {  
    11.   
    12.     private String configPath;  
    13.   
    14.     private PathChildrenCache cachedPath;  
    15.   
    16.     private CuratorFramework zookeeper;  
    17.       
    18.     //用来保存当前provider所接触过的地址记录  
    19.     //当zookeeper集群故障时,可以使用trace中地址,作为"备份"  
    20.     private Set<String> trace = new HashSet<String>();  
    21.   
    22.     private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();  
    23.   
    24.     private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();  
    25.       
    26.     private Object lock = new Object();  
    27.       
    28.     private static final Integer DEFAULT_PRIORITY = 1;  
    29.   
    30.     public void setConfigPath(String configPath) {  
    31.         this.configPath = configPath;  
    32.     }  
    33.   
    34.     public void setZookeeper(CuratorFramework zookeeper) {  
    35.         this.zookeeper = zookeeper;  
    36.     }  
    37.   
    38.     @Override  
    39.     public void afterPropertiesSet() throws Exception {  
    40.         //如果zk尚未启动,则启动  
    41.         if(zookeeper.getState() == CuratorFrameworkState.LATENT){  
    42.             zookeeper.start();  
    43.         }  
    44.         buildPathChildrenCache(zookeeper, configPath, true);  
    45.         cachedPath.start(StartMode.POST_INITIALIZED_EVENT);  
    46.     }  
    47.   
    48.     private void buildPathChildrenCache(CuratorFramework client, String path, Boolean cacheData) throws Exception {  
    49.         cachedPath = new PathChildrenCache(client, path, cacheData);  
    50.         cachedPath.getListenable().addListener(new PathChildrenCacheListener() {  
    51.             @Override  
    52.             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  
    53.                 PathChildrenCacheEvent.Type eventType = event.getType();  
    54.                 switch (eventType) {  
    55. //                    case CONNECTION_RECONNECTED:  
    56. //                          
    57. //                        break;  
    58.                     case CONNECTION_SUSPENDED:  
    59.                     case CONNECTION_LOST:  
    60.                         System.out.println("Connection error,waiting...");  
    61.                         return;  
    62.                     default:  
    63.                         //  
    64.                 }  
    65.                 //任何节点的时机数据变动,都会rebuild,此处为一个"简单的"做法.  
    66.                 cachedPath.rebuild();  
    67.                 rebuild();  
    68.             }  
    69.               
    70.             protected void rebuild() throws Exception {  
    71.                 List<ChildData> children = cachedPath.getCurrentData();  
    72.                 if (children == null || children.isEmpty()) {  
    73.                     //有可能所有的thrift server都与zookeeper断开了链接  
    74.                     //但是,有可能,thrift client与thrift server之间的网络是良好的  
    75.                     //因此此处是否需要清空container,是需要多方面考虑的.  
    76.                     container.clear();  
    77.                     System.out.println("thrift server-cluster error....");  
    78.                     return;  
    79.                 }  
    80.                 List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();  
    81.                 for (ChildData data : children) {  
    82.                     String address = new String(data.getData(), "utf-8");  
    83.                     current.addAll(transfer(address));  
    84.                     trace.add(address);  
    85.                 }  
    86.                 Collections.shuffle(current);  
    87.                 synchronized (lock) {  
    88.                     container.clear();  
    89.                     container.addAll(current);  
    90.                     inner.clear();  
    91.                     inner.addAll(current);  
    92.                       
    93.                 }  
    94.             }  
    95.         });  
    96.     }  
    97.       
    98.       
    99.       
    100.     private List<InetSocketAddress> transfer(String address){  
    101.         String[] hostname = address.split(":");  
    102.         Integer priority = DEFAULT_PRIORITY;  
    103.         if (hostname.length == 3) {  
    104.             priority = Integer.valueOf(hostname[2]);  
    105.         }  
    106.         String ip = hostname[0];  
    107.         Integer port = Integer.valueOf(hostname[1]);  
    108.         List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();  
    109.         for (int i = 0; i < priority; i++) {  
    110.             result.add(new InetSocketAddress(ip, port));  
    111.         }  
    112.         return result;  
    113.     }  
    114.   
    115.   
    116.     @Override  
    117.     public List<InetSocketAddress> getAll() {  
    118.         return Collections.unmodifiableList(container);  
    119.     }  
    120.   
    121.     @Override  
    122.     public synchronized InetSocketAddress selector() {  
    123.         if (inner.isEmpty()) {  
    124.             if(!container.isEmpty()){  
    125.                 inner.addAll(container);  
    126.             }else if(!trace.isEmpty()){  
    127.                 synchronized (lock) {  
    128.                     for(String hostname : trace){  
    129.                         container.addAll(transfer(hostname));  
    130.                     }  
    131.                     Collections.shuffle(container);  
    132.                     inner.addAll(container);  
    133.                 }  
    134.             }  
    135.         }  
    136.         return inner.poll();//null  
    137.     }  
    138.   
    139.   
    140.     @Override  
    141.     public void close() {  
    142.         try {  
    143.             cachedPath.close();  
    144.             zookeeper.close();  
    145.         } catch (Exception e) {  
    146.             //  
    147.         }  
    148.     }  
    149. }  

        到此为止,我们的Thrift基本上就可以顺利运行起来了.更多代码,参见附件.

        Thrift-server端开发与配置,参见[Thrift-server] 

      

  • 相关阅读:
    迭代
    UIViewController生命周期控制
    JPA相关注解
    正則表達式截取字符串两字符间的内容
    HDU 1789 Doing Homework again
    《从零開始学Swift》学习笔记(Day48)——类型检查与转换
    POJ 3280 Cheapest Palindrome(区间DP)
    JavaScript高级特性之原型
    http协议
    编程算法
  • 原文地址:https://www.cnblogs.com/scott19820130/p/4919121.html
Copyright © 2011-2022 走看看