zoukankan      html  css  js  c++  java
  • 由浅入深了解Thrift之客户端连接池化

    一、问题描述                                                                                        

        在上一篇《由浅入深了解Thrift之服务模型和序列化机制》文章中,我们已经了解了thrift的基本架构和网络服务模型的优缺点。如今的互联网圈中,RPC服务化的思想如火如荼。我们又该如何将thrift服务化应用到我们的项目中呢?实现thrift服务化前,我们先想想这几个问题:服务注册、服务发现、服务健康检测、服务“Load Balance”、隐藏client和server端的交互细节、服务调用端的对象池化。

    • 服务的注册、发现和健康检测,我们使用zookeeper可以很好的解决
    • 服务“Load Balance”,我们可以使用简单的算法“权重+随机”,当然也可以使用成熟复杂的算法
    • 服务调用端的对象池化,我们可以使用Apache commons-pool,使用简单又可以满足我们的需求

    二、实现思路                                                                                        

        1、thrift server端启动时,每个实例向zk集群以临时节点方式注册(这样,遍历zk上/server下有多少个临时节点就知道有哪些server实例)

            thrift server端可以单机多端口多实例或多机部署多实例方式运行。

       2、服务调用方实现一个连接池,连接池初始化时,通过zk将在线的server实例信息同步到本地并缓存,同时监听zk下的节点变化。

       3、服务调用方与Server通讯时,从连接池中取一个可用的连接,用它实现RPC调用。

                     

    三、具体实现                                                                                       

       1、thrift server端   

          thrift server端,向zk中注册server address

    package com.wy.thriftpool.commzkpool;
    
    import java.lang.instrument.IllegalClassFormatException;
    import java.lang.reflect.Constructor;
    
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TBinaryProtocol.Factory;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadedSelectorServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    import org.springframework.beans.factory.InitializingBean;
    
    import com.wy.thrift.service.UserService.Processor;
    import com.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;
    import com.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer;
    import com.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer;
    
    /**
     * Thrift server side: starts a server for the configured service implementation
     * and registers the server address ("ip:port:priority") through the address reporter.
     *
     * <p>NOTE(review): the {@code Processor} type used here is the one imported by this
     * file (the generated processor of a single service), so only services whose
     * processor is assignable to that type can be hosted.
     *
     * @author wy
     */
    public class ThriftServiceServerFactory implements InitializingBean {

        // Port the thrift server listens on
        private Integer port;
        // Weight of this instance (defaults to 1)
        private Integer priority = 1;
        // Service implementation instance
        private Object service;
        // Path under which the thrift server is registered
        private String configPath;

        // Resolves the ip that is published for this server
        private ThriftServerIpTransfer ipTransfer;
        // Publishes the server address
        private ThriftServerAddressReporter addressReporter;
        // Thread that runs the blocking thrift server
        private ServerThread serverThread;

        /**
         * Builds the processor for the configured service, starts the server on its own
         * thread and, when an address reporter is configured, reports the address.
         *
         * @throws NullPointerException if the service, the port or the server ip is missing
         * @throws IllegalClassFormatException if no usable "Iface"/"Processor" pair is found
         * @throws Exception if the server cannot be created or the address cannot be reported
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            if (service == null) {
                throw new NullPointerException("service must be set");
            }
            if (port == null) {
                throw new NullPointerException("port must be set");
            }
            if (ipTransfer == null) {
                ipTransfer = new LocalNetworkIpTransfer();
            }
            String ip = ipTransfer.getIp();
            if (ip == null) {
                throw new NullPointerException("cant find server ip...");
            }
            String hostname = ip + ":" + port + ":" + priority;

            Processor<?> processor = createProcessor();

            // serve() blocks, so the server needs a thread of its own.
            serverThread = new ServerThread(processor, port);
            serverThread.start();

            if (addressReporter != null) {
                try {
                    addressReporter.report(configPath, hostname);
                } catch (Exception e) {
                    // Do not leave an unregistered server running behind a failed start-up.
                    serverThread.stopServer();
                    throw e;
                }
            }
        }

        /**
         * Reflectively instantiates the generated "Processor" that belongs to the
         * "Iface" interface implemented by the service.
         */
        private Processor<?> createProcessor() throws IllegalClassFormatException {
            Class<?> serviceClass = service.getClass();
            Class<?>[] interfaces = serviceClass.getInterfaces();
            if (interfaces.length == 0) {
                throw new IllegalClassFormatException("service-class should implements Iface");
            }

            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            if (classLoader == null) {
                classLoader = serviceClass.getClassLoader();
            }

            Exception lastFailure = null;
            for (Class<?> clazz : interfaces) {
                if (!"Iface".equals(clazz.getSimpleName())) {
                    continue;
                }
                Class<?> enclosing = clazz.getEnclosingClass();
                if (enclosing == null) {
                    // A top-level interface called Iface has no sibling Processor class.
                    continue;
                }
                String pname = enclosing.getName() + "$Processor";
                try {
                    Class<?> pclass = classLoader.loadClass(pname);
                    // The loaded class must be usable as a Processor, not the other way round.
                    if (!Processor.class.isAssignableFrom(pclass)) {
                        continue;
                    }
                    Constructor<?> constructor = pclass.getConstructor(clazz);
                    return (Processor<?>) constructor.newInstance(service);
                } catch (Exception e) {
                    // Remember why this candidate failed and try the next interface.
                    lastFailure = e;
                }
            }

            IllegalClassFormatException failure =
                    new IllegalClassFormatException("service-class should implements Iface");
            if (lastFailure != null) {
                failure.initCause(lastFailure);
            }
            throw failure;
        }

        /** Thread that owns the thrift server and runs its blocking serve loop. */
        class ServerThread extends Thread {
            private TServer server;

            ServerThread(Processor<?> processor, int port) throws Exception {
                // Transport channel
                TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
                // Binary protocol
                Factory protocolFactory = new TBinaryProtocol.Factory();

                TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
                tArgs.processor(processor);
                tArgs.transportFactory(new TFramedTransport.Factory());
                tArgs.protocolFactory(protocolFactory);
                int num = Runtime.getRuntime().availableProcessors() * 2 + 1;
                tArgs.selectorThreads(num);
                tArgs.workerThreads(num * 10);

                // Network service model
                server = new TThreadedSelectorServer(tArgs);
            }

            @Override
            public void run() {
                try {
                    server.serve();
                } catch (Exception e) {
                    // Surface the failure through the thread's uncaught-exception handler
                    // instead of letting the server die silently.
                    throw new IllegalStateException("thrift server terminated unexpectedly", e);
                }
            }

            public void stopServer() {
                server.stop();
            }
        }

        /** Stops the server if it was started; safe to call when start-up never completed. */
        public void close() {
            if (serverThread != null) {
                serverThread.stopServer();
            }
        }

        public void setService(Object service) {
            this.service = service;
        }

        public void setPriority(Integer priority) {
            this.priority = priority;
        }

        public void setPort(Integer port) {
            this.port = port;
        }

        public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {
            this.ipTransfer = ipTransfer;
        }

        public void setAddressReporter(ThriftServerAddressReporter addressReporter) {
            this.addressReporter = addressReporter;
        }

        public void setConfigPath(String configPath) {
            this.configPath = configPath;
        }
    }
    View Code

          thrift server address注册到zk   

    package com.wy.thriftpool.commzkpool.support.impl;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.imps.CuratorFrameworkState;
    import org.apache.zookeeper.CreateMode;
    
    import com.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;
    
    /**
     * Registers a thrift server address in ZooKeeper as an ephemeral sequential node.
     *
     * @author wy
     */
    public class DynamicAddressReporter implements ThriftServerAddressReporter {

        private CuratorFramework zookeeper;

        public DynamicAddressReporter() {
        }

        public DynamicAddressReporter(CuratorFramework zookeeper) {
            this.zookeeper = zookeeper;
        }

        public void setZookeeper(CuratorFramework zookeeper) {
            this.zookeeper = zookeeper;
        }

        /**
         * Creates an ephemeral sequential node {@code <service>/i_NNN} whose data is the
         * UTF-8 encoded address, starting the client first when it has not been started.
         *
         * @param service registration path of the service; a missing leading "/" is added
         * @param address server address to publish
         * @throws IllegalStateException if no ZooKeeper client has been set
         * @throws IllegalArgumentException if service or address is null
         * @throws Exception if the node cannot be created
         */
        @Override
        public void report(String service, String address) throws Exception {
            if (zookeeper == null) {
                throw new IllegalStateException("zookeeper client is not set");
            }
            if (service == null || address == null) {
                throw new IllegalArgumentException("service and address must not be null");
            }
            if (zookeeper.getState() == CuratorFrameworkState.LATENT) {
                zookeeper.start();
            }
            // Parent nodes are created by creatingParentsIfNeeded(), so no separate
            // ensure-path step is needed.
            zookeeper.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(instanceNodePath(service), address.getBytes("utf-8"));
        }

        /** Builds the absolute node path prefix for an instance of the given service. */
        private static String instanceNodePath(String service) {
            // Paths must be absolute; tolerate values such as "UserServiceImpl".
            String base = service.startsWith("/") ? service : "/" + service;
            while (base.endsWith("/")) {
                base = base.substring(0, base.length() - 1);
            }
            return base + "/i_";
        }

        /** Closes the ZooKeeper client if one has been set. */
        public void close() {
            if (zookeeper != null) {
                zookeeper.close();
            }
        }

    }
    View Code

          。。。

          spring配置文件

    <!-- ZooKeeper client bean: connects to 127.0.0.1:2181 with namespace thrift/thrift-service; closed on context shutdown -->
        <bean id="thriftZookeeper" class="com.wy.thriftpool.commzkpool.zookeeper.ZookeeperFactory" destroy-method="close">
            <property name="connectString" value="127.0.0.1:2181"></property>
            <property name="namespace" value="thrift/thrift-service"></property>
        </bean>
        
        <!-- Address reporter that receives the ZooKeeper client bean declared above -->
        <bean id="serviceAddressReporter" class="com.wy.thriftpool.commzkpool.support.impl.DynamicAddressReporter" destroy-method="close">
            <property name="zookeeper" ref="thriftZookeeper"></property>
        </bean>
        
        <!-- Service implementation that the thrift server exposes -->
        <bean id="userService" class="com.wy.thrift.service.UserServiceImpl"/>
        
        <!-- Server factory: serves userService on port 9090 and reports its address under configPath via serviceAddressReporter -->
        <bean class="com.wy.thriftpool.commzkpool.ThriftServiceServerFactory" destroy-method="close">
            <property name="service" ref="userService"></property>
            <property name="configPath" value="UserServiceImpl"></property>
            <property name="port" value="9090"></property>
            <property name="addressReporter" ref="serviceAddressReporter"></property>
        </bean>
    View Code

       2、服务调用端

          连接池实现  

         杯了个具,为啥就不能提交。代码在评论中。

       连接池工厂,负责与Thrift server通信 

    package com.wy.thriftpool.commzkconnpool;
    
    import java.net.InetSocketAddress;
    
    import org.apache.commons.pool.PoolableObjectFactory;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.wy.thriftpool.commzkpool.support.ThriftServerAddressProvider;
    
    /**
     * Pool object factory that creates, validates and destroys the transports used to
     * talk to a thrift server.
     *
     * @author wy
     */
    public class ThriftPoolFactory implements PoolableObjectFactory<TTransport> {
        private final Logger logger = LoggerFactory.getLogger(getClass());
        // Timeout passed to every TSocket that is created
        public int timeOut;

        private final ThriftServerAddressProvider addressProvider;
        private PoolOperationCallBack callback;

        public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback) {
            super();
            this.addressProvider = addressProvider;
            this.callback = callback;
        }

        public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback, int timeOut) {
            super();
            this.addressProvider = addressProvider;
            this.callback = callback;
            this.timeOut = timeOut;
        }

        /**
         * Creates and opens a transport to the address chosen by the address provider,
         * then notifies the callback.
         *
         * @return the opened transport
         * @throws RuntimeException wrapping any failure; a transport that was already
         *         opened is closed before the exception is thrown
         */
        @Override
        public TTransport makeObject() throws Exception {
            TTransport transport = null;
            try {
                InetSocketAddress address = addressProvider.selector();
                if (address == null) {
                    throw new IllegalStateException("no thrift server address available");
                }
                transport = new TSocket(address.getHostName(), address.getPort(), this.timeOut);
                transport.open();
                if (callback != null) {
                    callback.make(transport);
                }
                return transport;
            } catch (Exception e) {
                logger.error("creat transport error:", e);
                // Do not leak a socket that was opened before the failure occurred.
                if (transport != null && transport.isOpen()) {
                    transport.close();
                }
                throw new RuntimeException(e);
            }
        }

        /**
         * Notifies the callback and then closes the transport if it is still open.
         */
        @Override
        public void destroyObject(TTransport transport) throws Exception {
            if (transport == null) {
                return;
            }
            try {
                if (callback != null) {
                    callback.destroy(transport);
                }
            } finally {
                // Close even when the callback fails.
                if (transport.isOpen()) {
                    transport.close();
                }
            }
        }

        /**
         * Checks whether the transport can safely be handed out by the pool.
         *
         * @return true only for an open {@link TSocket}
         */
        @Override
        public boolean validateObject(TTransport transport) {
            try {
                return transport instanceof TSocket && transport.isOpen();
            } catch (Exception e) {
                return false;
            }
        }

        @Override
        public void activateObject(TTransport obj) throws Exception {
            // Nothing to do when a transport is borrowed.
        }

        @Override
        public void passivateObject(TTransport obj) throws Exception {
            // Nothing to do when a transport is returned.
        }

        /** Hooks invoked around the life cycle of a pooled transport. */
        public static interface PoolOperationCallBack {
            // Invoked after a transport has been created and opened
            void make(TTransport transport);

            // Invoked before a transport is destroyed
            void destroy(TTransport transport);
        }
    }
    View Code

          连接池管理 

    package com.wy.thriftpool.commzkconnpool;
    
    import org.apache.thrift.transport.TSocket;  
    import org.slf4j.Logger;  
    import org.slf4j.LoggerFactory;  
    import org.springframework.beans.factory.annotation.Autowired;  
    import org.springframework.stereotype.Service;  
    
      
    /**
     * Connection pool manager: hands out one pooled socket per thread and gives it
     * back to the pool when the thread is done with it.
     *
     * <p>A socket obtained from {@link #getSocket()} stays borrowed until
     * {@link #returnSocket()} is called on the same thread; callers should do so in a
     * {@code finally} block.
     *
     * @author wy
     */
    @Service
    public class ConnectionManager {
        private final Logger logger = LoggerFactory.getLogger(getClass());
        // Socket currently borrowed by the calling thread, if any
        ThreadLocal<TSocket> socketThreadSafe = new ThreadLocal<TSocket>();

        // Connection pool provider
        @Autowired
        private ConnectionProvider connectionProvider;

        /**
         * Returns the socket bound to the current thread, borrowing one from the pool
         * when the thread does not hold one yet.
         *
         * @return the borrowed socket, or null if no connection could be obtained
         */
        public TSocket getSocket() {
            TSocket socket = socketThreadSafe.get();
            if (socket != null) {
                return socket;
            }
            try {
                socket = connectionProvider.getConnection();
            } catch (Exception e) {
                logger.error("error ConnectionManager.getSocket()", e);
                return null;
            }
            // The socket must stay out of the pool while the caller uses it, so it is
            // only bound to the thread here and released in returnSocket().
            if (socket != null) {
                socketThreadSafe.set(socket);
            }
            return socket;
        }

        /**
         * Gives the socket bound to the current thread back to the pool; does nothing
         * when the thread holds no socket.
         */
        public void returnSocket() {
            TSocket socket = socketThreadSafe.get();
            if (socket == null) {
                return;
            }
            try {
                connectionProvider.returnCon(socket);
            } finally {
                socketThreadSafe.remove();
            }
        }

    }
    View Code

          spring配置文件  

    <!-- ZooKeeper client bean: connects to 127.0.0.1:2181 with namespace thrift/thrift-service; closed on context shutdown -->
        <bean id="thriftZookeeper" class="com.wy.thriftpool.commzkpool.zookeeper.ZookeeperFactory" destroy-method="close">
            <property name="connectString" value="127.0.0.1:2181" />
            <property name="namespace" value="thrift/thrift-service" />
        </bean>
        <!-- Client-side connection provider. NOTE(review): maxActive/maxIdle/testOn* look like commons-pool settings and
             conTimeOut is presumably in milliseconds; confirm against ConnectionProviderImpl, which is not shown here -->
        <bean id="connectionProvider" class="com.wy.thriftpool.commzkconnpool.impl.ConnectionProviderImpl">
            <property name="maxActive" value="10" />
            <property name="maxIdle" value="10" />
            <property name="conTimeOut" value="2000" />
            <property name="testOnBorrow" value="true" />
            <property name="testOnReturn" value="true" />
            <property name="testWhileIdle" value="true" />
            
            <!-- Address provider configured with the same configPath value (UserServiceImpl) as the server-side registration -->
            <property name="addressProvider">
                <bean class="com.wy.thriftpool.commzkpool.support.impl.DynamicAddressProvider">
                    <property name="configPath" value="UserServiceImpl" />
                    <property name="zookeeper" ref="thriftZookeeper" />
                </bean>
            </property>
        </bean>
    View Code

    参考:http://www.cnblogs.com/mumuxinfei/p/3876187.html

    由于本人经验有限,文章中难免会有错误,请浏览文章的您指正或有不同的观点共同探讨!

  • 相关阅读:
    hihoCoder #1176 : 欧拉路·一 (简单)
    228 Summary Ranges 汇总区间
    227 Basic Calculator II 基本计算器II
    226 Invert Binary Tree 翻转二叉树
    225 Implement Stack using Queues 队列实现栈
    224 Basic Calculator 基本计算器
    223 Rectangle Area 矩形面积
    222 Count Complete Tree Nodes 完全二叉树的节点个数
    221 Maximal Square 最大正方形
    220 Contains Duplicate III 存在重复 III
  • 原文地址:https://www.cnblogs.com/exceptioneye/p/4966645.html
Copyright © 2011-2022 走看看