zoukankan      html  css  js  c++  java
  • 基于zookeeper、连接池、Failover/LoadBalance等改造Thrift 服务化

    对于Thrift服务化的改造,主要是客户端,可以从如下几个方面进行:

    1.服务端的服务注册,客户端自动发现,无需手工修改配置,这里我们使用zookeeper,但由于zookeeper本身提供的客户端使用较为复杂,因此采用curator-recipes工具类进行处理服务的注册与发现。

    2.客户端使用连接池对服务调用进行管理,提升性能,这里我们使用Apache Commons项目commons-pool,可以大大减少代码的复杂度。

    3.关于Failover/LoadBalance,由于zookeeper的watcher,当服务端不可用是及时通知客户端,并移除不可用的服务节点,而LoadBalance有很多算法,这里我们采用随机加权方式,也是常有的负载算法,至于其他的算法介绍参考:常见的负载均衡的基本算法

    4.使thrift服务的注册和发现可以基于spring配置,可以提供很多的便利。

    5.其他的改造如:

    1)通过动态代理实现client和server端的交互细节透明化,让用户只需通过服务方提供的接口进行访问

    2)Thrift通过两种方式调用服务Client和Iface

    1. // *) Client API 调用  
    2. (EchoService.Client)client.echo("hello lilei");  ---(1)  
    3. // *) Service 接口 调用  
    4. (EchoService.Iface)service.echo("hello lilei");  ---(2)  
    // *) Client API 调用
    (EchoService.Client)client.echo("hello lilei");  ---(1)
    // *) Service 接口 调用
    (EchoService.Iface)service.echo("hello lilei");  ---(2)

    Client API的方式, 不推荐, 我们推荐Service接口的方式(服务化)。

    下面我们来一一实现:

    一、pom.xml引入依赖jar包

    1. <dependency>  
    2.             <groupId>org.apache.thrift</groupId>  
    3.             <artifactId>libthrift</artifactId>  
    4.             <version>0.9.2</version>  
    5.         </dependency>  
    6.         <dependency>  
    7.             <groupId>commons-pool</groupId>  
    8.             <artifactId>commons-pool</artifactId>  
    9.             <version>1.6</version>  
    10.         </dependency>  
    11.         <dependency>  
    12.             <groupId>org.springframework</groupId>  
    13.             <artifactId>spring-context</artifactId>  
    14.             <version>4.0.9.RELEASE</version>  
    15.         </dependency>  
    16.   
    17.         <dependency>  
    18.             <groupId>org.apache.zookeeper</groupId>  
    19.             <artifactId>zookeeper</artifactId>  
    20.             <version>3.4.6</version>  
    21.         </dependency>  
    22.         <dependency>  
    23.             <groupId>org.apache.curator</groupId>  
    24.             <artifactId>curator-recipes</artifactId>  
    25.             <version>2.7.1</version>  
    26.         </dependency>  
    <dependency>
    			<groupId>org.apache.thrift</groupId>
    			<artifactId>libthrift</artifactId>
    			<version>0.9.2</version>
    		</dependency>
    		<dependency>
    			<groupId>commons-pool</groupId>
    			<artifactId>commons-pool</artifactId>
    			<version>1.6</version>
    		</dependency>
    		<dependency>
    			<groupId>org.springframework</groupId>
    			<artifactId>spring-context</artifactId>
    			<version>4.0.9.RELEASE</version>
    		</dependency>
    
    		<dependency>
    			<groupId>org.apache.zookeeper</groupId>
    			<artifactId>zookeeper</artifactId>
    			<version>3.4.6</version>
    		</dependency>
    		<dependency>
    			<groupId>org.apache.curator</groupId>
    			<artifactId>curator-recipes</artifactId>
    			<version>2.7.1</version>
    		</dependency>

    二、使用zookeeper管理服务节点配置

    RPC服务往平台化的方向发展, 会屏蔽掉更多的服务细节(服务的IP地址集群, 集群的扩容和迁移), 只暴露服务接口. 这部分的演化, 使得server端和client端完全的解耦合. 两者的交互通过ConfigServer(MetaServer)的中介角色来搭线。

    注: 该图源自dubbo的官网
    这边借助Zookeeper来扮演该角色, server扮演发布者的角色, 而client扮演订阅者的角色.

    Zookeeper是分布式应用协作服务. 它实现了paxos的一致性算法, 在命名管理/配置推送/数据同步/主从切换方面扮演重要的角色。 其数据组织类似文件系统的目录结构: 

    每个节点被称为znode, 为znode节点依据其特性, 又可以分为如下类型:
      1). PERSISTENT: 永久节点
      2). EPHEMERAL: 临时节点, 会随session(client disconnect)的消失而消失
      3). PERSISTENT_SEQUENTIAL: 永久节点, 其节点的名字编号是单调递增的
      4). EPHEMERAL_SEQUENTIAL: 临时节点, 其节点的名字编号是单调递增的
      注: 临时节点不能成为父节点
      Watcher观察模式, client可以注册对节点的状态/内容变更的事件回调机制. 其Event事件的两类属性需要关注下:
      1). KeeperState: Disconnected,SyncConnected,Expired
      2). EventType: None,NodeCreated,NodeDeleted,NodeDataChanged,NodeChildrenChanged
    RPC服务端:
      作为具体业务服务的RPC服务发布方, 对其自身的服务描述由以下元素构成.
      1). namespace: 命名空间,来区分不同应用 
      2). service: 服务接口, 采用发布方的类全名来表示
      3). version: 版本号
      借鉴了Maven的GAV坐标系, 三维坐标系更符合服务平台化的大环境.
      *) 数据模型的设计
      具体RPC服务的注册路径为: /rpc/{namespace}/{service}/{version}, 该路径上的节点都是永久节点
      RPC服务集群节点的注册路径为: /rpc/{namespace}/{service}/{version}/{ip:port:weight}, 末尾的节点是临时节点.

    1.定义Zookeeper的客户端的管理

    ZookeeperFactory.java

    1. package cn.slimsmart.thrift.rpc.zookeeper;  
    2.   
    3. import org.apache.curator.framework.CuratorFramework;  
    4. import org.apache.curator.framework.CuratorFrameworkFactory;  
    5. import org.apache.curator.retry.ExponentialBackoffRetry;  
    6. import org.springframework.beans.factory.FactoryBean;  
    7. import org.springframework.util.StringUtils;  
    8.   
    9. /** 
    10.  * 获取zookeeper客户端链接 
    11.  */  
    12. public class ZookeeperFactory implements FactoryBean<CuratorFramework> {  
    13.   
    14.     private String zkHosts;  
    15.     // session超时  
    16.     private int sessionTimeout = 30000;  
    17.     private int connectionTimeout = 30000;  
    18.   
    19.     // 共享一个zk链接  
    20.     private boolean singleton = true;  
    21.   
    22.     // 全局path前缀,常用来区分不同的应用  
    23.     private String namespace;  
    24.   
    25.     private final static String ROOT = "rpc";  
    26.   
    27.     private CuratorFramework zkClient;  
    28.   
    29.     public void setZkHosts(String zkHosts) {  
    30.         this.zkHosts = zkHosts;  
    31.     }  
    32.   
    33.     public void setSessionTimeout(int sessionTimeout) {  
    34.         this.sessionTimeout = sessionTimeout;  
    35.     }  
    36.   
    37.     public void setConnectionTimeout(int connectionTimeout) {  
    38.         this.connectionTimeout = connectionTimeout;  
    39.     }  
    40.   
    41.     public void setSingleton(boolean singleton) {  
    42.         this.singleton = singleton;  
    43.     }  
    44.   
    45.     public void setNamespace(String namespace) {  
    46.         this.namespace = namespace;  
    47.     }  
    48.   
    49.     public void setZkClient(CuratorFramework zkClient) {  
    50.         this.zkClient = zkClient;  
    51.     }  
    52.   
    53.     @Override  
    54.     public CuratorFramework getObject() throws Exception {  
    55.         if (singleton) {  
    56.             if (zkClient == null) {  
    57.                 zkClient = create();  
    58.                 zkClient.start();  
    59.             }  
    60.             return zkClient;  
    61.         }  
    62.         return create();  
    63.     }  
    64.   
    65.     @Override  
    66.     public Class<?> getObjectType() {  
    67.         return CuratorFramework.class;  
    68.     }  
    69.   
    70.     @Override  
    71.     public boolean isSingleton() {  
    72.         return singleton;  
    73.     }  
    74.   
    75.     public CuratorFramework create() throws Exception {  
    76.         if (StringUtils.isEmpty(namespace)) {  
    77.             namespace = ROOT;  
    78.         } else {  
    79.             namespace = ROOT +"/"+ namespace;  
    80.         }  
    81.         return create(zkHosts, sessionTimeout, connectionTimeout, namespace);  
    82.     }  
    83.   
    84.     public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {  
    85.         CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();  
    86.         return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)  
    87.                 .canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))  
    88.                 .defaultData(null).build();  
    89.     }  
    90.   
    91.     public void close() {  
    92.         if (zkClient != null) {  
    93.             zkClient.close();  
    94.         }  
    95.     }  
    96. }  
    package cn.slimsmart.thrift.rpc.zookeeper;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.util.StringUtils;
    
    /**
     * 获取zookeeper客户端链接
     */
    public class ZookeeperFactory implements FactoryBean<CuratorFramework> {
    
    	private String zkHosts;
    	// session超时
    	private int sessionTimeout = 30000;
    	private int connectionTimeout = 30000;
    
    	// 共享一个zk链接
    	private boolean singleton = true;
    
    	// 全局path前缀,常用来区分不同的应用
    	private String namespace;
    
    	private final static String ROOT = "rpc";
    
    	private CuratorFramework zkClient;
    
    	public void setZkHosts(String zkHosts) {
    		this.zkHosts = zkHosts;
    	}
    
    	public void setSessionTimeout(int sessionTimeout) {
    		this.sessionTimeout = sessionTimeout;
    	}
    
    	public void setConnectionTimeout(int connectionTimeout) {
    		this.connectionTimeout = connectionTimeout;
    	}
    
    	public void setSingleton(boolean singleton) {
    		this.singleton = singleton;
    	}
    
    	public void setNamespace(String namespace) {
    		this.namespace = namespace;
    	}
    
    	public void setZkClient(CuratorFramework zkClient) {
    		this.zkClient = zkClient;
    	}
    
    	@Override
    	public CuratorFramework getObject() throws Exception {
    		if (singleton) {
    			if (zkClient == null) {
    				zkClient = create();
    				zkClient.start();
    			}
    			return zkClient;
    		}
    		return create();
    	}
    
    	@Override
    	public Class<?> getObjectType() {
    		return CuratorFramework.class;
    	}
    
    	@Override
    	public boolean isSingleton() {
    		return singleton;
    	}
    
    	public CuratorFramework create() throws Exception {
    		if (StringUtils.isEmpty(namespace)) {
    			namespace = ROOT;
    		} else {
    			namespace = ROOT +"/"+ namespace;
    		}
    		return create(zkHosts, sessionTimeout, connectionTimeout, namespace);
    	}
    
    	public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
    		CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
    		return builder.connectString(connectString).sessionTimeoutMs(sessionTimeout).connectionTimeoutMs(30000)
    				.canBeReadOnly(true).namespace(namespace).retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
    				.defaultData(null).build();
    	}
    
    	public void close() {
    		if (zkClient != null) {
    			zkClient.close();
    		}
    	}
    }
    

    2.服务端注册服务

    由于服务端配置需要获取本机的IP地址,因此定义IP获取接口

    ThriftServerIpResolve.java

    1. package cn.slimsmart.thrift.rpc.zookeeper;  
    2.   
    3. /** 
    4.  *  
    5.  * 解析thrift-server端IP地址,用于注册服务 
    6.  * 1) 可以从一个物理机器或者虚机的特殊文件中解析 
    7.  * 2) 可以获取指定网卡序号的Ip 
    8.  * 3) 其他 
    9.  */  
    10. public interface ThriftServerIpResolve {  
    11.       
    12.     String getServerIp() throws Exception;  
    13.       
    14.     void reset();  
    15.       
    16.     //当IP变更时,将会调用reset方法  
    17.     static interface IpRestCalllBack{  
    18.         public void rest(String newIp);  
    19.     }  
    20. }  
    package cn.slimsmart.thrift.rpc.zookeeper;
    
    /**
     * 
     * 解析thrift-server端IP地址,用于注册服务
     * 1) 可以从一个物理机器或者虚机的特殊文件中解析
     * 2) 可以获取指定网卡序号的Ip
     * 3) 其他
     */
    public interface ThriftServerIpResolve {
    	
    	String getServerIp() throws Exception;
    	
    	void reset();
    	
    	//当IP变更时,将会调用reset方法
    	static interface IpRestCalllBack{
    		public void rest(String newIp);
    	}
    }
    

    可以对该接口做不通的实现,下面我们基于网卡获取IP地址,也可以通过配置serverIp
    ThriftServerIpLocalNetworkResolve.java

    1. package cn.slimsmart.thrift.rpc.zookeeper;  
    2.   
    3. import java.net.Inet6Address;  
    4. import java.net.InetAddress;  
    5. import java.net.NetworkInterface;  
    6. import java.net.SocketException;  
    7. import java.util.Enumeration;  
    8.   
    9. import org.slf4j.Logger;  
    10. import org.slf4j.LoggerFactory;  
    11.   
    12. /** 
    13.  * 解析网卡Ip 
    14.  * 
    15.  */  
    16. public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {  
    17.       
    18.     private Logger logger = LoggerFactory.getLogger(getClass());  
    19.   
    20.     //缓存  
    21.     private String serverIp;  
    22.       
    23.     public void setServerIp(String serverIp) {  
    24.         this.serverIp = serverIp;  
    25.     }  
    26.   
    27.     @Override  
    28.     public String getServerIp() {  
    29.         if (serverIp != null) {  
    30.             return serverIp;  
    31.         }  
    32.         // 一个主机有多个网络接口  
    33.         try {  
    34.             Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();  
    35.             while (netInterfaces.hasMoreElements()) {  
    36.                 NetworkInterface netInterface = netInterfaces.nextElement();  
    37.                 // 每个网络接口,都会有多个"网络地址",比如一定会有lookback地址,会有siteLocal地址等.以及IPV4或者IPV6 .  
    38.                 Enumeration<InetAddress> addresses = netInterface.getInetAddresses();  
    39.                 while (addresses.hasMoreElements()) {  
    40.                     InetAddress address = addresses.nextElement();  
    41.                     if(address instanceof Inet6Address){  
    42.                         continue;  
    43.                     }  
    44.                     if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {  
    45.                         serverIp = address.getHostAddress();  
    46.                         logger.info("resolve server ip :"+ serverIp);  
    47.                         continue;  
    48.                     }  
    49.                 }  
    50.             }  
    51.         } catch (SocketException e) {  
    52.             e.printStackTrace();  
    53.         }  
    54.         return serverIp;  
    55.     }  
    56.   
    57.     @Override  
    58.     public void reset() {  
    59.         serverIp = null;  
    60.     }  
    61. }  
    package cn.slimsmart.thrift.rpc.zookeeper;
    
    import java.net.Inet6Address;
    import java.net.InetAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;
    import java.util.Enumeration;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 解析网卡Ip
     *
     */
    public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {
    	
    	private Logger logger = LoggerFactory.getLogger(getClass());
    
    	//缓存
    	private String serverIp;
    	
    	public void setServerIp(String serverIp) {
    		this.serverIp = serverIp;
    	}
    
    	@Override
    	public String getServerIp() {
    		if (serverIp != null) {
    			return serverIp;
    		}
    		// 一个主机有多个网络接口
    		try {
    			Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
    			while (netInterfaces.hasMoreElements()) {
    				NetworkInterface netInterface = netInterfaces.nextElement();
    				// 每个网络接口,都会有多个"网络地址",比如一定会有lookback地址,会有siteLocal地址等.以及IPV4或者IPV6 .
    				Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
    				while (addresses.hasMoreElements()) {
    					InetAddress address = addresses.nextElement();
    					if(address instanceof Inet6Address){
    						continue;
    					}
    					if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
    						serverIp = address.getHostAddress();
    						logger.info("resolve server ip :"+ serverIp);
    						continue;
    					}
    				}
    			}
    		} catch (SocketException e) {
    			e.printStackTrace();
    		}
    		return serverIp;
    	}
    
    	@Override
    	public void reset() {
    		serverIp = null;
    	}
    }
    

    接下来我们定义发布服务接口,并实现将服务信息(服务接口、版本号,IP、port、weight)发布到zookeeper中。
    ThriftServerAddressRegister.java

    1. package cn.slimsmart.thrift.rpc.zookeeper;  
    2.   
    3. /** 
    4.  * 发布服务地址及端口到服务注册中心,这里是zookeeper服务器 
    5.  */  
    6. public interface ThriftServerAddressRegister {  
    7.     /** 
    8.      * 发布服务接口 
    9.      * @param service 服务接口名称,一个产品中不能重复 
    10.      * @param version 服务接口的版本号,默认1.0.0 
    11.      * @param address 服务发布的地址和端口 
    12.      */  
    13.     void register(String service,String version,String address);  
    14. }  
    package cn.slimsmart.thrift.rpc.zookeeper;
    
    /**
     * 发布服务地址及端口到服务注册中心,这里是zookeeper服务器
     */
    public interface ThriftServerAddressRegister {
    	/**
    	 * 发布服务接口
    	 * @param service 服务接口名称,一个产品中不能重复
    	 * @param version 服务接口的版本号,默认1.0.0
    	 * @param address 服务发布的地址和端口
    	 */
    	void register(String service,String version,String address);
    }
    

    实现:ThriftServerAddressRegisterZookeeper.java

    1. package cn.slimsmart.thrift.rpc.zookeeper;  
    2.   
    3. import java.io.UnsupportedEncodingException;  
    4.   
    5. import org.apache.curator.framework.CuratorFramework;  
    6. import org.apache.curator.framework.imps.CuratorFrameworkState;  
    7. import org.apache.zookeeper.CreateMode;  
    8. import org.slf4j.Logger;  
    9. import org.slf4j.LoggerFactory;  
    10. import org.springframework.util.StringUtils;  
    11.   
    12. import cn.slimsmart.thrift.rpc.ThriftException;  
    13.   
    14. /** 
    15.  *  注册服务列表到Zookeeper 
    16.  */  
    17. public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{  
    18.       
    19.     private Logger logger = LoggerFactory.getLogger(getClass());  
    20.       
    21.     private CuratorFramework zkClient;  
    22.       
    23.     public ThriftServerAddressRegisterZookeeper(){}  
    24.       
    25.     public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){  
    26.         this.zkClient = zkClient;  
    27.     }  
    28.   
    29.     public void setZkClient(CuratorFramework zkClient) {  
    30.         this.zkClient = zkClient;  
    31.     }  
    32.   
    33.     @Override  
    34.     public void register(String service, String version, String address) {  
    35.         if(zkClient.getState() == CuratorFrameworkState.LATENT){  
    36.             zkClient.start();  
    37.         }  
    38.         if(StringUtils.isEmpty(version)){  
    39.             version="1.0.0";  
    40.         }  
    41.         //临时节点  
    42.         try {  
    43.             zkClient.create()  
    44.                 .creatingParentsIfNeeded()  
    45.                 .withMode(CreateMode.EPHEMERAL)  
    46.                 .forPath("/"+service+"/"+version+"/"+address);  
    47.         } catch (UnsupportedEncodingException e) {  
    48.             logger.error("register service address to zookeeper exception:{}",e);  
    49.             throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);  
    50.         } catch (Exception e) {  
    51.             logger.error("register service address to zookeeper exception:{}",e);  
    52.             throw new ThriftException("register service address to zookeeper exception:{}", e);  
    53.         }  
    54.     }  
    55.       
    56.     public void close(){  
    57.         zkClient.close();  
    58.     }  
    59. }  
    package cn.slimsmart.thrift.rpc.zookeeper;
    
    import java.io.UnsupportedEncodingException;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.imps.CuratorFrameworkState;
    import org.apache.zookeeper.CreateMode;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.StringUtils;
    
    import cn.slimsmart.thrift.rpc.ThriftException;
    
    /**
     *  注册服务列表到Zookeeper
     */
    public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister{
    	
    	private Logger logger = LoggerFactory.getLogger(getClass());
    	
    	private CuratorFramework zkClient;
    	
    	public ThriftServerAddressRegisterZookeeper(){}
    	
    	public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient){
    		this.zkClient = zkClient;
    	}
    
    	public void setZkClient(CuratorFramework zkClient) {
    		this.zkClient = zkClient;
    	}
    
    	@Override
    	public void register(String service, String version, String address) {
    		if(zkClient.getState() == CuratorFrameworkState.LATENT){
    			zkClient.start();
    		}
    		if(StringUtils.isEmpty(version)){
    			version="1.0.0";
    		}
    		//临时节点
    		try {
    			zkClient.create()
    				.creatingParentsIfNeeded()
    				.withMode(CreateMode.EPHEMERAL)
    				.forPath("/"+service+"/"+version+"/"+address);
    		} catch (UnsupportedEncodingException e) {
    			logger.error("register service address to zookeeper exception:{}",e);
    			throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
    		} catch (Exception e) {
    			logger.error("register service address to zookeeper exception:{}",e);
    			throw new ThriftException("register service address to zookeeper exception:{}", e);
    		}
    	}
    	
    	public void close(){
    		zkClient.close();
    	}
    }

    3.客户端发现服务

    定义获取服务地址接口

    ThriftServerAddressProvider.java

    1. package cn.slimsmart.thrift.rpc.zookeeper;  
    2.   
    3. import java.net.InetSocketAddress;  
    4. import java.util.List;  
    5.   
    6. /** 
    7.  * thrift server-service地址提供者,以便构建客户端连接池 
    8.  */  
    9. public interface ThriftServerAddressProvider {  
    10.       
    11.     //获取服务名称  
    12.     String getService();  
    13.   
    14.     /** 
    15.      * 获取所有服务端地址 
    16.      * @return 
    17.      */  
    18.     List<InetSocketAddress> findServerAddressList();  
    19.   
    20.     /** 
    21.      * 选取一个合适的address,可以随机获取等' 
    22.      * 内部可以使用合适的算法. 
    23.      * @return 
    24.      */  
    25.     InetSocketAddress selector();  
    26.   
    27.     void close();  
    28. }  
    package cn.slimsmart.thrift.rpc.zookeeper;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    /**
     * thrift server-service地址提供者,以便构建客户端连接池
     */
    public interface ThriftServerAddressProvider {
    	
    	//获取服务名称
    	String getService();
    
    	/**
    	 * 获取所有服务端地址
    	 * @return
    	 */
        List<InetSocketAddress> findServerAddressList();
    
        /**
         * 选取一个合适的address,可以随机获取等'
         * 内部可以使用合适的算法.
         * @return
         */
        InetSocketAddress selector();
    
        void close();
    }
    

    基于zookeeper服务地址自动发现实现:ThriftServerAddressProviderZookeeper.java

    1. package cn.slimsmart.thrift.rpc.zookeeper;  
    2.   
    3. import java.net.InetSocketAddress;  
    4. import java.util.ArrayList;  
    5. import java.util.Collections;  
    6. import java.util.HashSet;  
    7. import java.util.LinkedList;  
    8. import java.util.List;  
    9. import java.util.Queue;  
    10. import java.util.Set;  
    11.   
    12. import org.apache.curator.framework.CuratorFramework;  
    13. import org.apache.curator.framework.imps.CuratorFrameworkState;  
    14. import org.apache.curator.framework.recipes.cache.ChildData;  
    15. import org.apache.curator.framework.recipes.cache.PathChildrenCache;  
    16. import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;  
    17. import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;  
    18. import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;  
    19. import org.slf4j.Logger;  
    20. import org.slf4j.LoggerFactory;  
    21. import org.springframework.beans.factory.InitializingBean;  
    22.   
    23. /** 
    24.  * 使用zookeeper作为"config"中心,使用apache-curator方法库来简化zookeeper开发 
    25.  */  
    26. public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {  
    27.   
    28.     private Logger logger = LoggerFactory.getLogger(getClass());  
    29.   
    30.     // 注册服务  
    31.     private String service;  
    32.     // 服务版本号  
    33.     private String version = "1.0.0";  
    34.   
    35.     private PathChildrenCache cachedPath;  
    36.   
    37.     private CuratorFramework zkClient;  
    38.   
    39.     // 用来保存当前provider所接触过的地址记录  
    40.     // 当zookeeper集群故障时,可以使用trace中地址,作为"备份"  
    41.     private Set<String> trace = new HashSet<String>();  
    42.   
    43.     private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();  
    44.   
    45.     private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();  
    46.   
    47.     private Object lock = new Object();  
    48.   
    49.     // 默认权重  
    50.     private static final Integer DEFAULT_WEIGHT = 1;  
    51.   
    52.     public void setService(String service) {  
    53.         this.service = service;  
    54.     }  
    55.   
    56.     public void setVersion(String version) {  
    57.         this.version = version;  
    58.     }  
    59.   
    60.     public ThriftServerAddressProviderZookeeper() {  
    61.     }  
    62.   
    63.     public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {  
    64.         this.zkClient = zkClient;  
    65.     }  
    66.   
    67.     public void setZkClient(CuratorFramework zkClient) {  
    68.         this.zkClient = zkClient;  
    69.     }  
    70.   
    71.     @Override  
    72.     public void afterPropertiesSet() throws Exception {  
    73.         // 如果zk尚未启动,则启动  
    74.         if (zkClient.getState() == CuratorFrameworkState.LATENT) {  
    75.             zkClient.start();  
    76.         }  
    77.         buildPathChildrenCache(zkClient, getServicePath(), true);  
    78.         cachedPath.start(StartMode.POST_INITIALIZED_EVENT);  
    79.     }  
    80.   
    81.     private String getServicePath(){  
    82.         return "/" + service + "/" + version;  
    83.     }  
    84.     private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {  
    85.         cachedPath = new PathChildrenCache(client, path, cacheData);  
    86.         cachedPath.getListenable().addListener(new PathChildrenCacheListener() {  
    87.             @Override  
    88.             public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  
    89.                 PathChildrenCacheEvent.Type eventType = event.getType();  
    90.                 switch (eventType) {  
    91.                 case CONNECTION_RECONNECTED:  
    92.                     logger.info("Connection is reconection.");  
    93.                     break;  
    94.                 case CONNECTION_SUSPENDED:  
    95.                     logger.info("Connection is suspended.");  
    96.                     break;  
    97.                 case CONNECTION_LOST:  
    98.                     logger.warn("Connection error,waiting...");  
    99.                     return;  
    100.                 default:  
    101.                     //  
    102.                 }  
    103.                 // 任何节点的时机数据变动,都会rebuild,此处为一个"简单的"做法.  
    104.                 cachedPath.rebuild();  
    105.                 rebuild();  
    106.             }  
    107.   
    108.             protected void rebuild() throws Exception {  
    109.                 List<ChildData> children = cachedPath.getCurrentData();  
    110.                 if (children == null || children.isEmpty()) {  
    111.                     // 有可能所有的thrift server都与zookeeper断开了链接  
    112.                     // 但是,有可能,thrift client与thrift server之间的网络是良好的  
    113.                     // 因此此处是否需要清空container,是需要多方面考虑的.  
    114.                     container.clear();  
    115.                     logger.error("thrift server-cluster error....");  
    116.                     return;  
    117.                 }  
    118.                 List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();  
    119.                 String path = null;  
    120.                 for (ChildData data : children) {  
    121.                     path = data.getPath();  
    122.                     logger.debug("get path:"+path);  
    123.                     path = path.substring(getServicePath().length()+1);  
    124.                     logger.debug("get serviceAddress:"+path);  
    125.                     String address = new String(path.getBytes(), "utf-8");  
    126.                     current.addAll(transfer(address));  
    127.                     trace.add(address);  
    128.                 }  
    129.                 Collections.shuffle(current);  
    130.                 synchronized (lock) {  
    131.                     container.clear();  
    132.                     container.addAll(current);  
    133.                     inner.clear();  
    134.                     inner.addAll(current);  
    135.   
    136.                 }  
    137.             }  
    138.         });  
    139.     }  
    140.   
    141.     private List<InetSocketAddress> transfer(String address) {  
    142.         String[] hostname = address.split(":");  
    143.         Integer weight = DEFAULT_WEIGHT;  
    144.         if (hostname.length == 3) {  
    145.             weight = Integer.valueOf(hostname[2]);  
    146.         }  
    147.         String ip = hostname[0];  
    148.         Integer port = Integer.valueOf(hostname[1]);  
    149.         List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();  
    150.         // 根据优先级,将ip:port添加多次到地址集中,然后随机取地址实现负载  
    151.         for (int i = 0; i < weight; i++) {  
    152.             result.add(new InetSocketAddress(ip, port));  
    153.         }  
    154.         return result;  
    155.     }  
    156.   
    157.     @Override  
    158.     public List<InetSocketAddress> findServerAddressList() {  
    159.         return Collections.unmodifiableList(container);  
    160.     }  
    161.   
    162.     @Override  
    163.     public synchronized InetSocketAddress selector() {  
    164.         if (inner.isEmpty()) {  
    165.             if (!container.isEmpty()) {  
    166.                 inner.addAll(container);  
    167.             } else if (!trace.isEmpty()) {  
    168.                 synchronized (lock) {  
    169.                     for (String hostname : trace) {  
    170.                         container.addAll(transfer(hostname));  
    171.                     }  
    172.                     Collections.shuffle(container);  
    173.                     inner.addAll(container);  
    174.                 }  
    175.             }  
    176.         }  
    177.         return inner.poll();  
    178.     }  
    179.   
    180.     @Override  
    181.     public void close() {  
    182.         try {  
    183.             cachedPath.close();  
    184.             zkClient.close();  
    185.         } catch (Exception e) {  
    186.         }  
    187.     }  
    188.   
    189.     @Override  
    190.     public String getService() {  
    191.         return service;  
    192.     }  
    193.   
    194. }  
    package cn.slimsmart.thrift.rpc.zookeeper;
    
    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;
    import java.util.Set;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.imps.CuratorFrameworkState;
    import org.apache.curator.framework.recipes.cache.ChildData;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.InitializingBean;
    
    /**
     * 使用zookeeper作为"config"中心,使用apache-curator方法库来简化zookeeper开发
     */
    public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {
    
    	private Logger logger = LoggerFactory.getLogger(getClass());
    
    	// 注册服务
    	private String service;
    	// 服务版本号
    	private String version = "1.0.0";
    
    	private PathChildrenCache cachedPath;
    
    	private CuratorFramework zkClient;
    
    	// 用来保存当前provider所接触过的地址记录
    	// 当zookeeper集群故障时,可以使用trace中地址,作为"备份"
    	private Set<String> trace = new HashSet<String>();
    
    	private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();
    
    	private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();
    
    	private Object lock = new Object();
    
    	// 默认权重
    	private static final Integer DEFAULT_WEIGHT = 1;
    
    	public void setService(String service) {
    		this.service = service;
    	}
    
    	public void setVersion(String version) {
    		this.version = version;
    	}
    
    	public ThriftServerAddressProviderZookeeper() {
    	}
    
    	public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
    		this.zkClient = zkClient;
    	}
    
    	public void setZkClient(CuratorFramework zkClient) {
    		this.zkClient = zkClient;
    	}
    
    	@Override
    	public void afterPropertiesSet() throws Exception {
    		// 如果zk尚未启动,则启动
    		if (zkClient.getState() == CuratorFrameworkState.LATENT) {
    			zkClient.start();
    		}
    		buildPathChildrenCache(zkClient, getServicePath(), true);
    		cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
    	}
    
    	private String getServicePath(){
    		return "/" + service + "/" + version;
    	}
    	private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
    		cachedPath = new PathChildrenCache(client, path, cacheData);
    		cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
    			@Override
    			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
    				PathChildrenCacheEvent.Type eventType = event.getType();
    				switch (eventType) {
    				case CONNECTION_RECONNECTED:
    					logger.info("Connection is reconection.");
    					break;
    				case CONNECTION_SUSPENDED:
    					logger.info("Connection is suspended.");
    					break;
    				case CONNECTION_LOST:
    					logger.warn("Connection error,waiting...");
    					return;
    				default:
    					//
    				}
    				// 任何节点的时机数据变动,都会rebuild,此处为一个"简单的"做法.
    				cachedPath.rebuild();
    				rebuild();
    			}
    
    			protected void rebuild() throws Exception {
    				List<ChildData> children = cachedPath.getCurrentData();
    				if (children == null || children.isEmpty()) {
    					// 有可能所有的thrift server都与zookeeper断开了链接
    					// 但是,有可能,thrift client与thrift server之间的网络是良好的
    					// 因此此处是否需要清空container,是需要多方面考虑的.
    					container.clear();
    					logger.error("thrift server-cluster error....");
    					return;
    				}
    				List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
    				String path = null;
    				for (ChildData data : children) {
    					path = data.getPath();
    					logger.debug("get path:"+path);
    					path = path.substring(getServicePath().length()+1);
    					logger.debug("get serviceAddress:"+path);
    					String address = new String(path.getBytes(), "utf-8");
    					current.addAll(transfer(address));
    					trace.add(address);
    				}
    				Collections.shuffle(current);
    				synchronized (lock) {
    					container.clear();
    					container.addAll(current);
    					inner.clear();
    					inner.addAll(current);
    
    				}
    			}
    		});
    	}
    
    	private List<InetSocketAddress> transfer(String address) {
    		String[] hostname = address.split(":");
    		Integer weight = DEFAULT_WEIGHT;
    		if (hostname.length == 3) {
    			weight = Integer.valueOf(hostname[2]);
    		}
    		String ip = hostname[0];
    		Integer port = Integer.valueOf(hostname[1]);
    		List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
    		// 根据优先级,将ip:port添加多次到地址集中,然后随机取地址实现负载
    		for (int i = 0; i < weight; i++) {
    			result.add(new InetSocketAddress(ip, port));
    		}
    		return result;
    	}
    
    	@Override
    	public List<InetSocketAddress> findServerAddressList() {
    		return Collections.unmodifiableList(container);
    	}
    
    	@Override
    	public synchronized InetSocketAddress selector() {
    		if (inner.isEmpty()) {
    			if (!container.isEmpty()) {
    				inner.addAll(container);
    			} else if (!trace.isEmpty()) {
    				synchronized (lock) {
    					for (String hostname : trace) {
    						container.addAll(transfer(hostname));
    					}
    					Collections.shuffle(container);
    					inner.addAll(container);
    				}
    			}
    		}
    		return inner.poll();
    	}
    
    	@Override
    	public void close() {
    		try {
                cachedPath.close();
                zkClient.close();
            } catch (Exception e) {
            }
    	}
    
    	@Override
    	public String getService() {
    		return service;
    	}
    
    }
    

    对此接口还做了一种实现,通过配置获取服务地址,参考附件:FixedAddressProvider.java

    三、服务端服务注册实现

    ThriftServiceServerFactory.java

    1. package cn.slimsmart.thrift.rpc;  
    2.   
    3. import java.lang.instrument.IllegalClassFormatException;  
    4. import java.lang.reflect.Constructor;  
    5.   
    6. import org.apache.thrift.TProcessor;  
    7. import org.apache.thrift.TProcessorFactory;  
    8. import org.apache.thrift.protocol.TBinaryProtocol;  
    9. import org.apache.thrift.server.TServer;  
    10. import org.apache.thrift.server.TThreadedSelectorServer;  
    11. import org.apache.thrift.transport.TFramedTransport;  
    12. import org.apache.thrift.transport.TNonblockingServerSocket;  
    13. import org.springframework.beans.factory.InitializingBean;  
    14. import org.springframework.util.StringUtils;  
    15.   
    16. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;  
    17. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;  
    18. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;  
    19.   
    20. /** 
    21.  * 服务端注册服务工厂 
    22.  */  
    23. public class ThriftServiceServerFactory implements InitializingBean {  
    24.     // 服务注册本机端口  
    25.     private Integer port = 8299;  
    26.     // 优先级  
    27.     private Integer weight = 1;// default  
    28.     // 服务实现类  
    29.     private Object service;// serice实现类  
    30.     //服务版本号  
    31.     private String version;  
    32.     // 解析本机IP  
    33.     private ThriftServerIpResolve thriftServerIpResolve;  
    34.     //服务注册  
    35.     private ThriftServerAddressRegister thriftServerAddressRegister;  
    36.   
    37.     private ServerThread serverThread;  
    38.       
    39.     public void setPort(Integer port) {  
    40.         this.port = port;  
    41.     }  
    42.   
    43.     public void setWeight(Integer weight) {  
    44.         this.weight = weight;  
    45.     }  
    46.   
    47.     public void setService(Object service) {  
    48.         this.service = service;  
    49.     }  
    50.   
    51.     public void setVersion(String version) {  
    52.         this.version = version;  
    53.     }  
    54.   
    55.     public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {  
    56.         this.thriftServerIpResolve = thriftServerIpResolve;  
    57.     }  
    58.   
    59.     public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {  
    60.         this.thriftServerAddressRegister = thriftServerAddressRegister;  
    61.     }  
    62.   
    63.     @Override  
    64.     public void afterPropertiesSet() throws Exception {  
    65.         if (thriftServerIpResolve == null) {  
    66.             thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();  
    67.         }  
    68.         String serverIP = thriftServerIpResolve.getServerIp();  
    69.         if (StringUtils.isEmpty(serverIP)) {  
    70.             throw new ThriftException("cant find server ip...");  
    71.         }  
    72.   
    73.         String hostname = serverIP + ":" + port + ":" + weight;  
    74.         Class<?> serviceClass = service.getClass();  
    75.         // 获取实现类接口  
    76.         Class<?>[] interfaces = serviceClass.getInterfaces();  
    77.         if (interfaces.length == 0) {  
    78.             throw new IllegalClassFormatException("service-class should implements Iface");  
    79.         }  
    80.         // reflect,load "Processor";  
    81.         TProcessor processor = null;  
    82.         String serviceName = null;  
    83.         for (Class<?> clazz : interfaces) {  
    84.             String cname = clazz.getSimpleName();  
    85.             if (!cname.equals("Iface")) {  
    86.                 continue;  
    87.             }  
    88.             serviceName = clazz.getEnclosingClass().getName();  
    89.             String pname = serviceName + "$Processor";  
    90.             try {  
    91.                 ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  
    92.                 Class<?> pclass = classLoader.loadClass(pname);  
    93.                 if (!TProcessor.class.isAssignableFrom(pclass)) {  
    94.                     continue;  
    95.                 }  
    96.                 Constructor<?> constructor = pclass.getConstructor(clazz);  
    97.                 processor = (TProcessor) constructor.newInstance(service);  
    98.                 break;  
    99.             } catch (Exception e) {  
    100.                 //  
    101.             }  
    102.         }  
    103.         if (processor == null) {  
    104.             throw new IllegalClassFormatException("service-class should implements Iface");  
    105.         }  
    106.         //需要单独的线程,因为serve方法是阻塞的.  
    107.         serverThread = new ServerThread(processor, port);  
    108.         serverThread.start();  
    109.         // 注册服务  
    110.         if (thriftServerAddressRegister != null) {  
    111.             thriftServerAddressRegister.register(serviceName, version, hostname);  
    112.         }  
    113.   
    114.     }  
    115.     class ServerThread extends Thread {  
    116.         private TServer server;  
    117.         ServerThread(TProcessor processor, int port) throws Exception {  
    118.             TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);  
    119.             TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);    
    120.             TProcessorFactory processorFactory = new TProcessorFactory(processor);  
    121.             tArgs.processorFactory(processorFactory);  
    122.             tArgs.transportFactory(new TFramedTransport.Factory());    
    123.             tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true));   
    124.             server = new TThreadedSelectorServer(tArgs);  
    125.         }  
    126.   
    127.         @Override  
    128.         public void run(){  
    129.             try{  
    130.                 //启动服务  
    131.                 server.serve();  
    132.             }catch(Exception e){  
    133.                 //  
    134.             }  
    135.         }  
    136.           
    137.         public void stopServer(){  
    138.             server.stop();  
    139.         }  
    140.     }  
    141.       
    142.     public void close() {  
    143.         serverThread.stopServer();  
    144.     }  
    145. }  
    package cn.slimsmart.thrift.rpc;
    
    import java.lang.instrument.IllegalClassFormatException;
    import java.lang.reflect.Constructor;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.TProcessorFactory;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadedSelectorServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.util.StringUtils;
    
    import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;
    import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;
    import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;
    
    /**
     * 服务端注册服务工厂
     */
    public class ThriftServiceServerFactory implements InitializingBean {
    	// 服务注册本机端口
    	private Integer port = 8299;
    	// 优先级
    	private Integer weight = 1;// default
    	// 服务实现类
    	private Object service;// serice实现类
    	//服务版本号
    	private String version;
    	// 解析本机IP
    	private ThriftServerIpResolve thriftServerIpResolve;
    	//服务注册
    	private ThriftServerAddressRegister thriftServerAddressRegister;
    
    	private ServerThread serverThread;
    	
    	public void setPort(Integer port) {
    		this.port = port;
    	}
    
    	public void setWeight(Integer weight) {
    		this.weight = weight;
    	}
    
    	public void setService(Object service) {
    		this.service = service;
    	}
    
    	public void setVersion(String version) {
    		this.version = version;
    	}
    
    	public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
    		this.thriftServerIpResolve = thriftServerIpResolve;
    	}
    
    	public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
    		this.thriftServerAddressRegister = thriftServerAddressRegister;
    	}
    
    	@Override
    	public void afterPropertiesSet() throws Exception {
    		if (thriftServerIpResolve == null) {
    			thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
    		}
    		String serverIP = thriftServerIpResolve.getServerIp();
    		if (StringUtils.isEmpty(serverIP)) {
    			throw new ThriftException("cant find server ip...");
    		}
    
    		String hostname = serverIP + ":" + port + ":" + weight;
    		Class<?> serviceClass = service.getClass();
    		// 获取实现类接口
    		Class<?>[] interfaces = serviceClass.getInterfaces();
    		if (interfaces.length == 0) {
    			throw new IllegalClassFormatException("service-class should implements Iface");
    		}
    		// reflect,load "Processor";
    		TProcessor processor = null;
    		String serviceName = null;
    		for (Class<?> clazz : interfaces) {
    			String cname = clazz.getSimpleName();
    			if (!cname.equals("Iface")) {
    				continue;
    			}
    			serviceName = clazz.getEnclosingClass().getName();
    			String pname = serviceName + "$Processor";
    			try {
    				ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    				Class<?> pclass = classLoader.loadClass(pname);
    				if (!TProcessor.class.isAssignableFrom(pclass)) {
    					continue;
    				}
    				Constructor<?> constructor = pclass.getConstructor(clazz);
    				processor = (TProcessor) constructor.newInstance(service);
    				break;
    			} catch (Exception e) {
    				//
    			}
    		}
    		if (processor == null) {
    			throw new IllegalClassFormatException("service-class should implements Iface");
    		}
    		//需要单独的线程,因为serve方法是阻塞的.
    		serverThread = new ServerThread(processor, port);
    		serverThread.start();
    		// 注册服务
    		if (thriftServerAddressRegister != null) {
    			thriftServerAddressRegister.register(serviceName, version, hostname);
    		}
    
    	}
    	class ServerThread extends Thread {
    		private TServer server;
    		ServerThread(TProcessor processor, int port) throws Exception {
    			TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
    			TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);  
    			TProcessorFactory processorFactory = new TProcessorFactory(processor);
    			tArgs.processorFactory(processorFactory);
    			tArgs.transportFactory(new TFramedTransport.Factory());  
    			tArgs.protocolFactory( new TBinaryProtocol.Factory(true, true)); 
    			server = new TThreadedSelectorServer(tArgs);
    		}
    
    		@Override
    		public void run(){
    			try{
    				//启动服务
    				server.serve();
    			}catch(Exception e){
    				//
    			}
    		}
    		
    		public void stopServer(){
    			server.stop();
    		}
    	}
    	
    	public void close() {
    		serverThread.stopServer();
    	}
    }

    四、客户端获取服务代理及连接池实现
    客户端连接池实现:ThriftClientPoolFactory.java

    1. package cn.slimsmart.thrift.rpc;  
    2.   
    3. import java.net.InetSocketAddress;  
    4.   
    5. import org.apache.commons.pool.BasePoolableObjectFactory;  
    6. import org.apache.thrift.TServiceClient;  
    7. import org.apache.thrift.TServiceClientFactory;  
    8. import org.apache.thrift.protocol.TBinaryProtocol;  
    9. import org.apache.thrift.protocol.TProtocol;  
    10. import org.apache.thrift.transport.TFramedTransport;  
    11. import org.apache.thrift.transport.TSocket;  
    12. import org.apache.thrift.transport.TTransport;  
    13.   
    14. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;  
    15.   
    16. /** 
    17.  * 连接池,thrift-client for spring 
    18.  */  
    19. public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {  
    20.   
    21.     private final ThriftServerAddressProvider serverAddressProvider;  
    22.     private final TServiceClientFactory<TServiceClient> clientFactory;  
    23.     private PoolOperationCallBack callback;  
    24.   
    25.     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {  
    26.         this.serverAddressProvider = addressProvider;  
    27.         this.clientFactory = clientFactory;  
    28.     }  
    29.   
    30.     protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,  
    31.             PoolOperationCallBack callback) throws Exception {  
    32.         this.serverAddressProvider = addressProvider;  
    33.         this.clientFactory = clientFactory;  
    34.         this.callback = callback;  
    35.     }  
    36.   
    37.     static interface PoolOperationCallBack {  
    38.         // 销毁client之前执行  
    39.         void destroy(TServiceClient client);  
    40.   
    41.         // 创建成功是执行  
    42.         void make(TServiceClient client);  
    43.     }  
    44.   
    45.     public void destroyObject(TServiceClient client) throws Exception {  
    46.         if (callback != null) {  
    47.             try {  
    48.                 callback.destroy(client);  
    49.             } catch (Exception e) {  
    50.                 //  
    51.             }  
    52.         }  
    53.         TTransport pin = client.getInputProtocol().getTransport();  
    54.         pin.close();  
    55.     }  
    56.   
    57.     public boolean validateObject(TServiceClient client) {  
    58.         TTransport pin = client.getInputProtocol().getTransport();  
    59.         return pin.isOpen();  
    60.     }  
    61.   
    62.     @Override  
    63.     public TServiceClient makeObject() throws Exception {  
    64.         InetSocketAddress address = serverAddressProvider.selector();  
    65.         TSocket tsocket = new TSocket(address.getHostName(), address.getPort());  
    66.         TTransport transport = new TFramedTransport(tsocket);  
    67.         TProtocol protocol = new TBinaryProtocol(transport);  
    68.         TServiceClient client = this.clientFactory.getClient(protocol);  
    69.         transport.open();  
    70.         if (callback != null) {  
    71.             try {  
    72.                 callback.make(client);  
    73.             } catch (Exception e) {  
    74.                 //  
    75.             }  
    76.         }  
    77.         return client;  
    78.     }  
    79.   
    80. }  
    package cn.slimsmart.thrift.rpc;
    
    import java.net.InetSocketAddress;
    
    import org.apache.commons.pool.BasePoolableObjectFactory;
    import org.apache.thrift.TServiceClient;
    import org.apache.thrift.TServiceClientFactory;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    
    import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
    
    /**
     * 连接池,thrift-client for spring
     */
    public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {
    
    	private final ThriftServerAddressProvider serverAddressProvider;
    	private final TServiceClientFactory<TServiceClient> clientFactory;
    	private PoolOperationCallBack callback;
    
    	protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
    		this.serverAddressProvider = addressProvider;
    		this.clientFactory = clientFactory;
    	}
    
    	protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
    			PoolOperationCallBack callback) throws Exception {
    		this.serverAddressProvider = addressProvider;
    		this.clientFactory = clientFactory;
    		this.callback = callback;
    	}
    
    	static interface PoolOperationCallBack {
    		// 销毁client之前执行
    		void destroy(TServiceClient client);
    
    		// 创建成功是执行
    		void make(TServiceClient client);
    	}
    
    	public void destroyObject(TServiceClient client) throws Exception {
    		if (callback != null) {
    			try {
    				callback.destroy(client);
    			} catch (Exception e) {
    				//
    			}
    		}
    		TTransport pin = client.getInputProtocol().getTransport();
    		pin.close();
    	}
    
    	public boolean validateObject(TServiceClient client) {
    		TTransport pin = client.getInputProtocol().getTransport();
    		return pin.isOpen();
    	}
    
    	@Override
    	public TServiceClient makeObject() throws Exception {
    		InetSocketAddress address = serverAddressProvider.selector();
    		TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
    		TTransport transport = new TFramedTransport(tsocket);
    		TProtocol protocol = new TBinaryProtocol(transport);
    		TServiceClient client = this.clientFactory.getClient(protocol);
    		transport.open();
    		if (callback != null) {
    			try {
    				callback.make(client);
    			} catch (Exception e) {
    				//
    			}
    		}
    		return client;
    	}
    
    }
    

    客户端服务代理工厂实现:ThriftServiceClientProxyFactory.java

    1. package cn.slimsmart.thrift.rpc;  
    2.   
    3. import java.lang.reflect.InvocationHandler;  
    4. import java.lang.reflect.Method;  
    5. import java.lang.reflect.Proxy;  
    6.   
    7. import org.apache.commons.pool.impl.GenericObjectPool;  
    8. import org.apache.thrift.TServiceClient;  
    9. import org.apache.thrift.TServiceClientFactory;  
    10. import org.springframework.beans.factory.FactoryBean;  
    11. import org.springframework.beans.factory.InitializingBean;  
    12.   
    13. import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;  
    14. import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;  
    15.   
    16. /** 
    17.  * 客户端代理 
    18.  */  
    19. @SuppressWarnings({ "unchecked", "rawtypes" })  
    20. public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {  
    21.   
    22.     private Integer maxActive = 32;// 最大活跃连接数  
    23.   
    24.     // ms,default 3 min,链接空闲时间  
    25.     // -1,关闭空闲检测  
    26.     private Integer idleTime = 180000;  
    27.     private ThriftServerAddressProvider serverAddressProvider;  
    28.   
    29.     private Object proxyClient;  
    30.     private Class<?> objectClass;  
    31.   
    32.     private GenericObjectPool<TServiceClient> pool;  
    33.   
    34.     private PoolOperationCallBack callback = new PoolOperationCallBack() {  
    35.         @Override  
    36.         public void make(TServiceClient client) {  
    37.             System.out.println("create");  
    38.         }  
    39.   
    40.         @Override  
    41.         public void destroy(TServiceClient client) {  
    42.             System.out.println("destroy");  
    43.         }  
    44.     };  
    45.       
    46.     public void setMaxActive(Integer maxActive) {  
    47.         this.maxActive = maxActive;  
    48.     }  
    49.   
    50.     public void setIdleTime(Integer idleTime) {  
    51.         this.idleTime = idleTime;  
    52.     }  
    53.   
    54.     public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {  
    55.         this.serverAddressProvider = serverAddressProvider;  
    56.     }  
    57.   
    58.     @Override  
    59.     public void afterPropertiesSet() throws Exception {  
    60.         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();  
    61.         // 加载Iface接口  
    62.         objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");  
    63.         // 加载Client.Factory类  
    64.         Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");  
    65.         TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();  
    66.         ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);  
    67.         GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();  
    68.         poolConfig.maxActive = maxActive;  
    69.         poolConfig.minIdle = 0;  
    70.         poolConfig.minEvictableIdleTimeMillis = idleTime;  
    71.         poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;  
    72.         pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);  
    73.         proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {  
    74.             @Override  
    75.             public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {  
    76.                 //  
    77.                 TServiceClient client = pool.borrowObject();  
    78.                 try {  
    79.                     return method.invoke(client, args);  
    80.                 } catch (Exception e) {  
    81.                     throw e;  
    82.                 } finally {  
    83.                     pool.returnObject(client);  
    84.                 }  
    85.             }  
    86.         });  
    87.     }  
    88.   
    89.     @Override  
    90.     public Object getObject() throws Exception {  
    91.         return proxyClient;  
    92.     }  
    93.   
    94.     @Override  
    95.     public Class<?> getObjectType() {  
    96.         return objectClass;  
    97.     }  
    98.   
    99.     @Override  
    100.     public boolean isSingleton() {  
    101.         return true;  
    102.     }  
    103.   
    104.     public void close() {  
    105.         if (serverAddressProvider != null) {  
    106.             serverAddressProvider.close();  
    107.         }  
    108.     }  
    109. }  
    package cn.slimsmart.thrift.rpc;
    
    import java.lang.reflect.InvocationHandler;
    import java.lang.reflect.Method;
    import java.lang.reflect.Proxy;
    
    import org.apache.commons.pool.impl.GenericObjectPool;
    import org.apache.thrift.TServiceClient;
    import org.apache.thrift.TServiceClientFactory;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.beans.factory.InitializingBean;
    
    import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;
    import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
    
    /**
     * 客户端代理
     */
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {
    
    	private Integer maxActive = 32;// 最大活跃连接数
    
    	// ms,default 3 min,链接空闲时间
    	// -1,关闭空闲检测
    	private Integer idleTime = 180000;
    	private ThriftServerAddressProvider serverAddressProvider;
    
    	private Object proxyClient;
    	private Class<?> objectClass;
    
    	private GenericObjectPool<TServiceClient> pool;
    
    	private PoolOperationCallBack callback = new PoolOperationCallBack() {
    		@Override
    		public void make(TServiceClient client) {
    			System.out.println("create");
    		}
    
    		@Override
    		public void destroy(TServiceClient client) {
    			System.out.println("destroy");
    		}
    	};
    	
    	public void setMaxActive(Integer maxActive) {
    		this.maxActive = maxActive;
    	}
    
    	public void setIdleTime(Integer idleTime) {
    		this.idleTime = idleTime;
    	}
    
    	public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
    		this.serverAddressProvider = serverAddressProvider;
    	}
    
    	@Override
    	public void afterPropertiesSet() throws Exception {
    		ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    		// 加载Iface接口
    		objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
    		// 加载Client.Factory类
    		Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
    		TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
    		ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
    		GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
    		poolConfig.maxActive = maxActive;
    		poolConfig.minIdle = 0;
    		poolConfig.minEvictableIdleTimeMillis = idleTime;
    		poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
    		pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
    		proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
    			@Override
    			public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    				//
    				TServiceClient client = pool.borrowObject();
    				try {
    					return method.invoke(client, args);
    				} catch (Exception e) {
    					throw e;
    				} finally {
    					pool.returnObject(client);
    				}
    			}
    		});
    	}
    
    	@Override
    	public Object getObject() throws Exception {
    		return proxyClient;
    	}
    
    	@Override
    	public Class<?> getObjectType() {
    		return objectClass;
    	}
    
    	@Override
    	public boolean isSingleton() {
    		return true;
    	}
    
    	public void close() {
    		if (serverAddressProvider != null) {
    			serverAddressProvider.close();
    		}
    	}
    }
    

    下面我们看一下服务端和客户端的配置;

    服务端spring-context-thrift-server.xml

    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <beans xmlns="http://www.springframework.org/schema/beans"  
    3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
    4.     xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
    5.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  
    6.                 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd  
    7.                 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd  
    8.                 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"  
    9.     default-lazy-init="false">  
    10.   
    11.     <!-- zookeeper -->  
    12.     <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"  
    13.         destroy-method="close">  
    14.         <property name="zkHosts"  
    15.             value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />  
    16.         <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />  
    17.         <property name="connectionTimeout" value="3000" />  
    18.         <property name="sessionTimeout" value="3000" />  
    19.         <property name="singleton" value="true" />  
    20.     </bean>  
    21.     <bean id="sericeAddressRegister"  
    22.         class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"  
    23.         destroy-method="close">  
    24.         <property name="zkClient" ref="thriftZookeeper" />  
    25.     </bean>  
    26.     <bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" />  
    27.   
    28.     <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"  
    29.         destroy-method="close">  
    30.         <property name="service" ref="echoSerivceImpl" />  
    31.         <property name="port" value="9000" />  
    32.         <property name="version" value="1.0.0" />  
    33.         <property name="weight" value="1" />  
    34.         <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />  
    35.     </bean>  
    36.       
    37.     <bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"  
    38.         destroy-method="close">  
    39.         <property name="service" ref="echoSerivceImpl" />  
    40.         <property name="port" value="9001" />  
    41.         <property name="version" value="1.0.0" />  
    42.         <property name="weight" value="1" />  
    43.         <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />  
    44.     </bean>  
    45.       
    46.     <bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"  
    47.         destroy-method="close">  
    48.         <property name="service" ref="echoSerivceImpl" />  
    49.         <property name="port" value="9002" />  
    50.         <property name="version" value="1.0.0" />  
    51.         <property name="weight" value="1" />  
    52.         <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />  
    53.     </bean>  
    54. </beans>  
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    				http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
    				http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
    				http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
    	default-lazy-init="false">
    
    	<!-- zookeeper -->
    	<bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
    		destroy-method="close">
    		<property name="zkHosts"
    			value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
    		<property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
    		<property name="connectionTimeout" value="3000" />
    		<property name="sessionTimeout" value="3000" />
    		<property name="singleton" value="true" />
    	</bean>
    	<bean id="sericeAddressRegister"
    		class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"
    		destroy-method="close">
    		<property name="zkClient" ref="thriftZookeeper" />
    	</bean>
    	<bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" />
    
    	<bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
    		destroy-method="close">
    		<property name="service" ref="echoSerivceImpl" />
    		<property name="port" value="9000" />
    		<property name="version" value="1.0.0" />
    		<property name="weight" value="1" />
    		<property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
    	</bean>
    	
    	<bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
    		destroy-method="close">
    		<property name="service" ref="echoSerivceImpl" />
    		<property name="port" value="9001" />
    		<property name="version" value="1.0.0" />
    		<property name="weight" value="1" />
    		<property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
    	</bean>
    	
    	<bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
    		destroy-method="close">
    		<property name="service" ref="echoSerivceImpl" />
    		<property name="port" value="9002" />
    		<property name="version" value="1.0.0" />
    		<property name="weight" value="1" />
    		<property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
    	</bean>
    </beans>

    客户端:spring-context-thrift-client.xml

    1. <?xml version="1.0" encoding="UTF-8"?>  
    2. <beans xmlns="http://www.springframework.org/schema/beans"  
    3.     xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"  
    4.     xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"  
    5.     xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd  
    6.                 http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd  
    7.                 http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd  
    8.                 http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"  
    9.     default-lazy-init="false">  
    10.       
    11.     <!-- fixedAddress -->  
    12.     <!--   
    13.     <bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">  
    14.          <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />  
    15.          <property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />  
    16.     </bean>  
    17.     <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">  
    18.         <property name="maxActive" value="5" />  
    19.         <property name="idleTime" value="10000" />  
    20.         <property name="serverAddressProvider" ref="fixedAddressProvider" />  
    21.     </bean>  
    22.    -->  
    23.     <!-- zookeeper   -->  
    24.     <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"  
    25.         destroy-method="close">  
    26.         <property name="zkHosts"  
    27.             value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />  
    28.         <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />  
    29.         <property name="connectionTimeout" value="3000" />  
    30.         <property name="sessionTimeout" value="3000" />  
    31.         <property name="singleton" value="true" />  
    32.     </bean>  
    33.     <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">  
    34.         <property name="maxActive" value="5" />  
    35.         <property name="idleTime" value="1800000" />  
    36.         <property name="serverAddressProvider">  
    37.             <bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">  
    38.                 <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />  
    39.                 <property name="version" value="1.0.0" />  
    40.                 <property name="zkClient" ref="thriftZookeeper" />  
    41.             </bean>  
    42.         </property>  
    43.     </bean>  
    44. </beans>  
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
    				http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
    				http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
    				http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
    	default-lazy-init="false">
    	
    	<!-- fixedAddress -->
    	<!--	
    	<bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">
    		 <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
             <property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />
    	</bean>
        <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">
            <property name="maxActive" value="5" />
            <property name="idleTime" value="10000" />
            <property name="serverAddressProvider" ref="fixedAddressProvider" />
        </bean>
       -->
        <!-- zookeeper   -->
        <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
    		destroy-method="close">
    		<property name="zkHosts"
    			value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
    		<property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
    		<property name="connectionTimeout" value="3000" />
    		<property name="sessionTimeout" value="3000" />
    		<property name="singleton" value="true" />
    	</bean>
        <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">
            <property name="maxActive" value="5" />
            <property name="idleTime" value="1800000" />
            <property name="serverAddressProvider">
            	<bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
            		<property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
            		<property name="version" value="1.0.0" />
            		<property name="zkClient" ref="thriftZookeeper" />
            	</bean>
            </property>
        </bean>
    </beans>

    运行服务端后,我们可以看见zookeeper注册了多个服务地址。


    详细实例这里就不详述了,请参考实例代码:https://github.com/slimina/thrift-zookeeper-rpc

    关于Thrift设计优化文档:

    Thrift RPC服务框架日志的优化

  • 相关阅读:
    小米2系列板砖自救行动
    大公司都有哪些开源项目~~~阿里,百度,腾讯,360,新浪,网易,小米等
    SQLServer 2016安装时的错误:Polybase要求安装Oracle JRE 7更新51或更高版本
    异步方法不能使用ref和out的解决方法
    大公司都有哪些开源项目~~~简化版
    08.LoT.UI 前后台通用框架分解系列之——多样的Tag选择器
    07.LoT.UI 前后台通用框架分解系列之——强大的文本编辑器
    BIOS中未启用虚拟化支持系列~~例如:因此无法安装Hyper-V
    【开源】1句代码搞定图片批量上传,无需什么代码功底【无语言界限】
    06.LoT.UI 前后台通用框架分解系列之——浮夸的图片上传
  • 原文地址:https://www.cnblogs.com/rainy-shurun/p/5359201.html
Copyright © 2011-2022 走看看