zoukankan      html  css  js  c++  java
  • 高可用的池化 Thrift Client 实现(源码分享)

    本文将分享一个高可用的池化 Thrift Client 及其源码实现,欢迎阅读源码(Github)并使用,同时欢迎提出宝贵的意见和建议,本人将持续完善。

    本文的主要目标读者是对 Thrift 有一定了解并使用的童鞋,如对 Thrift 的基础知识了解不多或者想重温一下基础知识,推荐先阅读本站文章《和 Thrift 的一场美丽邂逅》。

    下面进入正题。

    为什么我们需要这么一个组件?

    我们知道,Thrift 是一个 RPC 框架体系,可以非常方便的进行跨语言 RPC 服务的开发和调用。然而,它并没有提供针对多个 Server 的 Smart Client【1】。比如,你有一个服务 service,分别部署在 116.31.1.1 和 116.31.1.2 两台服务器上,当你需要从 Client 端调用该 service 的某个远程方法的时候,你只能在代码中显式指定使用 116.31.1.1 或者 116.31.1.2 其中的一个。这种情况下,你调用的时候无法预知所指定 IP 对应的服务是否可用,并且当该服务不可用时,无法隐式自动切换到调用另外一个 IP 对应的服务。也就是说,服务的状态对你并不是透明的,并且无法做到服务的负载均衡和高可用。

    此外,当你调用远程方法时,每次你都得新建一个连接,当请求量很大时,不断的创建、删除连接所耗费的服务资源是巨大的。

    因此,我们需要这么一个组件,使服务状态透明化并底层实现负载均衡和高可用,让你可以专注于业务逻辑的实现,提升工作效率和服务的质量。下面我们就对该组件(ThrifJ)进行详细的剖析。

    它到底能做些什么?

    特性

    • 链式调用API,简洁直观
    • 完善的默认配置,无需担心调用时配置不全导致抛错
    • 池化连接对象,高效管理连接的生命周期
    • 异常服务自动隔离与恢复
    • 多种可配置的负载均衡策略,支持随机、轮询、权重和哈希
    • 多种可配置的服务级别,并自动根据服务级别进行服务降级

    该如何使用它?

    目前最新版本为1.0.1(点此关注最新版本的更新),首先在项目中引入 thriftj-1.0.1.jar,或在 Maven 依赖中加入:

    <dependency>
        <groupId>com.github.cyfonly</groupId>
        <artifactId>thriftj</artifactId>
        <version>1.0.1</version>
    </dependency>
    

     需要注意的是,ThriftJ 基于 slf4j 构建,因此你需要在项目中增加具体日志实现的依赖,比如 log4j 或 logback。

    然后在项目中,参照以下这段代码进行调用:

    //Thrift server 列表
    private static final String servers = "127.0.0.1:10001,127.0.0.1:10002";
    
    //TTransport 验证器
    ConnectionValidator validator = new ConnectionValidator() {
        @Override
        public boolean isValid(TTransport object) {
            return object.isOpen();
        }
    };
    
    //连接对象池配置
    GenericKeyedObjectPoolConfig poolConfig = new GenericKeyedObjectPoolConfig();
    
    //failover 策略
    FailoverStrategy failoverStrategy = new FailoverStrategy();
    
    //构造 ThriftClient 对象并配置
    final ThriftClient thriftClient = new ThriftClient();
    thriftClient.servers(servers)
                .loadBalance(Constant.LoadBalance.RANDOM)
                .connectionValidator(validator)
                .poolConfig(poolConfig)
                .failoverStrategy(failoverStrategy)
                .connTimeout(5)
                .backupServers("")
                .serviceLevel(Constant.ServiceLevel.NOT_EMPTY)
                .start();
    
    //打印从 ThriftClient 获取到的可用服务列表
    List<ThriftServer> servers = thriftClient.getAvailableServers();
    for(ThriftServer server : servers){
        System.out.println(server.getHost() + ":" + server.getPort());
    }
    
    //服务调用
    if(servers.size()>0){
        try{
                TestThriftJ.Client client = thriftClient.iface(TestThriftJ.Client.class);
                QryResult result = client.qryTest(1);
                System.out.println("result[code=" + result.code + " msg=" + result.msg + "]");
          }catch(Throwable t){
                logger.error("-------------exception happen", t);
          }
    }
    

     友情提示:除 servers 必须配置外,其他配置均为可选(使用默认配置)

    它是如何设计并实现的呢?

    整体设计

    连接池对象工厂及连接对象的管理

    基于 commons-pool2 中的 KeyedPooledObjectFactory,以 ThriftServer 为 key,TTransport 为 value 进行实现。关键代码如下:

    @Override
    public PooledObject<TTransport> makeObject(ThriftServer thriftServer) throws Exception {
        TSocket tsocket = new TSocket(thriftServer.getHost(), thriftServer.getPort());
        tsocket.setTimeout(timeout);
        TFramedTransport transport = new TFramedTransport(tsocket);
        
        transport.open();
        DefaultPooledObject<TTransport> result = new DefaultPooledObject<TTransport>(transport);
        logger.trace("Make new thrift connection: {}:{}", thriftServer.getHost(), thriftServer.getPort());
        
        return result;
    }
    
    @Override
    public boolean validateObject(ThriftServer thriftServer, PooledObject<TTransport> pooledObject) {
        boolean isValidate;
        try {
            if (failoverChecker == null) {
                isValidate = pooledObject.getObject().isOpen();
            } else {
                ConnectionValidator validator = failoverChecker.getConnectionValidator();
                isValidate = pooledObject.getObject().isOpen() && (validator == null || validator.isValid(pooledObject.getObject()));
            }
        } catch (Throwable e) {
            logger.warn("Fail to validate tsocket: {}:{}", new Object[]{thriftServer.getHost(), thriftServer.getPort(), e});
            isValidate = false;
        }
        if (failoverChecker != null && !isValidate) {
            failoverChecker.getFailoverStrategy().fail(thriftServer);
        }
        logger.info("ValidateObject isValidate:{}", isValidate);
        
        return isValidate;
    }
    
    @Override
    public void destroyObject(ThriftServer thriftServer, PooledObject<TTransport> pooledObject) throws Exception {
        TTransport transport = pooledObject.getObject();
        if (transport != null) {
            transport.close();
            logger.trace("Close thrift connection: {}:{}", thriftServer.getHost(), thriftServer.getPort());
        }
    }
    

    在使用连接对象时,根据用户的自定义连接池配置创建连接池,并实现连接对象的获取、回池、清除以及连接池的关闭操作。关键代码如下:

    public DefaultThriftConnectionPool(KeyedPooledObjectFactory<ThriftServer, TTransport> factory, GenericKeyedObjectPoolConfig config) {
    	connections = new GenericKeyedObjectPool<>(factory, config);
    }
    
    @Override
    public TTransport getConnection(ThriftServer thriftServer) {
    	try {
    		return connections.borrowObject(thriftServer);
    	} catch (Exception e) {
    		logger.warn("Fail to get connection for {}:{}", new Object[]{thriftServer.getHost(), thriftServer.getPort(), e});
    		throw new RuntimeException(e);
    	}
    }
    
    @Override
    public void returnConnection(ThriftServer thriftServer, TTransport transport) {
    	connections.returnObject(thriftServer, transport);
    }
    
    @Override
    public void returnBrokenConnection(ThriftServer thriftServer, TTransport transport) {
    	try {
    		connections.invalidateObject(thriftServer, transport);
    	} catch (Exception e) {
    		logger.warn("Fail to invalid object:{},{}", new Object[] { thriftServer, transport, e });
    	}
    }
    
    @Override
    public void close() {
    	connections.close();
    }
    
    @Override
    public void clear(ThriftServer thriftServer) {
    	connections.clear(thriftServer);
    }
    

    异常服务自动隔离与恢复

    需要实现服务状态的透明化,就必须在底层实现服务的监测、隔离和恢复。在 ThriftJ 中,调用 ThriftClient 时会启动一个线程对服务进行异步监测,用户可以指定检验规则(对应配置为 ConnectionValidator)以及 failover 策略(对应配置为 FailoverStrategy,可以指定失败的次数、失效持续时间和恢复持续时间)。默认情况下,服务验证规则为判断 TTransport 是否处于开启状态,即:

    if (this.validator == null) {
      this.validator = new ConnectionValidator() {
        @Override
        public boolean isValid(TTransport object) {
          return object.isOpen();
        }
      };
    }
    

     而默认的 failover 策略为

    • 失败次数:10(次),表示通过 ConnectionValidator 检验失败 10 次后才考虑将该服务失效,需要配合失效持续时间一起使用
    • 时效持续时间:1(分钟),表示在一个检验周期内,首次检验失败的时间持续达到该值后才考虑将该服务失效,配合失败次数一起使用
    • 恢复持续时间:1(分钟),表示在判定某服务失效并隔离后,经过该值后将服务重新恢复

    以上功能基于 Guava cache 实现,关键代码如下:

    /**
     * 使用默认 failover 策略
     */
    public FailoverStrategy() {
    	this(DEFAULT_FAIL_COUNT, DEFAULT_FAIL_DURATION, DEFAULT_RECOVER_DURATION);
    }
    
    /**
     * 自定义 failover 策略
     * @param failCount 失败次数
     * @param failDuration 失效持续时间
     * @param recoverDuration 恢复持续时间
     */
    public FailoverStrategy(final int failCount, long failDuration, long recoverDuration) {
    	this.failDuration = failDuration;
    	this.failedList = CacheBuilder.newBuilder().weakKeys().expireAfterWrite(recoverDuration, TimeUnit.MILLISECONDS).build();
    	this.failCountMap = CacheBuilder.newBuilder().weakKeys().build(new CacheLoader<T, EvictingQueue<Long>>() {
    		@Override
    		public EvictingQueue<Long> load(T key) throws Exception {
    			return EvictingQueue.create(failCount);
    		}
    	});
    }
    
    public void fail(T object) {
    	logger.info("Server {}:{} failed.", ((ThriftServer)object).getHost(),((ThriftServer)object).getPort());
    	boolean addToFail = false;
    	try {
    		EvictingQueue<Long> evictingQueue = failCountMap.get(object);
    		synchronized (evictingQueue) {
    			evictingQueue.add(System.currentTimeMillis());
    			if (evictingQueue.remainingCapacity() == 0 && evictingQueue.element() >= (System.currentTimeMillis() - failDuration)) {
    				addToFail = true;
    			}
    		}
    	} catch (ExecutionException e) {
    		logger.error("Ops.", e);
    	}
    	if (addToFail) {
    		failedList.put(object, Boolean.TRUE);
    		logger.info("Server {}:{} failed. Add to fail list.", ((ThriftServer)object).getHost(), ((ThriftServer)object).getPort());
    	}
    }
    
    public Set<T> getFailed() {
    	return failedList.asMap().keySet();
    }
    

    负载均衡

    ThriftJ 提供了四种可选的负载均衡策略:

    • 随机
    • 轮询
    • 权重
    • 哈希

    在用户不显式指定的情况下,默认采用随机算法。具体算法的实现在此就不再进行过多的描述了。

    需要注意的是,ThriftJ 严格规范了调用的语义,比如使用哈希策略时,必须要指定 hash key;当使用非哈希的其他策略时,一定不能指定 key,避免造成理解的二义性。

    服务级别与服务降级

    ThriftJ 提供了多种可配置的服务级别,并根据服务级别进行服务降级处理,其对应关系如下:

    • SERVERS_ONLY:最高级别,仅返回配置的 servers 列表中可用的服务
    • ALL_SERVERS:中等级别,当 servers 列表中的服务全部不可用时,返回 backupServers 列表中的可用服务
    • NOT_EMPTY:最低级别,当 servers 和 backupServers 列表中的服务全部不可用时,返回 servers 列表中的所有服务

    其中 ThriftJ 默认使用的服务级别是 NOT_EMPTY。服务降级处理的关键代码如下:

    private List<ThriftServer> getAvailableServers(boolean all) {
    	List<ThriftServer> returnList = new ArrayList<>();
    	Set<ThriftServer> failedServers = failoverStrategy.getFailed();
    	for (ThriftServer thriftServer : serverList) {
    		if (!failedServers.contains(thriftServer))
    			returnList.add(thriftServer);
    	}
    	if (this.serviceLevel == Constant.ServiceLevel.SERVERS_ONLY) {
    		return returnList;
    	}
    	if ((all || returnList.isEmpty()) && !backupServerList.isEmpty()) {
    		for (ThriftServer thriftServer : backupServerList) {
    			if (!failedServers.contains(thriftServer))
    				returnList.add(thriftServer);
    		}
    	}
    	if (this.serviceLevel == Constant.ServiceLevel.ALL_SERVERS) {
    		return returnList;
    	}
    	if(returnList.isEmpty()){
    		returnList.addAll(serverList);
    	}
    	return returnList;
    }
    

    我还有话要说

    技术的提升源自无私的分享,好的技术或工具分享出来,并不会让自己失去什么,反而可以在大家共同研究和沟通后使之获得更好的完善。不要担心自己写的工具不够好,不要害怕自己的技术不够牛,谁能一步就登天呢?

    请热爱你的热爱!

    【1】Smart Client:比如 MongoClient,可自动发现集群服务节点、自动故障转移和负载均衡。

  • 相关阅读:
    Linux系统操作问题汇总
    记录一些mysql数据库常用操作命令和问题汇总
    python学习之路-练习小程序02(模拟用户登录)
    python学习之路02(基础篇2)
    python学习之路-练习小程序01(猜年龄)
    python学习之路01(基础篇1)
    hashmap详解(基于jdk1.8)
    maven创建项目太慢怎么办
    CAS原理
    JUC原子类3-AtomicLongArray原子类
  • 原文地址:https://www.cnblogs.com/cyfonly/p/6284117.html
Copyright © 2011-2022 走看看