zoukankan      html  css  js  c++  java
  • 由浅入深了解Thrift之客户端连接池化

    一、问题描述                                                                                        

        在上一篇《由浅入深了解Thrift之服务模型和序列化机制》文章中,我们已经了解了thrift的基本架构和网络服务模型的优缺点。如今的互联网圈中,RPC服务化的思想如火如荼。我们又该如何将thrift服务化应用到我们的项目中呢?实现thrift服务化前,我们先想想这几个问题:服务注册、服务发现、服务健康检测、服务“Load Balance”、隐藏client和server端的交互细节、服务调用端的对象池化。

    • 服务的注册、发现和健康检测,我们使用zookeeper可以很好的解决
    • 服务“Load Balance”,我们可以使用简单的算法“权重+随机”,当然也可以使用成熟复杂的算法
    • 服务调用端的对象池化,我们可以使用Apache commons-pool,它使用简单,又可以满足我们的需求

    二、实现思路                                                                                        

        1、thrift server端启动时,每个实例向zk集群以临时节点方式注册(这样,遍历zk上/server下有多少个临时节点就知道有哪些server实例)

            thrift server端可以单机多端口多实例或多机部署多实例方式运行。

       2、服务调用方实现一个连接池,连接池初始化时,通过zk将在线的server实例信息同步到本地并缓存,同时监听zk下的节点变化。

       3、服务调用方与Server通讯时,从连接池中取一个可用的连接,用它实现RPC调用。

                     

    三、具体实现                                                                                       

       1、thrift server端   

          thrift server端,向zk中注册server address

    package com.wy.thriftpool.commzkpool;
    
    import java.lang.instrument.IllegalClassFormatException;
    import java.lang.reflect.Constructor;
    
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TBinaryProtocol.Factory;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadedSelectorServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    import org.springframework.beans.factory.InitializingBean;
    
    import com.wy.thrift.service.UserService.Processor;
    import com.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;
    import com.wy.thriftpool.commzkpool.support.ThriftServerIpTransfer;
    import com.wy.thriftpool.commzkpool.support.impl.LocalNetworkIpTransfer;
    
    /**
     * Starts a Thrift server for a service implementation and reports its address.
     *
     * <p>On {@link #afterPropertiesSet()} the factory resolves the local IP, builds the
     * generated {@code Processor} for the configured service by reflection, starts a
     * {@link TThreadedSelectorServer} on a dedicated thread and finally hands the string
     * {@code ip:port:priority} to the configured {@link ThriftServerAddressReporter}.
     *
     * @author wy
     */
    public class ThriftServiceServerFactory implements InitializingBean {

        // Port the thrift server listens on (required).
        private Integer port;
        // Weight published together with the address; defaults to 1.
        private Integer priority = 1;
        // Service implementation; must directly implement a generated "Iface" interface.
        private Object service;
        // Path under which the server address is reported.
        private String configPath;

        // Supplies the IP to publish; a LocalNetworkIpTransfer is used when none is set.
        private ThriftServerIpTransfer ipTransfer;
        // Publishes the server address; reporting is skipped when none is set.
        private ThriftServerAddressReporter addressReporter;
        // Thread running the blocking serve() loop; null until start-up succeeded.
        private ServerThread serverThread;

        /**
         * Builds the processor, starts the server thread and reports the address.
         *
         * @throws NullPointerException if {@code service} or {@code port} was not set, or
         *         if no server IP could be determined
         * @throws IllegalClassFormatException if no usable {@code Processor} could be
         *         created for the service; the underlying reflection failure, if any,
         *         is attached as the cause
         * @throws Exception if the server cannot be created or the address report fails
         */
        @Override
        public void afterPropertiesSet() throws Exception {
            // Fail with a meaningful message instead of an anonymous NPE further down.
            if (service == null) {
                throw new NullPointerException("service must be set");
            }
            if (port == null) {
                throw new NullPointerException("port must be set");
            }
            if (ipTransfer == null) {
                ipTransfer = new LocalNetworkIpTransfer();
            }
            String ip = ipTransfer.getIp();
            if (ip == null) {
                throw new NullPointerException("cant find server ip...");
            }
            String hostname = ip + ":" + port + ":" + priority;

            Processor<?> processor = createProcessor();

            // serve() blocks, so the server needs a thread of its own.
            serverThread = new ServerThread(processor, port);
            serverThread.start();

            if (addressReporter != null) {
                try {
                    addressReporter.report(configPath, hostname);
                } catch (Exception e) {
                    // Do not leave an unreported server running and holding the port.
                    close();
                    throw e;
                }
            }
        }

        /**
         * Instantiates the generated {@code Processor} that belongs to the service's
         * {@code Iface} interface.
         */
        private Processor<?> createProcessor() throws IllegalClassFormatException {
            Class<?>[] interfaces = service.getClass().getInterfaces();
            if (interfaces.length == 0) {
                throw new IllegalClassFormatException("service-class should implements Iface");
            }
            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
            if (classLoader == null) {
                // The context class loader may be null; fall back to the service's loader.
                classLoader = service.getClass().getClassLoader();
            }

            Exception lastFailure = null;
            for (Class<?> clazz : interfaces) {
                if (!"Iface".equals(clazz.getSimpleName())) {
                    continue;
                }
                Class<?> enclosing = clazz.getEnclosingClass();
                if (enclosing == null) {
                    // A top-level interface named Iface has no sibling Processor class.
                    continue;
                }
                String pname = enclosing.getName() + "$Processor";
                try {
                    Class<?> pclass = classLoader.loadClass(pname);
                    // The loaded class must be usable AS a Processor, i.e. be Processor
                    // itself or a subtype; otherwise the cast below would fail.
                    if (!Processor.class.isAssignableFrom(pclass)) {
                        continue;
                    }
                    Constructor<?> constructor = pclass.getConstructor(clazz);
                    return (Processor<?>) constructor.newInstance(service);
                } catch (Exception e) {
                    // Remember the failure so it can be reported as the cause below.
                    lastFailure = e;
                }
            }

            IllegalClassFormatException failure =
                    new IllegalClassFormatException("service-class should implements Iface");
            if (lastFailure != null) {
                failure.initCause(lastFailure);
            }
            throw failure;
        }

        /** Thread that owns the thrift server and runs its blocking serve() loop. */
        class ServerThread extends Thread {
            private TServer server;

            ServerThread(Processor<?> processor, int port) throws Exception {
                // Non-blocking server socket bound to the configured port.
                TNonblockingServerSocket serverTransport = new TNonblockingServerSocket(port);
                // Binary protocol.
                Factory protocolFactory = new TBinaryProtocol.Factory();

                TThreadedSelectorServer.Args tArgs = new TThreadedSelectorServer.Args(serverTransport);
                tArgs.processor(processor);
                tArgs.transportFactory(new TFramedTransport.Factory());
                tArgs.protocolFactory(protocolFactory);
                // Selector and worker thread counts are derived from the CPU count.
                int num = Runtime.getRuntime().availableProcessors() * 2 + 1;
                tArgs.selectorThreads(num);
                tArgs.workerThreads(num * 10);

                // Network service model.
                server = new TThreadedSelectorServer(tArgs);
            }

            @Override
            public void run() {
                // Failures are deliberately not swallowed: an unchecked exception now
                // reaches the thread's uncaught-exception handler instead of vanishing.
                server.serve();
            }

            public void stopServer() {
                server.stop();
            }
        }

        /** Stops the server; does nothing when the server was never started. */
        public void close() {
            if (serverThread != null) {
                serverThread.stopServer();
            }
        }

        public void setService(Object service) {
            this.service = service;
        }

        public void setPriority(Integer priority) {
            this.priority = priority;
        }

        public void setPort(Integer port) {
            this.port = port;
        }

        public void setIpTransfer(ThriftServerIpTransfer ipTransfer) {
            this.ipTransfer = ipTransfer;
        }

        public void setAddressReporter(ThriftServerAddressReporter addressReporter) {
            this.addressReporter = addressReporter;
        }

        public void setConfigPath(String configPath) {
            this.configPath = configPath;
        }
    }
    View Code

          thrift server address注册到zk   

    package com.wy.thriftpool.commzkpool.support.impl;
    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.imps.CuratorFrameworkState;
    import org.apache.zookeeper.CreateMode;
    
    import com.wy.thriftpool.commzkpool.support.ThriftServerAddressReporter;
    
    /**
     * Registers a thrift server address in ZooKeeper through a Curator client.
     *
     * <p>Each call to {@link #report(String, String)} creates one
     * {@code EPHEMERAL_SEQUENTIAL} node named {@code <service>/i_<sequence>} whose data
     * is the UTF-8 encoded address.
     *
     * @author wy
     */
    public class DynamicAddressReporter implements ThriftServerAddressReporter {

        // Curator client; supplied through the constructor or setZookeeper().
        private CuratorFramework zookeeper;

        public DynamicAddressReporter() {
        }

        public DynamicAddressReporter(CuratorFramework zookeeper) {
            this.zookeeper = zookeeper;
        }

        public void setZookeeper(CuratorFramework zookeeper) {
            this.zookeeper = zookeeper;
        }

        /**
         * Publishes {@code address} under the {@code service} path.
         *
         * @param service path of the service node, with or without a leading "/"
         * @param address address string to store as the node data
         * @throws IllegalStateException if no Curator client has been set
         * @throws IllegalArgumentException if {@code service} is null or empty, or
         *         {@code address} is null
         * @throws Exception if the ZooKeeper operation fails
         */
        @Override
        public void report(String service, String address) throws Exception {
            if (zookeeper == null) {
                throw new IllegalStateException("zookeeper client has not been set");
            }
            if (service == null || service.length() == 0) {
                throw new IllegalArgumentException("service path must not be empty");
            }
            if (address == null) {
                throw new IllegalArgumentException("address must not be null");
            }
            // Start the client only if nobody has started it yet.
            if (zookeeper.getState() == CuratorFrameworkState.LATENT) {
                zookeeper.start();
            }
            // NOTE(review): Curator versions that validate paths reject one without a
            // leading "/" - normalise so a plain name such as "UserServiceImpl" works.
            String parent = service.startsWith("/") ? service : "/" + service;
            // creatingParentsIfNeeded() creates the service node when it is missing,
            // so no separate "ensure path" step is required.
            zookeeper.create().creatingParentsIfNeeded()
                    .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                    .forPath(parent + "/i_", address.getBytes("utf-8"));
        }

        /** Closes the Curator client; does nothing when no client was set. */
        public void close() {
            if (zookeeper != null) {
                zookeeper.close();
            }
        }

    }
    View Code

          。。。

          spring配置文件

    <!-- zookeeper: client settings (connect string and namespace) shared by the beans below -->
        <!-- NOTE(review): ZookeeperFactory is not shown here; presumably it produces the
             CuratorFramework that DynamicAddressReporter.setZookeeper expects - confirm -->
        <bean id="thriftZookeeper" class="com.wy.thriftpool.commzkpool.zookeeper.ZookeeperFactory" destroy-method="close">
            <property name="connectString" value="127.0.0.1:2181"></property>
            <property name="namespace" value="thrift/thrift-service"></property>
        </bean>
        
        <!-- publishes the server address to zookeeper; close is invoked on context shutdown -->
        <bean id="serviceAddressReporter" class="com.wy.thriftpool.commzkpool.support.impl.DynamicAddressReporter" destroy-method="close">
            <property name="zookeeper" ref="thriftZookeeper"></property>
        </bean>
        
        <!-- the service implementation exposed over thrift -->
        <bean id="userService" class="com.wy.thrift.service.UserServiceImpl"/>
        
        <!-- starts the thrift server for userService on port 9090 and reports its address
             under the configPath "UserServiceImpl" through serviceAddressReporter -->
        <bean class="com.wy.thriftpool.commzkpool.ThriftServiceServerFactory" destroy-method="close">
            <property name="service" ref="userService"></property>
            <property name="configPath" value="UserServiceImpl"></property>
            <property name="port" value="9090"></property>
            <property name="addressReporter" ref="serviceAddressReporter"></property>
        </bean>
    View Code

       2、服务调用端

          连接池实现  

         杯了个具,为啥就不能提交。代码在评论中。

       连接池工厂,负责与Thrift server通信 

    package com.wy.thriftpool.commzkconnpool;
    
    import java.net.InetSocketAddress;
    
    import org.apache.commons.pool.PoolableObjectFactory;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.wy.thriftpool.commzkpool.support.ThriftServerAddressProvider;
    
    /**
     * Commons-pool object factory that creates, validates and destroys the
     * {@link TTransport} connections to a thrift server.
     *
     * <p>The server address for every new connection is obtained from the configured
     * {@link ThriftServerAddressProvider}. An optional {@link PoolOperationCallBack} is
     * notified after a connection has been opened and before it is closed.
     *
     * @author wy
     */
    public class ThriftPoolFactory implements PoolableObjectFactory<TTransport> {
        private final Logger logger = LoggerFactory.getLogger(getClass());
        // Timeout handed to the TSocket constructor; 0 when the two-argument constructor is used.
        public int timeOut;

        private final ThriftServerAddressProvider addressProvider;
        private PoolOperationCallBack callback;

        public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback) {
            super();
            this.addressProvider = addressProvider;
            this.callback = callback;
        }

        public ThriftPoolFactory(ThriftServerAddressProvider addressProvider, PoolOperationCallBack callback, int timeOut) {
            super();
            this.addressProvider = addressProvider;
            this.callback = callback;
            this.timeOut = timeOut;
        }

        /**
         * Opens a new transport to the address chosen by the address provider.
         *
         * @return the opened transport
         * @throws RuntimeException wrapping any failure while selecting the address,
         *         opening the socket or running the {@code make} callback
         */
        @Override
        public TTransport makeObject() throws Exception {
            TTransport transport = null;
            try {
                InetSocketAddress address = addressProvider.selector();
                transport = new TSocket(address.getHostName(), address.getPort(), this.timeOut);
                transport.open();
                if (callback != null) {
                    callback.make(transport);
                }
                return transport;
            } catch (Exception e) {
                logger.error("creat transport error:", e);
                // The pool never sees this transport, so close it here instead of
                // leaking an open socket (e.g. when the make callback failed).
                closeQuietly(transport);
                throw new RuntimeException(e);
            }
        }

        /**
         * Runs the {@code destroy} callback and then closes the transport.
         */
        @Override
        public void destroyObject(TTransport transport) throws Exception {
            if (transport == null) {
                return;
            }
            if (callback != null) {
                try {
                    // The callback contract is "runs before destruction".
                    callback.destroy(transport);
                } catch (RuntimeException e) {
                    // A failing callback must not prevent the socket from being closed.
                    logger.warn("destroy callback failed:", e);
                }
            }
            if (transport.isOpen()) {
                transport.close();
            }
        }

        /**
         * Checks whether the pool may safely hand out the given transport.
         *
         * @return {@code true} only for an open {@link TSocket}
         */
        @Override
        public boolean validateObject(TTransport transport) {
            try {
                return transport instanceof TSocket && transport.isOpen();
            } catch (Exception e) {
                return false;
            }
        }

        /** Nothing to do when a transport is borrowed. */
        @Override
        public void activateObject(TTransport obj) throws Exception {
        }

        /** Nothing to do when a transport is returned. */
        @Override
        public void passivateObject(TTransport obj) throws Exception {
        }

        /** Closes a possibly half-initialised transport, ignoring secondary failures. */
        private void closeQuietly(TTransport transport) {
            if (transport == null) {
                return;
            }
            try {
                if (transport.isOpen()) {
                    transport.close();
                }
            } catch (RuntimeException ignored) {
                // The original failure is the one worth reporting.
            }
        }

        /** Hooks invoked around the life cycle of a pooled transport. */
        public static interface PoolOperationCallBack {
            // Runs after a transport has been created and opened.
            void make(TTransport transport);

            // Runs before a transport is destroyed.
            void destroy(TTransport transport);
        }
    }
    View Code

          连接池管理 

    package com.wy.thriftpool.commzkconnpool;
    
    import org.apache.thrift.transport.TSocket;  
    import org.slf4j.Logger;  
    import org.slf4j.LoggerFactory;  
    import org.springframework.beans.factory.annotation.Autowired;  
    import org.springframework.stereotype.Service;  
    
      
    /**
     * Hands out pooled thrift sockets, one per calling thread.
     *
     * <p>{@link #getSocket()} borrows a socket from the {@link ConnectionProvider} and
     * remembers it for the current thread; {@link #returnSocket()} gives it back.
     * Callers must pair the two calls, typically with try/finally:
     *
     * <pre>
     * TSocket socket = connectionManager.getSocket();
     * try {
     *     // ... perform the RPC call on socket ...
     * } finally {
     *     connectionManager.returnSocket();
     * }
     * </pre>
     *
     * @author wy
     */
    @Service
    public class ConnectionManager {
        private final Logger logger = LoggerFactory.getLogger(getClass());
        // Socket currently borrowed by the calling thread, if any.
        ThreadLocal<TSocket> socketThreadSafe = new ThreadLocal<TSocket>();

        // Pool the sockets are borrowed from and returned to.
        @Autowired
        private ConnectionProvider connectionProvider;

        /**
         * Returns the socket held by the current thread, borrowing one from the pool
         * when the thread holds none or the held one is no longer open.
         *
         * <p>The socket stays borrowed until {@link #returnSocket()} is called. It is
         * not handed back here, because returning it before the caller has used it
         * would let the pool lend the same socket to another thread.
         *
         * @return the borrowed socket, or {@code null} if none could be obtained
         */
        public TSocket getSocket() {
            TSocket held = socketThreadSafe.get();
            if (held != null) {
                if (held.isOpen()) {
                    return held;
                }
                // Give the dead socket back before borrowing a fresh one.
                returnSocket();
            }
            try {
                TSocket socket = connectionProvider.getConnection();
                socketThreadSafe.set(socket);
                return socket;
            } catch (Exception e) {
                logger.error("error ConnectionManager.getSocket()", e);
                return null;
            }
        }

        /**
         * Returns the current thread's socket to the pool; does nothing when the
         * thread holds no socket.
         */
        public void returnSocket() {
            TSocket socket = socketThreadSafe.get();
            if (socket == null) {
                return;
            }
            // Clear the thread-local first so it is cleared even if the return fails.
            socketThreadSafe.remove();
            connectionProvider.returnCon(socket);
        }

    }
    View Code

          spring配置文件  

    <!-- zookeeper: client settings; same connect string and namespace as the server-side configuration -->
        <bean id="thriftZookeeper" class="com.wy.thriftpool.commzkpool.zookeeper.ZookeeperFactory" destroy-method="close">
            <property name="connectString" value="127.0.0.1:2181" />
            <property name="namespace" value="thrift/thrift-service" />
        </bean>
        <!-- connection pool used by ConnectionManager -->
        <!-- NOTE(review): ConnectionProviderImpl is not shown here; the properties below
             look like commons-pool settings plus a connect timeout - confirm units and meaning -->
        <bean id="connectionProvider" class="com.wy.thriftpool.commzkconnpool.impl.ConnectionProviderImpl">
            <property name="maxActive" value="10" />
            <property name="maxIdle" value="10" />
            <property name="conTimeOut" value="2000" />
            <property name="testOnBorrow" value="true" />
            <property name="testOnReturn" value="true" />
            <property name="testWhileIdle" value="true" />
            
            <!-- address source; configPath matches the value the server-side factory reports under -->
            <property name="addressProvider">
                <bean class="com.wy.thriftpool.commzkpool.support.impl.DynamicAddressProvider">
                    <property name="configPath" value="UserServiceImpl" />
                    <property name="zookeeper" ref="thriftZookeeper" />
                </bean>
            </property>
        </bean>
    View Code

    参考:http://www.cnblogs.com/mumuxinfei/p/3876187.html

    由于本人经验有限,文章中难免会有错误,请浏览文章的您指正或有不同的观点共同探讨!

  • 相关阅读:
    SQL Server -- 数据收缩详解
    查看SQL数据库表大小
    drop、truncate和delete的区别
    【汇总】Windows linux 敏感目录 路径汇总——主要是主机配置文件、web配置文件
    BFS_拓扑排序 使用图遍历思想也是OK的 虽然代码多了点
    深度森林原理及实现——原来是借鉴了残差网络和highway的思想,将其用于树类算法
    BFS——单词接龙,这种题一定要当心环路
    BFS——克隆图,发现直接copy会出现某些环路的边会丢失,还是要先copy节点,再copy边
    双指针——最接近的三数之和,细节处理还是很关键的
    双指针——三角形计数,就是一些细节处理要严谨,否则还是容易出错
  • 原文地址:https://www.cnblogs.com/exceptioneye/p/4966645.html
Copyright © 2011-2022 走看看