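Turning Thrift into a service platform mainly means changing the client side. The work can be approached from the following angles:
1. Service registration on the server and automatic discovery on the client, with no manual configuration changes. We use zookeeper for this. Because zookeeper's native client is fairly complex to use, registration and discovery are handled with the curator-recipes utility library.
2. The client manages service calls through a connection pool to improve performance. We use commons-pool from the Apache Commons project, which greatly reduces code complexity.
3. Failover/LoadBalance: through zookeeper watchers, the client is notified promptly when a server becomes unavailable, and the unavailable service node is removed. There are many LoadBalance algorithms; we use weighted random selection, a common choice. For the others, see: Basic Common Load-Balancing Algorithms (常见的负载均衡的基本算法).
4. Make thrift service registration and discovery configurable through spring, which is a considerable convenience.
5. Other changes, such as:
1) Use dynamic proxies to hide the details of the interaction between client and server, so that users only access the service through the interface published by the provider.
2) Thrift offers two ways to call a service, Client and Iface: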
    // *) Client API call
    ((EchoService.Client) client).echo("hello lilei");   // (1)
    // *) Service interface call
    ((EchoService.Iface) service).echo("hello lilei");   // (2)
The Client API style (1) is not recommended. We recommend the Service interface style (2), which fits a service-oriented design.
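The dynamic-proxy idea from item 1) can be illustrated with the JDK's `java.lang.reflect.Proxy`. This is a minimal sketch rather than the implementation used in this article: `EchoIface`, `RemoteInvoker`, and `ServiceProxy` are hypothetical names, and the "remote" call is simulated in-process so that the example runs without a Thrift server. In a real client the handler would presumably borrow a pooled connection and delegate to the generated Client.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

// Hypothetical stand-in for a Thrift-generated EchoService.Iface.
interface EchoIface {
    String echo(String msg);
}

// Hypothetical transport hook. A real client would send the call over a
// pooled Thrift connection; in this sketch the caller supplies the behaviour.
interface RemoteInvoker {
    Object invoke(String methodName, Object[] args) throws Exception;
}

final class ServiceProxy {
    private ServiceProxy() {
    }

    // Builds an object that implements the service interface and forwards
    // every interface method call to the invoker.
    static <T> T create(Class<T> iface, final RemoteInvoker invoker) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                // toString/hashCode/equals are answered by the handler itself
                // instead of being sent to the remote side.
                if (method.getDeclaringClass() == Object.class) {
                    return method.invoke(this, args);
                }
                return invoker.invoke(method.getName(), args);
            }
        };
        Object proxy = Proxy.newProxyInstance(iface.getClassLoader(), new Class<?>[] { iface }, handler);
        return iface.cast(proxy);
    }
}
```

A caller then only sees the interface, for example `EchoIface service = ServiceProxy.create(EchoIface.class, (name, args) -> name + ":" + args[0]);` followed by `service.echo("hello lilei")`.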
Let's implement these one by one:
I. Add the dependency jars to pom.xml
    <dependency>
        <groupId>org.apache.thrift</groupId>
        <artifactId>libthrift</artifactId>
        <version>0.9.2</version>
    </dependency>
    <dependency>
        <groupId>commons-pool</groupId>
        <artifactId>commons-pool</artifactId>
        <version>1.6</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context</artifactId>
        <version>4.0.9.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.apache.zookeeper</groupId>
        <artifactId>zookeeper</artifactId>
        <version>3.4.6</version>
    </dependency>
    <dependency>
        <groupId>org.apache.curator</groupId>
        <artifactId>curator-recipes</artifactId>
        <version>2.7.1</version>
    </dependency>
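II. Managing service node configuration with zookeeper
As RPC services evolve toward a platform, more service details are hidden (the cluster's IP addresses, cluster scale-out and migration) and only the service interface is exposed. This evolution fully decouples the server side from the client side. The two interact through a ConfigServer (MetaServer) acting as intermediary.
Note: the figure comes from the official dubbo website.
Here Zookeeper plays that role: the server acts as the publisher and the client acts as the subscriber.
Zookeeper is a coordination service for distributed applications. It is built on ZAB (ZooKeeper Atomic Broadcast), a Paxos-like consensus protocol, and plays an important role in naming, configuration push, data synchronization, and master/standby failover. Its data is organized like the directory tree of a file system.
Each node is called a znode. Depending on its characteristics, a znode is one of the following types:
1). PERSISTENT: persistent node
2). EPHEMERAL: ephemeral node; it is removed when the session that created it ends (for example, after the client disconnects and the session expires)
3). PERSISTENT_SEQUENTIAL: persistent node whose name carries a monotonically increasing sequence number
4). EPHEMERAL_SEQUENTIAL: ephemeral node whose name carries a monotonically increasing sequence number
Note: an ephemeral node cannot be a parent node.
Watcher: a client can register a callback for changes to a node's state or content. Two attributes of the resulting Event deserve attention:
1). KeeperState: Disconnected, SyncConnected, Expired
2). EventType: None, NodeCreated, NodeDeleted, NodeDataChanged, NodeChildrenChanged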
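RPC server side:
As the publisher of a concrete business service, an RPC server describes its service with the following elements.
1). namespace: distinguishes different applications
2). service: the service interface, identified by the publisher's fully qualified class name
3). version: version number
This borrows from Maven's GAV coordinates; a three-part coordinate suits a service platform well.
*) Data model design
A concrete RPC service is registered at /rpc/{namespace}/{service}/{version}; every node on this path is persistent.
Each node of the RPC service cluster is registered at /rpc/{namespace}/{service}/{version}/{ip:port:weight}; the trailing node is ephemeral.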
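To make the data model concrete, the sketch below builds the registration path of a cluster node and parses the trailing `ip:port:weight` segment back into its parts. `ServicePath` and its method names are hypothetical helpers introduced only for illustration, and the code assumes IPv4 addresses (an IPv6 literal would contain extra colons). Defaulting the weight to 1 when it is omitted follows the provider code later in this article.

```java
import java.net.InetSocketAddress;

final class ServicePath {
    static final int DEFAULT_WEIGHT = 1;

    private ServicePath() {
    }

    // /rpc/{namespace}/{service}/{version}: the persistent part of the path.
    static String servicePath(String namespace, String service, String version) {
        return "/rpc/" + namespace + "/" + service + "/" + version;
    }

    // Appends the ephemeral leaf {ip:port:weight} to the service path.
    static String nodePath(String namespace, String service, String version,
                           String ip, int port, int weight) {
        return servicePath(namespace, service, version) + "/" + ip + ":" + port + ":" + weight;
    }

    // Returns the last path segment, e.g. "192.168.1.10:8299:2".
    static String leaf(String nodePath) {
        return nodePath.substring(nodePath.lastIndexOf('/') + 1);
    }

    // Parses "ip:port" or "ip:port:weight" into an unresolved socket address.
    static InetSocketAddress address(String leaf) {
        String[] parts = split(leaf);
        return InetSocketAddress.createUnresolved(parts[0], Integer.parseInt(parts[1]));
    }

    // Returns the weight, or DEFAULT_WEIGHT when the leaf has no third part.
    static int weight(String leaf) {
        String[] parts = split(leaf);
        return parts.length == 3 ? Integer.parseInt(parts[2]) : DEFAULT_WEIGHT;
    }

    private static String[] split(String leaf) {
        String[] parts = leaf.split(":");
        if (parts.length < 2 || parts.length > 3) {
            throw new IllegalArgumentException("expected ip:port[:weight], got " + leaf);
        }
        return parts;
    }
}
```

For example, `ServicePath.nodePath("demo", "com.example.EchoService", "1.0.0", "192.168.1.10", 8299, 2)` yields `/rpc/demo/com.example.EchoService/1.0.0/192.168.1.10:8299:2`; the namespace and service name here are made up.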
1. Managing the Zookeeper client
ZookeeperFactory.java
    package cn.slimsmart.thrift.rpc.zookeeper;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    import org.springframework.beans.factory.FactoryBean;
    import org.springframework.util.StringUtils;

    /**
     * Obtains the zookeeper client connection.
     */
    public class ZookeeperFactory implements FactoryBean<CuratorFramework> {

        private String zkHosts;
        // session timeout, in milliseconds
        private int sessionTimeout = 30000;
        // connection timeout, in milliseconds
        private int connectionTimeout = 30000;
        // share a single zk connection
        private boolean singleton = true;
        // global path prefix, typically used to separate applications
        private String namespace;
        private final static String ROOT = "rpc";
        private CuratorFramework zkClient;

        public void setZkHosts(String zkHosts) {
            this.zkHosts = zkHosts;
        }

        public void setSessionTimeout(int sessionTimeout) {
            this.sessionTimeout = sessionTimeout;
        }

        public void setConnectionTimeout(int connectionTimeout) {
            this.connectionTimeout = connectionTimeout;
        }

        public void setSingleton(boolean singleton) {
            this.singleton = singleton;
        }

        public void setNamespace(String namespace) {
            this.namespace = namespace;
        }

        public void setZkClient(CuratorFramework zkClient) {
            this.zkClient = zkClient;
        }

        @Override
        public CuratorFramework getObject() throws Exception {
            if (singleton) {
                if (zkClient == null) {
                    zkClient = create();
                    zkClient.start();
                }
                return zkClient;
            }
            // Non-singleton clients are returned unstarted; callers start them.
            return create();
        }

        @Override
        public Class<?> getObjectType() {
            return CuratorFramework.class;
        }

        @Override
        public boolean isSingleton() {
            return singleton;
        }

        public CuratorFramework create() throws Exception {
            // Use a local variable so that repeated calls do not keep prepending ROOT.
            String fullNamespace = StringUtils.isEmpty(namespace) ? ROOT : ROOT + "/" + namespace;
            return create(zkHosts, sessionTimeout, connectionTimeout, fullNamespace);
        }

        public static CuratorFramework create(String connectString, int sessionTimeout, int connectionTimeout, String namespace) {
            CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder();
            return builder.connectString(connectString)
                    .sessionTimeoutMs(sessionTimeout)
                    // honor the configured value instead of a hard-coded 30000
                    .connectionTimeoutMs(connectionTimeout)
                    .canBeReadOnly(true)
                    .namespace(namespace)
                    .retryPolicy(new ExponentialBackoffRetry(1000, Integer.MAX_VALUE))
                    .defaultData(null)
                    .build();
        }

        public void close() {
            if (zkClient != null) {
                zkClient.close();
            }
        }
    }
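2. Registering the service on the server side
The server configuration needs the local machine's IP address, so we define an interface for obtaining it.
ThriftServerIpResolve.java
    package cn.slimsmart.thrift.rpc.zookeeper;

    /**
     * Resolves the thrift-server IP address used for service registration.
     * 1) It can be parsed from a special file on a physical or virtual machine
     * 2) It can be taken from a network interface with a given index
     * 3) Other approaches
     */
    public interface ThriftServerIpResolve {

        String getServerIp() throws Exception;

        void reset();

        // When the IP changes, the reset method will be called
        static interface IpRestCalllBack {
            public void rest(String newIp);
        }
    }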
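This interface can have different implementations. Below we obtain the IP address from the network interfaces; the address can also be set explicitly through serverIp.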
ThriftServerIpLocalNetworkResolve.java
    package cn.slimsmart.thrift.rpc.zookeeper;

    import java.net.Inet6Address;
    import java.net.InetAddress;
    import java.net.NetworkInterface;
    import java.net.SocketException;
    import java.util.Enumeration;

    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    /**
     * Resolves the IP from the local network interfaces.
     */
    public class ThriftServerIpLocalNetworkResolve implements ThriftServerIpResolve {

        private Logger logger = LoggerFactory.getLogger(getClass());

        // cached value; may also be configured explicitly
        private String serverIp;

        public void setServerIp(String serverIp) {
            this.serverIp = serverIp;
        }

        @Override
        public String getServerIp() {
            if (serverIp != null) {
                return serverIp;
            }
            // A host can have several network interfaces
            try {
                Enumeration<NetworkInterface> netInterfaces = NetworkInterface.getNetworkInterfaces();
                while (netInterfaces.hasMoreElements()) {
                    NetworkInterface netInterface = netInterfaces.nextElement();
                    // Each interface can carry several addresses, such as a loopback
                    // address or a site-local address, in IPv4 or IPv6 form.
                    Enumeration<InetAddress> addresses = netInterface.getInetAddresses();
                    while (addresses.hasMoreElements()) {
                        InetAddress address = addresses.nextElement();
                        if (address instanceof Inet6Address) {
                            continue;
                        }
                        if (address.isSiteLocalAddress() && !address.isLoopbackAddress()) {
                            serverIp = address.getHostAddress();
                            logger.info("resolve server ip :" + serverIp);
                            // Stop at the first match (the original kept scanning, so the last match won).
                            return serverIp;
                        }
                    }
                }
            } catch (SocketException e) {
                logger.error("failed to enumerate network interfaces", e);
            }
            return serverIp;
        }

        @Override
        public void reset() {
            serverIp = null;
        }
    }
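The address filter used in `getServerIp()` (IPv4 only, site-local, not loopback) can be isolated into a pure function, which makes the rule testable without real network interfaces. This is a sketch under assumptions: `SiteLocalIpPicker` is a hypothetical helper that is not part of the article's code, and returning the first match in iteration order is a choice made here for determinism.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;

final class SiteLocalIpPicker {
    private SiteLocalIpPicker() {
    }

    // Returns the first IPv4, site-local, non-loopback address as a string,
    // or null when no candidate qualifies.
    static String pick(List<InetAddress> candidates) {
        for (InetAddress address : candidates) {
            if (address instanceof Inet4Address
                    && address.isSiteLocalAddress()
                    && !address.isLoopbackAddress()) {
                return address.getHostAddress();
            }
        }
        return null;
    }

    // Parses a literal IP such as "192.168.1.10". For literals the JDK only
    // validates the format and performs no DNS lookup.
    static InetAddress literal(String ip) {
        try {
            return InetAddress.getByName(ip);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("not an IP literal: " + ip, e);
        }
    }
}
```

In production code the candidate list would come from `NetworkInterface.getNetworkInterfaces()`, as in the class above; here it can be any hand-built list.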
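Next we define the service publishing interface and implement publishing the service information (service interface, version, IP, port, weight) to zookeeper.
ThriftServerAddressRegister.java
    package cn.slimsmart.thrift.rpc.zookeeper;

    /**
     * Publishes the service address and port to the service registry,
     * which here is the zookeeper server.
     */
    public interface ThriftServerAddressRegister {

        /**
         * Publishes a service interface.
         * @param service name of the service interface; must be unique within a product
         * @param version version of the service interface, 1.0.0 by default
         * @param address address and port the service is published on
         */
        void register(String service, String version, String address);
    }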
Implementation: ThriftServerAddressRegisterZookeeper.java
    package cn.slimsmart.thrift.rpc.zookeeper;

    import java.io.UnsupportedEncodingException;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.imps.CuratorFrameworkState;
    import org.apache.zookeeper.CreateMode;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.StringUtils;

    import cn.slimsmart.thrift.rpc.ThriftException;

    /**
     * Registers the service list in Zookeeper.
     */
    public class ThriftServerAddressRegisterZookeeper implements ThriftServerAddressRegister {

        private Logger logger = LoggerFactory.getLogger(getClass());

        private CuratorFramework zkClient;

        public ThriftServerAddressRegisterZookeeper() {
        }

        public ThriftServerAddressRegisterZookeeper(CuratorFramework zkClient) {
            this.zkClient = zkClient;
        }

        public void setZkClient(CuratorFramework zkClient) {
            this.zkClient = zkClient;
        }

        @Override
        public void register(String service, String version, String address) {
            // Start the client if it has not been started yet
            if (zkClient.getState() == CuratorFrameworkState.LATENT) {
                zkClient.start();
            }
            if (StringUtils.isEmpty(version)) {
                version = "1.0.0";
            }
            // Ephemeral leaf node; the parent nodes are created as persistent nodes
            try {
                zkClient.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.EPHEMERAL)
                        .forPath("/" + service + "/" + version + "/" + address);
            } catch (UnsupportedEncodingException e) {
                // Pass the exception as the last argument so slf4j logs the stack trace
                logger.error("register service address to zookeeper exception", e);
                throw new ThriftException("register service address to zookeeper exception: address UnsupportedEncodingException", e);
            } catch (Exception e) {
                logger.error("register service address to zookeeper exception", e);
                throw new ThriftException("register service address to zookeeper exception", e);
            }
        }

        public void close() {
            zkClient.close();
        }
    }
3. Discovering services on the client side
Define the interface for obtaining service addresses
ThriftServerAddressProvider.java
    package cn.slimsmart.thrift.rpc.zookeeper;

    import java.net.InetSocketAddress;
    import java.util.List;

    /**
     * Provider of thrift server-service addresses, used to build the client connection pool.
     */
    public interface ThriftServerAddressProvider {

        // Returns the service name
        String getService();

        /**
         * Returns all server addresses.
         * @return the list of known server addresses
         */
        List<InetSocketAddress> findServerAddressList();

        /**
         * Selects a suitable address, for example at random.
         * Implementations may use whatever algorithm fits.
         * @return the selected address
         */
        InetSocketAddress selector();

        void close();
    }
Implementation of automatic address discovery based on zookeeper: ThriftServerAddressProviderZookeeper.java
    package cn.slimsmart.thrift.rpc.zookeeper;

    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.LinkedList;
    import java.util.List;
    import java.util.Queue;
    import java.util.Set;

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.imps.CuratorFrameworkState;
    import org.apache.curator.framework.recipes.cache.ChildData;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache;
    import org.apache.curator.framework.recipes.cache.PathChildrenCache.StartMode;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;
    import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.InitializingBean;

    /**
     * Uses zookeeper as the "config" center and the apache-curator library
     * to simplify zookeeper development.
     */
    public class ThriftServerAddressProviderZookeeper implements ThriftServerAddressProvider, InitializingBean {

        private Logger logger = LoggerFactory.getLogger(getClass());

        // registered service
        private String service;
        // service version
        private String version = "1.0.0";

        private PathChildrenCache cachedPath;

        private CuratorFramework zkClient;

        // Records every address this provider has seen.
        // If the zookeeper cluster fails, the addresses in trace serve as a "backup".
        private Set<String> trace = new HashSet<String>();

        private final List<InetSocketAddress> container = new ArrayList<InetSocketAddress>();

        private Queue<InetSocketAddress> inner = new LinkedList<InetSocketAddress>();

        // guards trace, container and inner
        private Object lock = new Object();

        // default weight
        private static final Integer DEFAULT_WEIGHT = 1;

        public void setService(String service) {
            this.service = service;
        }

        public void setVersion(String version) {
            this.version = version;
        }

        public ThriftServerAddressProviderZookeeper() {
        }

        public ThriftServerAddressProviderZookeeper(CuratorFramework zkClient) {
            this.zkClient = zkClient;
        }

        public void setZkClient(CuratorFramework zkClient) {
            this.zkClient = zkClient;
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            // Start zk if it has not been started yet
            if (zkClient.getState() == CuratorFrameworkState.LATENT) {
                zkClient.start();
            }
            buildPathChildrenCache(zkClient, getServicePath(), true);
            cachedPath.start(StartMode.POST_INITIALIZED_EVENT);
        }

        private String getServicePath() {
            return "/" + service + "/" + version;
        }

        private void buildPathChildrenCache(final CuratorFramework client, String path, Boolean cacheData) throws Exception {
            cachedPath = new PathChildrenCache(client, path, cacheData);
            cachedPath.getListenable().addListener(new PathChildrenCacheListener() {
                @Override
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {
                    PathChildrenCacheEvent.Type eventType = event.getType();
                    switch (eventType) {
                    case CONNECTION_RECONNECTED:
                        logger.info("Connection is reconnected.");
                        break;
                    case CONNECTION_SUSPENDED:
                        logger.info("Connection is suspended.");
                        break;
                    case CONNECTION_LOST:
                        logger.warn("Connection error,waiting...");
                        return;
                    default:
                        // child added, updated or removed
                    }
                    // Any change to any node's data triggers a rebuild; this is the "simple" approach.
                    cachedPath.rebuild();
                    rebuild();
                }

                protected void rebuild() throws Exception {
                    List<ChildData> children = cachedPath.getCurrentData();
                    if (children == null || children.isEmpty()) {
                        // All thrift servers may have lost their connection to zookeeper
                        // while the network between thrift client and thrift server is
                        // still fine, so whether to clear the container here deserves
                        // careful consideration.
                        synchronized (lock) {
                            container.clear();
                        }
                        logger.error("thrift server-cluster error....");
                        return;
                    }
                    List<InetSocketAddress> current = new ArrayList<InetSocketAddress>();
                    List<String> addresses = new ArrayList<String>();
                    for (ChildData data : children) {
                        String path = data.getPath();
                        logger.debug("get path:" + path);
                        // Strip the "/{service}/{version}/" prefix, leaving "ip:port:weight"
                        String address = path.substring(getServicePath().length() + 1);
                        logger.debug("get serviceAddress:" + address);
                        current.addAll(transfer(address));
                        addresses.add(address);
                    }
                    Collections.shuffle(current);
                    synchronized (lock) {
                        trace.addAll(addresses);
                        container.clear();
                        container.addAll(current);
                        inner.clear();
                        inner.addAll(current);
                    }
                }
            });
        }

        private List<InetSocketAddress> transfer(String address) {
            String[] hostname = address.split(":");
            Integer weight = DEFAULT_WEIGHT;
            if (hostname.length == 3) {
                weight = Integer.valueOf(hostname[2]);
            }
            String ip = hostname[0];
            Integer port = Integer.valueOf(hostname[1]);
            List<InetSocketAddress> result = new ArrayList<InetSocketAddress>();
            // Add ip:port to the address set `weight` times; drawing from the
            // shuffled set then yields weighted load balancing.
            for (int i = 0; i < weight; i++) {
                result.add(new InetSocketAddress(ip, port));
            }
            return result;
        }

        @Override
        public List<InetSocketAddress> findServerAddressList() {
            // Return a snapshot so callers never observe a concurrent rebuild
            synchronized (lock) {
                return Collections.unmodifiableList(new ArrayList<InetSocketAddress>(container));
            }
        }

        @Override
        public InetSocketAddress selector() {
            // Use the same lock as rebuild(); the original synchronized on `this`
            // here, which did not exclude the listener thread.
            synchronized (lock) {
                if (inner.isEmpty()) {
                    if (!container.isEmpty()) {
                        inner.addAll(container);
                    } else if (!trace.isEmpty()) {
                        for (String hostname : trace) {
                            container.addAll(transfer(hostname));
                        }
                        Collections.shuffle(container);
                        inner.addAll(container);
                    }
                }
                return inner.poll();
            }
        }

        @Override
        public void close() {
            try {
                cachedPath.close();
                zkClient.close();
            } catch (Exception e) {
                logger.warn("failed to close zookeeper resources", e);
            }
        }

        @Override
        public String getService() {
            return service;
        }
    }
A second implementation of this interface obtains the service addresses from configuration; see FixedAddressProvider.java in the attachment.
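As an illustration of weighted random selection over a fixed list of addresses, the sketch below draws directly from a cumulative weight table instead of adding each address `weight` times to a list. It is not the attached FixedAddressProvider, whose contents are not shown here: `WeightedSelector` is a hypothetical class, entries are assumed to be IPv4 `ip:port[:weight]` strings, and the `Random` is injected so that runs can be reproduced.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

final class WeightedSelector {
    private final List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();
    // cumulative.get(i) is the sum of the weights of entries 0..i
    private final List<Integer> cumulative = new ArrayList<Integer>();
    private final Random random;
    private int total;

    WeightedSelector(Random random) {
        this.random = random;
    }

    // Accepts "ip:port" or "ip:port:weight"; the weight defaults to 1.
    void add(String entry) {
        String[] parts = entry.split(":");
        if (parts.length < 2 || parts.length > 3) {
            throw new IllegalArgumentException("expected ip:port[:weight], got " + entry);
        }
        int weight = parts.length == 3 ? Integer.parseInt(parts[2]) : 1;
        if (weight <= 0) {
            throw new IllegalArgumentException("weight must be positive: " + entry);
        }
        addresses.add(InetSocketAddress.createUnresolved(parts[0], Integer.parseInt(parts[1])));
        total += weight;
        cumulative.add(total);
    }

    // Picks an address with probability weight / total, or returns null
    // when no address has been added.
    InetSocketAddress select() {
        if (total == 0) {
            return null;
        }
        int ticket = random.nextInt(total); // 0 <= ticket < total
        for (int i = 0; i < cumulative.size(); i++) {
            if (ticket < cumulative.get(i)) {
                return addresses.get(i);
            }
        }
        throw new IllegalStateException("ticket outside cumulative range");
    }
}
```

With entries `10.0.0.1:8299` and `10.0.0.2:8299:3`, the second address should be returned for roughly three out of four calls. The class is not thread-safe; a shared instance would need the same kind of locking as the zookeeper-based provider.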
III. Server-side service registration
ThriftServiceServerFactory.java
    package cn.slimsmart.thrift.rpc;

    import java.lang.instrument.IllegalClassFormatException;
    import java.lang.reflect.Constructor;

    import org.apache.thrift.TProcessor;
    import org.apache.thrift.TProcessorFactory;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadedSelectorServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.util.StringUtils;

    import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegister;
    import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpLocalNetworkResolve;
    import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerIpResolve;

    /**
     * Factory that starts the service on the server side and registers it.
     */
    public class ThriftServiceServerFactory implements InitializingBean {

        private Logger logger = LoggerFactory.getLogger(getClass());

        // local port the service is registered on
        private Integer port = 8299;
        // weight, default 1
        private Integer weight = 1;
        // service implementation instance
        private Object service;
        // service version
        private String version;
        // resolves the local IP
        private ThriftServerIpResolve thriftServerIpResolve;
        // service registration
        private ThriftServerAddressRegister thriftServerAddressRegister;

        private ServerThread serverThread;

        public void setPort(Integer port) {
            this.port = port;
        }

        public void setWeight(Integer weight) {
            this.weight = weight;
        }

        public void setService(Object service) {
            this.service = service;
        }

        public void setVersion(String version) {
            this.version = version;
        }

        public void setThriftServerIpResolve(ThriftServerIpResolve thriftServerIpResolve) {
            this.thriftServerIpResolve = thriftServerIpResolve;
        }

        public void setThriftServerAddressRegister(ThriftServerAddressRegister thriftServerAddressRegister) {
            this.thriftServerAddressRegister = thriftServerAddressRegister;
        }

        @Override
        public void afterPropertiesSet() throws Exception {
            if (thriftServerIpResolve == null) {
                thriftServerIpResolve = new ThriftServerIpLocalNetworkResolve();
            }
            String serverIP = thriftServerIpResolve.getServerIp();
            if (StringUtils.isEmpty(serverIP)) {
                throw new ThriftException("can't find server ip...");
            }

            String hostname = serverIP + ":" + port + ":" + weight;
            Class<?> serviceClass = service.getClass();
            // Interfaces implemented by the service class
            Class<?>[] interfaces = serviceClass.getInterfaces();
            if (interfaces.length == 0) {
                throw new IllegalClassFormatException("service-class should implement Iface");
            }
            // Use reflection to load the generated "Processor"
            TProcessor processor = null;
            String serviceName = null;
            for (Class<?> clazz : interfaces) {
                String cname = clazz.getSimpleName();
                if (!cname.equals("Iface")) {
                    continue;
                }
                // Iface is nested in the generated service class, e.g. EchoService$Iface
                serviceName = clazz.getEnclosingClass().getName();
                String pname = serviceName + "$Processor";
                try {
                    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
                    Class<?> pclass = classLoader.loadClass(pname);
                    if (!TProcessor.class.isAssignableFrom(pclass)) {
                        continue;
                    }
                    Constructor<?> constructor = pclass.getConstructor(clazz);
                    processor = (TProcessor) constructor.newInstance(service);
                    break;
                } catch (Exception e) {
                    // Log and try the next interface instead of swallowing the error silently
                    logger.warn("cannot create processor " + pname, e);
                }
            }
            if (processor == null) {
                throw new IllegalClassFormatException("service-class should implement Iface");
            }
            // A separate thread is needed because serve() blocks.
            serverThread = new ServerThread(processor, port);
            serverThread.start();
            // Register the service
            if (thriftServerAddressRegister != null) {
                thriftServerAddressRegister.register(serviceName, version, hostname);
            }
        }

        class ServerThread extends Thread {
            private TServer server;

            ServerThread(TProcessor processor, int port) throws Exception {
                TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
                TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
                TProcessorFactory processorFactory = new TProcessorFactory(processor);
                tArgs.processorFactory(processorFactory);
                tArgs.transportFactory(new TFramedTransport.Factory());
                tArgs.protocolFactory(new TBinaryProtocol.Factory(true, true));
                server = new TThreadedSelectorServer(tArgs);
            }

            @Override
            public void run() {
                try {
                    // Start serving; this call blocks
                    server.serve();
                } catch (Exception e) {
                    logger.error("thrift server stopped unexpectedly", e);
                }
            }

            public void stopServer() {
                server.stop();
            }
        }

        public void close() {
            serverThread.stopServer();
        }
    }
IV. Client-side service proxy and connection pool implementation
Client connection pool implementation: ThriftClientPoolFactory.java
- package cn.slimsmart.thrift.rpc;
- import java.net.InetSocketAddress;
- import org.apache.commons.pool.BasePoolableObjectFactory;
- import org.apache.thrift.TServiceClient;
- import org.apache.thrift.TServiceClientFactory;
- import org.apache.thrift.protocol.TBinaryProtocol;
- import org.apache.thrift.protocol.TProtocol;
- import org.apache.thrift.transport.TFramedTransport;
- import org.apache.thrift.transport.TSocket;
- import org.apache.thrift.transport.TTransport;
- import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
- /**
- * Connection pool factory: thrift-client for Spring
- */
- public class ThriftClientPoolFactory extends BasePoolableObjectFactory<TServiceClient> {
- private final ThriftServerAddressProvider serverAddressProvider;
- private final TServiceClientFactory<TServiceClient> clientFactory;
- private PoolOperationCallBack callback;
- protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory) throws Exception {
- this.serverAddressProvider = addressProvider;
- this.clientFactory = clientFactory;
- }
- protected ThriftClientPoolFactory(ThriftServerAddressProvider addressProvider, TServiceClientFactory<TServiceClient> clientFactory,
- PoolOperationCallBack callback) throws Exception {
- this.serverAddressProvider = addressProvider;
- this.clientFactory = clientFactory;
- this.callback = callback;
- }
- static interface PoolOperationCallBack {
- // Called before a client is destroyed
- void destroy(TServiceClient client);
- // Called after a client has been created successfully
- void make(TServiceClient client);
- }
- @Override
- public void destroyObject(TServiceClient client) throws Exception {
- if (callback != null) {
- try {
- callback.destroy(client);
- } catch (Exception e) {
- // A failing callback must not prevent the transport from being closed
- }
- }
- TTransport pin = client.getInputProtocol().getTransport();
- pin.close();
- }
- @Override
- public boolean validateObject(TServiceClient client) {
- TTransport pin = client.getInputProtocol().getTransport();
- return pin.isOpen();
- }
- @Override
- public TServiceClient makeObject() throws Exception {
- InetSocketAddress address = serverAddressProvider.selector();
- TSocket tsocket = new TSocket(address.getHostName(), address.getPort());
- TTransport transport = new TFramedTransport(tsocket);
- TProtocol protocol = new TBinaryProtocol(transport);
- TServiceClient client = this.clientFactory.getClient(protocol);
- transport.open();
- if (callback != null) {
- try {
- callback.make(client);
- } catch (Exception e) {
- //
- }
- }
- return client;
- }
- }
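The factory above plugs into commons-pool through three hooks: `makeObject` opens a connection, `validateObject` checks that it is still open, and `destroyObject` closes it. As a rough illustration of that lifecycle, here is a minimal JDK-only sketch. `FakeConnection` and `TinyPool` are hypothetical stand-ins invented for this example; they are not commons-pool or Thrift classes, and the pool is single-threaded for brevity.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class PoolLifecycleSketch {

    /** Hypothetical stand-in for a Thrift client wrapping a transport. */
    static final class FakeConnection {
        private boolean open = true;
        boolean isOpen() { return open; }
        void close() { open = false; }
    }

    /** Hypothetical single-threaded pool showing the make/validate/destroy hooks. */
    static final class TinyPool {
        private final Deque<FakeConnection> idle = new ArrayDeque<>();
        int created = 0;
        int destroyed = 0;

        FakeConnection borrow() {
            // Reuse an idle connection only if it still validates
            while (!idle.isEmpty()) {
                FakeConnection c = idle.pop();
                if (c.isOpen()) {      // plays the role of validateObject
                    return c;
                }
                destroyed++;           // plays the role of destroyObject
            }
            created++;                 // plays the role of makeObject
            return new FakeConnection();
        }

        void giveBack(FakeConnection c) {
            idle.push(c);
        }
    }

    public static void main(String[] args) {
        TinyPool pool = new TinyPool();
        FakeConnection first = pool.borrow();
        pool.giveBack(first);
        FakeConnection second = pool.borrow();
        System.out.println("reused: " + (first == second));

        // Simulate a connection that died while idle
        second.close();
        pool.giveBack(second);
        pool.borrow();
        System.out.println("created=" + pool.created + " destroyed=" + pool.destroyed);
    }
}
```

One thing worth checking in a real setup: in commons-pool 1.x, `validateObject` is only consulted when an option such as `testOnBorrow` is enabled on the pool, and that option is off by default.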
Client service proxy factory implementation: ThriftServiceClientProxyFactory.java
- package cn.slimsmart.thrift.rpc;
- import java.lang.reflect.InvocationHandler;
- import java.lang.reflect.Method;
- import java.lang.reflect.Proxy;
- import org.apache.commons.pool.impl.GenericObjectPool;
- import org.apache.thrift.TServiceClient;
- import org.apache.thrift.TServiceClientFactory;
- import org.springframework.beans.factory.FactoryBean;
- import org.springframework.beans.factory.InitializingBean;
- import cn.slimsmart.thrift.rpc.ThriftClientPoolFactory.PoolOperationCallBack;
- import cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProvider;
- /**
- * Client proxy
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- public class ThriftServiceClientProxyFactory implements FactoryBean, InitializingBean {
- private Integer maxActive = 32;// maximum number of active connections
- // Connection idle time in ms, default 3 minutes
- // -1 disables idle eviction
- private Integer idleTime = 180000;
- private ThriftServerAddressProvider serverAddressProvider;
- private Object proxyClient;
- private Class<?> objectClass;
- private GenericObjectPool<TServiceClient> pool;
- private PoolOperationCallBack callback = new PoolOperationCallBack() {
- @Override
- public void make(TServiceClient client) {
- System.out.println("create");
- }
- @Override
- public void destroy(TServiceClient client) {
- System.out.println("destroy");
- }
- };
- public void setMaxActive(Integer maxActive) {
- this.maxActive = maxActive;
- }
- public void setIdleTime(Integer idleTime) {
- this.idleTime = idleTime;
- }
- public void setServerAddressProvider(ThriftServerAddressProvider serverAddressProvider) {
- this.serverAddressProvider = serverAddressProvider;
- }
- @Override
- public void afterPropertiesSet() throws Exception {
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- // Load the Iface interface
- objectClass = classLoader.loadClass(serverAddressProvider.getService() + "$Iface");
- // Load the Client.Factory class
- Class<TServiceClientFactory<TServiceClient>> fi = (Class<TServiceClientFactory<TServiceClient>>) classLoader.loadClass(serverAddressProvider.getService() + "$Client$Factory");
- TServiceClientFactory<TServiceClient> clientFactory = fi.newInstance();
- ThriftClientPoolFactory clientPool = new ThriftClientPoolFactory(serverAddressProvider, clientFactory, callback);
- GenericObjectPool.Config poolConfig = new GenericObjectPool.Config();
- poolConfig.maxActive = maxActive;
- poolConfig.minIdle = 0;
- poolConfig.minEvictableIdleTimeMillis = idleTime;
- poolConfig.timeBetweenEvictionRunsMillis = idleTime / 2L;
- pool = new GenericObjectPool<TServiceClient>(clientPool, poolConfig);
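- proxyClient = Proxy.newProxyInstance(classLoader, new Class[] { objectClass }, new InvocationHandler() {
- @Override
- public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
- // Borrow a pooled client for the duration of this call
- TServiceClient client = pool.borrowObject();
- boolean broken = false;
- try {
- return method.invoke(client, args);
- } catch (java.lang.reflect.InvocationTargetException e) {
- // Unwrap so callers see the exception declared by the service method
- Throwable cause = e.getCause();
- // A transport failure means the connection is unusable; do not reuse it
- broken = cause instanceof org.apache.thrift.transport.TTransportException;
- throw cause;
- } finally {
- if (broken) {
- pool.invalidateObject(client);
- } else {
- pool.returnObject(client);
- }
- }
- }
- });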
- }
- @Override
- public Object getObject() throws Exception {
- return proxyClient;
- }
- @Override
- public Class<?> getObjectType() {
- return objectClass;
- }
- @Override
- public boolean isSingleton() {
- return true;
- }
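- public void close() throws Exception {
- try {
- // Close pooled connections before releasing the address provider
- if (pool != null) {
- pool.close();
- }
- } finally {
- if (serverAddressProvider != null) {
- serverAddressProvider.close();
- }
- }
- }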
- }
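The core idea of the proxy factory is that callers only see the service interface, while each call borrows a client, invokes it reflectively, and hands the client back. The JDK-only sketch below illustrates that pattern under simplifying assumptions: `Echo`, `EchoClient`, and `proxyFor` are hypothetical names made up for this example, and a `BlockingQueue` stands in for the commons-pool `GenericObjectPool`. It also shows why unwrapping `InvocationTargetException` matters: without it, callers would see a reflection wrapper instead of the exception the method actually threw.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PooledProxySketch {

    /** Hypothetical service interface, playing the role of a generated Iface. */
    interface Echo {
        String echo(String msg);
    }

    /** Hypothetical client, playing the role of a generated Client. */
    static final class EchoClient implements Echo {
        @Override
        public String echo(String msg) {
            if (msg == null) {
                throw new IllegalArgumentException("msg is null");
            }
            return "echo:" + msg;
        }
    }

    /** Builds a proxy that borrows a client per call and always hands it back. */
    static Echo proxyFor(final BlockingQueue<EchoClient> pool) {
        InvocationHandler handler = new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                EchoClient client = pool.take();   // borrow
                try {
                    return method.invoke(client, args);
                } catch (InvocationTargetException e) {
                    // Rethrow the exception thrown by the target method itself
                    throw e.getCause();
                } finally {
                    pool.put(client);              // return, even on failure
                }
            }
        };
        return (Echo) Proxy.newProxyInstance(
                Echo.class.getClassLoader(), new Class<?>[] { Echo.class }, handler);
    }

    public static void main(String[] args) {
        BlockingQueue<EchoClient> pool = new LinkedBlockingQueue<>();
        pool.add(new EchoClient());
        Echo echo = proxyFor(pool);
        System.out.println(echo.echo("hello lilei"));
        System.out.println("clients back in pool: " + pool.size());
    }
}
```

In this sketch a failed call still returns the client to the queue, which is fine for an in-memory stand-in; with real network connections a transport failure usually means the client should be discarded rather than reused.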
Now let's look at the server and client configuration.
Server: spring-context-thrift-server.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
- xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
- http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
- http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
- default-lazy-init="false">
- <!-- zookeeper -->
- <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
- destroy-method="close">
- <property name="zkHosts"
- value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
- <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
- <property name="connectionTimeout" value="3000" />
- <property name="sessionTimeout" value="3000" />
- <property name="singleton" value="true" />
- </bean>
- <bean id="sericeAddressRegister"
- class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressRegisterZookeeper"
- destroy-method="close">
- <property name="zkClient" ref="thriftZookeeper" />
- </bean>
- <bean id="echoSerivceImpl" class="cn.slimsmart.thrift.rpc.demo.EchoSerivceImpl" />
- <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
- destroy-method="close">
- <property name="service" ref="echoSerivceImpl" />
- <property name="port" value="9000" />
- <property name="version" value="1.0.0" />
- <property name="weight" value="1" />
- <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
- </bean>
- <bean id="echoSerivce1" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
- destroy-method="close">
- <property name="service" ref="echoSerivceImpl" />
- <property name="port" value="9001" />
- <property name="version" value="1.0.0" />
- <property name="weight" value="1" />
- <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
- </bean>
- <bean id="echoSerivce2" class="cn.slimsmart.thrift.rpc.ThriftServiceServerFactory"
- destroy-method="close">
- <property name="service" ref="echoSerivceImpl" />
- <property name="port" value="9002" />
- <property name="version" value="1.0.0" />
- <property name="weight" value="1" />
- <property name="thriftServerAddressRegister" ref="sericeAddressRegister" />
- </bean>
- </beans>
Client: spring-context-thrift-client.xml
- <?xml version="1.0" encoding="UTF-8"?>
- <beans xmlns="http://www.springframework.org/schema/beans"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context"
- xmlns:aop="http://www.springframework.org/schema/aop" xmlns:tx="http://www.springframework.org/schema/tx"
- xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
- http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.0.xsd
- http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.0.xsd
- http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.0.xsd"
- default-lazy-init="false">
- <!-- fixedAddress -->
- <!--
- <bean id="fixedAddressProvider" class="cn.slimsmart.thrift.rpc.zookeeper.FixedAddressProvider">
- <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
- <property name="serverAddress" value="192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3" />
- </bean>
- <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory">
- <property name="maxActive" value="5" />
- <property name="idleTime" value="10000" />
- <property name="serverAddressProvider" ref="fixedAddressProvider" />
- </bean>
- -->
- <!-- zookeeper -->
- <bean id="thriftZookeeper" class="cn.slimsmart.thrift.rpc.zookeeper.ZookeeperFactory"
- destroy-method="close">
- <property name="zkHosts"
- value="192.168.36.54:2181,192.168.36.99:2181,192.168.36.189:2181" />
- <property name="namespace" value="cn.slimsmart.thrift.rpc.demo" />
- <property name="connectionTimeout" value="3000" />
- <property name="sessionTimeout" value="3000" />
- <property name="singleton" value="true" />
- </bean>
- <bean id="echoSerivce" class="cn.slimsmart.thrift.rpc.ThriftServiceClientProxyFactory" destroy-method="close">
- <property name="maxActive" value="5" />
- <property name="idleTime" value="1800000" />
- <property name="serverAddressProvider">
- <bean class="cn.slimsmart.thrift.rpc.zookeeper.ThriftServerAddressProviderZookeeper">
- <property name="service" value="cn.slimsmart.thrift.rpc.demo.EchoSerivce" />
- <property name="version" value="1.0.0" />
- <property name="zkClient" ref="thriftZookeeper" />
- </bean>
- </property>
- </bean>
- </beans>
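Both the fixed address list and the registered server address use the `host:port:weight` format, and the article chooses random weighted selection for load balancing. One simple way to realize that, shown here purely as a sketch, is to repeat each address as many times as its weight and then pick a random entry. `WeightedAddressSketch`, `expand`, and `select` are hypothetical names for this example, and this is not necessarily how the address providers in the referenced project are implemented.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

class WeightedAddressSketch {

    /** Parses "host:port:weight,..." and repeats each address `weight` times. */
    static List<InetSocketAddress> expand(String serverAddress) {
        List<InetSocketAddress> expanded = new ArrayList<>();
        for (String entry : serverAddress.split(",")) {
            String[] parts = entry.trim().split(":");
            String host = parts[0];
            int port = Integer.parseInt(parts[1]);
            // Assumption for this sketch: a missing weight counts as 1
            int weight = parts.length > 2 ? Integer.parseInt(parts[2]) : 1;
            for (int i = 0; i < weight; i++) {
                // createUnresolved avoids a DNS lookup while parsing
                expanded.add(InetSocketAddress.createUnresolved(host, port));
            }
        }
        return expanded;
    }

    /** Picks a uniformly random entry; heavier addresses appear more often in the list. */
    static InetSocketAddress select(List<InetSocketAddress> expanded, Random random) {
        return expanded.get(random.nextInt(expanded.size()));
    }

    public static void main(String[] args) {
        List<InetSocketAddress> expanded = expand(
                "192.168.36.215:9001:1,192.168.36.215:9002:2,192.168.36.215:9003:3");
        System.out.println("entries: " + expanded.size());
        System.out.println("picked port: " + select(expanded, new Random()).getPort());
    }
}
```

With weights 1, 2, and 3 the expanded list holds six entries, so the third server should receive roughly half of the picks over many calls. For large weights, a cumulative-weight lookup would use less memory than repeating entries.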
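After starting the server, we can see that several service addresses have been registered in ZooKeeper.
The full example is not covered in detail here; see the sample code: https://github.com/slimina/thrift-zookeeper-rpc
Documents on Thrift design and optimization: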