  • Building a Thrift client connection pool with commons-pool2

      This article does not debate whether Thrift has a future; the focus is on how to use commons-pool2.

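      Background: I have recently been maintaining an old project at my company. Its purpose is to let our in-house products interact with a purchased third-party product; the main features are importing users, positions, permissions, and so on. Because the purchased product keeps its own cache, we cannot modify data by importing it directly into the database and must go through the product's internal methods instead. The company chose Thrift for the remote calls: we add a jar to the purchased product that deploys a Thrift server, which accepts client requests and then calls the internal interface functions to do the work.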

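      After taking over the project, I found that the code maintains a single long-lived org.apache.thrift.transport.TSocket connection, and every thread sends its data over that one connection. That made me wonder: if all threads share one TSocket client, won't things break under high concurrency? Let's test it directly in code (Thrift version 0.13.0).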

      Server code:

    package com.zhi.demo.server;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadPoolServer;
    import org.apache.thrift.transport.TServerSocket;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.zhi.demo.api.HelloWorld;
    
    /**
     * Thrift server example
     * 
     * @author 张远志
     * @since 2020-05-27 15:53:29
     *
     */
    public class ThriftServer {
        private final static Logger logger = LoggerFactory.getLogger(ThriftServer.class);
    
        @SuppressWarnings({ "rawtypes", "unchecked" })
        public static void main(String[] args) {
            try {
                TServerSocket serverTransport = new TServerSocket(9090);
                TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory();
                TProcessor processor = new HelloWorld.Processor(new HelloWorldImpl());
    
                TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport);
                serverArgs.processor(processor);
                serverArgs.protocolFactory(proFactory);
                serverArgs.maxWorkerThreads(100);
    
                logger.info("Thrift server started, listening on port 9090");
    
                TServer server = new TThreadPoolServer(serverArgs);
                server.serve();
            } catch (Throwable e) {
                logger.error("RPC server error", e);
            }
        }
    }

      Client test code:

    package com.zhi.demo.client;
    
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    import org.apache.thrift.TException;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    import org.apache.thrift.transport.TTransportException;
    import org.junit.jupiter.api.BeforeAll;
    import org.junit.jupiter.api.MethodOrderer;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.TestInstance;
    import org.junit.jupiter.api.TestInstance.Lifecycle;
    import org.junit.jupiter.api.TestMethodOrder;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.zhi.demo.api.HelloWorld;
    
    /**
     * TSocket test<br>
     * 
     * @author 张远志
     * @since 2020-05-30 16:16:47
     *
     */
    @TestInstance(Lifecycle.PER_CLASS)
    @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
    public class TSocketTest {
        private final static Logger logger = LoggerFactory.getLogger(TSocketTest.class);
        private int jobCount = 10;
        private ExecutorService executor;
    
        /**
         * Set up the thread pool
         */
        @BeforeAll
        public void init() {
            executor = Executors.newFixedThreadPool(jobCount);
        }
    
        @Test
        public void test() throws InterruptedException, TTransportException {
            CountDownLatch latch = new CountDownLatch(jobCount);
            TTransport transport = new TSocket("127.0.0.1", 9090);
            transport.open();
            for (int i = 0; i < jobCount; i++) {
                executor.execute(new Job(latch, transport));
            }
            latch.await();
            transport.close();
        }
    
        class Job implements Runnable {
            CountDownLatch latch;
            TTransport transport;
    
            public Job(CountDownLatch latch, TTransport transport) {
                this.latch = latch;
                this.transport = transport;
            }
    
            @Override
            public void run() {
                try {
                    HelloWorld.Client client = new HelloWorld.Client(new TBinaryProtocol(transport));
                    client.sayHello("张三");
                } catch (TException e) {
                    logger.error("RPC call failed", e);
                } finally {
                    latch.countDown(); // count down even on an unexpected exception so latch.await() cannot hang
                }
            }
        }
    }

      Sure enough, as soon as a few threads run, the Thrift server reports an error:

    21:48:51.553 [pool-1-thread-2] ERROR org.apache.thrift.server.TThreadPoolServer - Thrift Error occurred during processing of message.
    org.apache.thrift.protocol.TProtocolException: Negative length: -2147418111
    	at org.apache.thrift.protocol.TBinaryProtocol.checkStringReadLength(TBinaryProtocol.java:434)
    	at org.apache.thrift.protocol.TBinaryProtocol.readString(TBinaryProtocol.java:396)
    	at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:249)
    	at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27)
    	at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:313)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)
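
      The number in the message is itself a clue. The following is a minimal sketch, assuming the strict TBinaryProtocol layout (a 4-byte header of `VERSION_1 | messageType`, then the method name as a length-prefixed string). The class `InterleavedFrames` and its helpers are made up for illustration, and two callers sharing one stream are simulated by interleaving their writes by hand. This is one plausible interleaving, not a trace of the actual run: when the second caller's header lands where the first caller's name length should be, the reader sees exactly the value from the log.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical illustration, not Thrift code: models only the first fields of a message.
class InterleavedFrames {
    static final int VERSION_1 = 0x80010000; // strict-mode version marker of TBinaryProtocol
    static final int CALL = 1;               // message type of a request

    // Writes the 4-byte message header (big-endian, the ByteBuffer default).
    static void writeHeader(ByteBuffer out) {
        out.putInt(VERSION_1 | CALL);
    }

    // Writes the method name as a 4-byte length followed by its UTF-8 bytes.
    static void writeName(ByteBuffer out, String name) {
        byte[] bytes = name.getBytes(StandardCharsets.UTF_8);
        out.putInt(bytes.length);
        out.put(bytes);
    }

    // Reads the header, then returns whatever sits where the name length is expected.
    static int nameLengthSeenByReader(ByteBuffer wire) {
        wire.flip();
        wire.getInt();        // message header
        return wire.getInt(); // should be the name length
    }

    // A single caller writes header and name back to back.
    static ByteBuffer cleanWire() {
        ByteBuffer out = ByteBuffer.allocate(64);
        writeHeader(out);
        writeName(out, "sayHello");
        return out;
    }

    // Two callers share the stream; B's header slips in between A's header and A's name.
    static ByteBuffer interleavedWire() {
        ByteBuffer out = ByteBuffer.allocate(64);
        writeHeader(out);           // caller A
        writeHeader(out);           // caller B
        writeName(out, "sayHello"); // caller A
        writeName(out, "sayHello"); // caller B
        return out;
    }

    public static void main(String[] args) {
        System.out.println(nameLengthSeenByReader(cleanWire()));       // 8
        System.out.println(nameLengthSeenByReader(interleavedWire())); // -2147418111
        System.out.println(Integer.toHexString(-2147418111));          // 80010001
    }
}
```

      In hexadecimal the "negative length" is 0x80010001, which is a message header rather than a length, so the stream the server reads is no longer a sequence of whole messages.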

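      To fix this we have to create a TSocket connection for each thread. That raises another problem: under high concurrency this puts more pressure on the server, and creating a connection takes time. Is there a way to pool TSocket instances, so that a thread takes one from the pool when needed and puts it back when done? commons-pool2 came to mind. To implement the pooling I searched online for material and studied the Jedis implementation, and eventually got TSocket pooling working.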
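
      The borrow-and-return idea can be sketched without any library. The block below is a stand-in built on plain Java collections, not commons-pool2 itself: `TinyPool`, `borrow` and `giveBack` are hypothetical names, and a `StringBuilder` stands in for a connection. It only shows that a pool creates far fewer objects than there are jobs.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Supplier;

// Hypothetical stand-in for an object pool; commons-pool2 adds validation, eviction and more.
class TinyPool<T> {
    private final Deque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;
    private final int maxTotal;
    private int created;

    TinyPool(int maxTotal, Supplier<T> factory) {
        this.maxTotal = maxTotal;
        this.factory = factory;
    }

    // Reuses an idle object if there is one, otherwise creates one while under the cap.
    synchronized T borrow() {
        if (!idle.isEmpty()) {
            return idle.pop();
        }
        if (created >= maxTotal) {
            // Simplification: this sketch fails fast, whereas a real pool can wait for a return.
            throw new IllegalStateException("pool exhausted");
        }
        created++;
        return factory.get();
    }

    // Makes the object available to the next borrower.
    synchronized void giveBack(T obj) {
        idle.push(obj);
    }

    synchronized int createdCount() {
        return created;
    }

    // Ten sequential jobs share whatever the pool hands out.
    static int demo() {
        TinyPool<StringBuilder> pool = new TinyPool<>(2, StringBuilder::new);
        for (int job = 0; job < 10; job++) {
            StringBuilder conn = pool.borrow();
            conn.append(job);
            pool.giveBack(conn);
        }
        return pool.createdCount();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 1
    }
}
```

      Because each job returns its object before the next one borrows, ten jobs are served by a single object; with concurrent jobs the count would grow, but never beyond the cap.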

      Step 1: adapt TSocket so that close() does not close it directly but returns it to the pool

    package com.zhi.thrift.pool;
    
    import org.apache.commons.pool2.ObjectPool;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransportException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Poolable TSocket connection
     * 
     * @author 张远志
     * @since 2020-05-27 20:29:39
     *
     */
    public class PoolableTSocket extends TSocket {
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
        private ObjectPool<PoolableTSocket> pool;
        private boolean broken = false;
    
        public PoolableTSocket(String host, int port, int timeout) {
            super(host, port, timeout);
        }
    
        public void setPool(ObjectPool<PoolableTSocket> pool) {
            this.pool = pool;
        }
    
        /**
         * Overrides close(): when a pool is set, the socket is not closed directly but returned to the pool
         */
        @Override
        public void close() {
            logger.trace("{} PoolableTSocket object: {}", pool == null ? "Closing" : (isBroken() ? "Destroying" : "Returning"), this);
            if (pool == null) {
                super.close();
            } else if (isBroken()) {
                try {
                    pool.invalidateObject(this);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            } else {
                try {
                    pool.returnObject(this);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    
        public boolean isBroken() {
            return broken;
        }
    
        /**
         * Actually closes the underlying socket
         */
        protected void reallyClose() {
            super.close();
        }
    
        @Override
        public void flush() throws TTransportException {
            try {
                super.flush();
            } catch (TTransportException e) {
                broken = true;
                throw e;
            }
        }
    }
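
      The key trick of this step is a close() that hands the object back instead of closing it, which is what lets callers use try-with-resources with the pool. Here is a library-free sketch of the same pattern: `Handle`, `markBroken`, `send` and the two deques are hypothetical stand-ins for PoolableTSocket, its broken flag and the pool's returnObject/invalidateObject, not part of Thrift or commons-pool2.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration of "close() returns to the pool".
class ReturnOnClose {
    static final Deque<Handle> idle = new ArrayDeque<>();      // stand-in for returnObject
    static final Deque<Handle> destroyed = new ArrayDeque<>(); // stand-in for invalidateObject

    static class Handle implements AutoCloseable {
        private boolean broken;

        // Pretends to send data; a real transport would mark itself broken on an I/O failure.
        String send(String msg) {
            return msg;
        }

        void markBroken() {
            broken = true;
        }

        // Declares no checked exception, so try-with-resources needs no extra catch.
        @Override
        public void close() {
            if (broken) {
                destroyed.push(this); // a broken object must not be reused
            } else {
                idle.push(this);      // a healthy object goes back for the next borrower
            }
        }
    }

    // Uses one healthy and one broken handle; returns {idle count, destroyed count}.
    static int[] demo() {
        idle.clear();
        destroyed.clear();
        try (Handle ok = new Handle()) {
            ok.send("hello");
        }
        try (Handle bad = new Handle()) {
            bad.send("hello");
            bad.markBroken();
        }
        return new int[] { idle.size(), destroyed.size() };
    }

    public static void main(String[] args) {
        int[] counts = demo();
        System.out.println(counts[0] + " " + counts[1]); // 1 1
    }
}
```

      Note that in the article's class the broken flag is set only when flush() fails; whether read failures should set it as well is a design choice this sketch does not settle.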

      Step 2: create the pooled-object factory

    package com.zhi.thrift.pool;
    
    import org.apache.commons.pool2.BasePooledObjectFactory;
    import org.apache.commons.pool2.ObjectPool;
    import org.apache.commons.pool2.PooledObject;
    import org.apache.commons.pool2.impl.DefaultPooledObject;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * Factory that creates and destroys pooled PoolableTSocket objects
     * 
     * @author 张远志
     * @since 2020-05-27 17:12:46
     *
     */
    public class PooledTSocketFactory extends BasePooledObjectFactory<PoolableTSocket> {
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
        private volatile ObjectPool<PoolableTSocket> pool;
        private String host;
        private int port;
        private int timeout;
    
        public PooledTSocketFactory(String host, int port, int timeout) {
            this.host = host;
            this.port = port;
            this.timeout = timeout;
        }
    
        @Override
        public PoolableTSocket create() throws Exception {
            PoolableTSocket socket = new PoolableTSocket(host, port, timeout);
            socket.open();
            socket.setPool(pool); // assign the pool only after the connection has been opened successfully
            logger.trace("Created PoolableTSocket object {}, local port: {}", socket, socket.getSocket().getLocalPort());
            return socket;
        }
    
        @Override
        public PooledObject<PoolableTSocket> wrap(PoolableTSocket socket) {
            return new DefaultPooledObject<PoolableTSocket>(socket);
        }
    
        @Override
        public void destroyObject(PooledObject<PoolableTSocket> p) throws Exception {
            PoolableTSocket socket = p.getObject();
            logger.trace("Destroying PoolableTSocket object: {}", socket);
            if (socket.isOpen()) {
                socket.reallyClose();
            }
            p.markAbandoned();
        }
    
        public synchronized void setPool(final ObjectPool<PoolableTSocket> pool) {
            if (null != this.pool && pool != this.pool) {
                try {
                    this.pool.close();
                } catch (final Exception e) {
                }
            }
            this.pool = pool;
        }
    
    }

      Step 3: create a pool

    package com.zhi.thrift.pool;
    
    import java.util.NoSuchElementException;
    
    import org.apache.commons.pool2.ObjectPool;
    import org.apache.commons.pool2.impl.GenericObjectPool;
    
    /**
     * PoolableTSocket connection pool
     * 
     * @author 张远志
     * @since 2020-05-30 01:35:02
     *
     */
    public class TSocketPool implements ObjectPool<PoolableTSocket> {
        private final GenericObjectPool<PoolableTSocket> pool;
    
        public TSocketPool() {
            this("127.0.0.1", 9090, 2000);
        }
    
        public TSocketPool(String host, int port, int timeout) {
            this(new TSocketPoolConfig(), host, port, timeout);
        }
    
        public TSocketPool(TSocketPoolConfig config, String host, int port, int timeout) {
            PooledTSocketFactory factory = new PooledTSocketFactory(host, port, timeout);
            pool = new GenericObjectPool<>(factory, config);
            factory.setPool(pool);
        }
    
        public GenericObjectPool<PoolableTSocket> getPool() {
            return pool;
        }
    
        @Override
        public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException {
            pool.addObject();
        }
    
        @Override
        public PoolableTSocket borrowObject() throws Exception, NoSuchElementException, IllegalStateException {
            return pool.borrowObject();
        }
    
        @Override
        public void clear() throws Exception, UnsupportedOperationException {
            pool.clear();
        }
    
        @Override
        public void close() {
            pool.close();
        }
    
        @Override
        public int getNumActive() {
            return pool.getNumActive();
        }
    
        @Override
        public int getNumIdle() {
            return pool.getNumIdle();
        }
    
        @Override
        public void invalidateObject(PoolableTSocket obj) throws Exception {
            pool.invalidateObject(obj);
        }
    
        @Override
        public void returnObject(PoolableTSocket obj) throws Exception {
            pool.returnObject(obj);
        }
    }

      Step 4: define our own pool configuration

    package com.zhi.thrift.pool;
    
    import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
    
    /**
     * Configuration for the PoolableTSocket connection pool; the defaults follow JedisPoolConfig
     * 
     * @author 张远志
     * @since 2020-05-27 21:08:40
     *
     */
    public class TSocketPoolConfig extends GenericObjectPoolConfig<PoolableTSocket> {
        public TSocketPoolConfig() {
            setMaxTotal(20); // must not exceed the Thrift server's worker thread pool size
            setTestWhileIdle(true);
            setMinEvictableIdleTimeMillis(60000);
            setTimeBetweenEvictionRunsMillis(30000);
            setNumTestsPerEvictionRun(-1);
        }
    }

      With these four simple steps the TSocket pooling is done, and we can now verify that it works:

    package com.zhi.demo.client;
    
    import java.util.Random;
    
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.junit.jupiter.api.MethodOrderer;
    import org.junit.jupiter.api.Test;
    import org.junit.jupiter.api.TestInstance;
    import org.junit.jupiter.api.TestMethodOrder;
    import org.junit.jupiter.api.TestInstance.Lifecycle;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import com.zhi.demo.api.HelloWorld;
    import com.zhi.thrift.pool.PoolableTSocket;
    import com.zhi.thrift.pool.TSocketPool;
    
    /**
     * Pooled Thrift connection test
     * 
     * @author 张远志
     * @since 2020-05-30 01:32:15
     *
     */
    @TestInstance(Lifecycle.PER_CLASS)
    @TestMethodOrder(MethodOrderer.OrderAnnotation.class)
    public class MultiClient {
        private final static Logger logger = LoggerFactory.getLogger(MultiClient.class);
        private TSocketPool pool;
        private volatile boolean flag = false; // set once at least one worker thread has borrowed a socket
    
        @Test
        public void test() throws Exception {
            pool = new TSocketPool();
    
            int jobCount = 20;
            for (int i = 0; i < jobCount; i++) {
                new Job().start();
            }
            while (true) {
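                boolean tem = flag; // important: read flag before the pool counters instead of in the exit check, otherwise the loop could exit on counters sampled before the first borrow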
                int active = pool.getNumActive(), idle = pool.getNumIdle();
                logger.info("active={}, idle={}", active, idle);
                if (tem && active == 0 && idle == pool.getPool().getMinIdle()) {
                    break;
                }
                Thread.sleep(1000);
            }
            logger.info("All pooled resources have been released; the pool can be closed now");
            pool.close();
        }

        class Job extends Thread {
            @Override
            public void run() {
                MultiClient.sleep(new Random().nextInt(5) * 1000); // stagger the borrow requests over time
                // Always give back what you borrow, otherwise the object stays occupied and other threads cannot obtain it
                try (PoolableTSocket socket = pool.borrowObject();) {
                    flag = true;
                    TProtocol protocol = new TBinaryProtocol(socket);
                    HelloWorld.Client client = new HelloWorld.Client(protocol);
                    client.sayHello("张三");
                    MultiClient.sleep(1000); // simulate holding the resource for a while
                } catch (Exception e) {
                    logger.error("Thrift remote call failed", e);
                }
            }
        }

        static void sleep(long time) {
            try {
                Thread.sleep(time);
            } catch (InterruptedException e) {
            }
        }
    }

      Test results:

    22:22:19.233 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=0
    22:22:20.249 [main] INFO com.zhi.demo.client.MultiClient - active=6, idle=0
    22:22:21.249 [main] INFO com.zhi.demo.client.MultiClient - active=5, idle=3
    22:22:22.250 [main] INFO com.zhi.demo.client.MultiClient - active=3, idle=5
    22:22:23.251 [main] INFO com.zhi.demo.client.MultiClient - active=6, idle=2
    22:22:24.252 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8
    22:22:25.252 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8
    ...
    22:23:48.300 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8
    22:23:49.301 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=0
    22:23:49.301 [main] INFO com.zhi.demo.client.MultiClient - All pooled resources have been released; the pool can be closed now
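
      The long run of idle=8 followed by a drop to idle=0 is consistent with the eviction settings in TSocketPoolConfig: an idle object is removed by the first evictor run that finds it idle for longer than the minimum evictable idle time (60 s here, with a run every 30 s). A small worked calculation follows, assuming the evictor first runs one interval after the pool is created and that the pool was created at about 22:22:19 (read off the first log line, not measured). `EvictionTiming` and `evictedAtSecond` are hypothetical names.

```java
// Hypothetical worked example; all times are seconds counted from pool creation.
class EvictionTiming {
    // Returns the time of the first evictor run at which an object returned at
    // returnedAt has been idle for strictly longer than minIdleSec.
    static long evictedAtSecond(long returnedAt, long runIntervalSec, long minIdleSec) {
        long run = runIntervalSec;               // assumed: first run one interval after creation
        while (run - returnedAt <= minIdleSec) { // not idle long enough at this run
            run += runIntervalSec;
        }
        return run;
    }

    public static void main(String[] args) {
        // The sockets in the log went idle roughly 1 to 5 seconds after the pool was created.
        System.out.println(evictedAtSecond(1, 30, 60)); // 90
        System.out.println(evictedAtSecond(5, 30, 60)); // 90
    }
}
```

      Ninety seconds after 22:22:19 is 22:23:49, the moment the log shows idle=0. At the earlier run, 60 seconds in, the sockets had been idle for just under a minute and were kept.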

      It works well and does exactly what I needed.

  • Original article: https://www.cnblogs.com/zhi-leaf/p/12995113.html