本文不讨论thrift是否有发展前途,重点在于common-pool2的使用。
背景说明:最近在维护公司的一个老项目,项目目的是使公司内部产品和购买的产品进行交互,主要功能有导入用户、岗位、权限等。由于购买的产品有缓存设置,所以我们无法使用数据库导入的方式进行数据修改,而必须使用它内部的方法进行处理。公司选用thrift进行远程接口调用,我们在购买的产品中增加一个jar包,部署thrift服务端,接受客户端请求后再调用内部接口函数完成操作。
在接手这个项目后,我发现代码中维护一个org.apache.thrift.transport.TSocket长连接,所有的线程都使用这个连接进行传输数据。我就想,所有线程共用一个TSocket客户端,并发高时不会出错么?我们直接拿代码测试一下(thrift版本为0.13.0)
服务端代码:
package com.zhi.demo.server; import org.apache.thrift.TProcessor; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.server.TServer; import org.apache.thrift.server.TThreadPoolServer; import org.apache.thrift.transport.TServerSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.zhi.demo.api.HelloWorld; /** * Thrift服务端示例 * * @author 张远志 * @since 2020年5月27日15:53:29 * */ public class ThriftServer { private final static Logger logger = LoggerFactory.getLogger(ThriftServer.class); @SuppressWarnings({ "rawtypes", "unchecked" }) public static void main(String[] args) { try { TServerSocket serverTransport = new TServerSocket(9090); TBinaryProtocol.Factory proFactory = new TBinaryProtocol.Factory(); TProcessor processor = new HelloWorld.Processor(new HelloWorldImpl()); TThreadPoolServer.Args serverArgs = new TThreadPoolServer.Args(serverTransport); serverArgs.processor(processor); serverArgs.protocolFactory(proFactory); serverArgs.maxWorkerThreads(100); logger.info("Thrift服务端启动,监听端口9090"); TServer server = new TThreadPoolServer(serverArgs); server.serve(); } catch (Throwable e) { logger.error("RPC服务报错", e); } } }
客户端测试代码:
package com.zhi.demo.client; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import org.apache.thrift.TException; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransport; import org.apache.thrift.transport.TTransportException; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.junit.jupiter.api.TestMethodOrder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.zhi.demo.api.HelloWorld; /** * TSocket测试<br> * * @author 张远志 * @since 2020年5月30日16:16:47 * */ @TestInstance(Lifecycle.PER_CLASS) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class TSocketTest { private final static Logger logger = LoggerFactory.getLogger(TSocketTest.class); private int jobCount = 10; private ExecutorService executor; /** * 初始化 */ @BeforeAll public void init() { executor = Executors.newFixedThreadPool(jobCount); } @Test public void test() throws InterruptedException, TTransportException { CountDownLatch latch = new CountDownLatch(jobCount); TTransport transport = new TSocket("127.0.0.1", 9090); transport.open(); for (int i = 0; i < jobCount; i++) { executor.execute(new Job(latch, transport)); } latch.await(); transport.close(); } class Job implements Runnable { CountDownLatch latch; TTransport transport; public Job(CountDownLatch latch, TTransport transport) { this.latch = latch; this.transport = transport; } @Override public void run() { try { HelloWorld.Client client = new HelloWorld.Client(new TBinaryProtocol(transport)); client.sayHello("张三"); } catch (TException e) { logger.error("调用RPC服务报错", e); } latch.countDown(); } } }
果然,几个线程一运行,thrift服务端马上出错
21:48:51.553 [pool-1-thread-2] ERROR org.apache.thrift.server.TThreadPoolServer - Thrift Error occurred during processing of message. org.apache.thrift.protocol.TProtocolException: Negative length: -2147418111 at org.apache.thrift.protocol.TBinaryProtocol.checkStringReadLength(TBinaryProtocol.java:434) at org.apache.thrift.protocol.TBinaryProtocol.readString(TBinaryProtocol.java:396) at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:249) at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:27) at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:313) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748)
为了解决这个问题,我们必须为每个线程创建一个TSocket连接,但问题又来了,如果并发比较高,对服务端的压力就增大了,而且创建连接也需要时间,有没有办法可以让TSocket池化,需要时从池中取,用完后放会池中,我想到了common-pool2。为了实现了池化功能,我百度了一些资料,并且参阅了jedis和实现,总算实现了TSocket池化操作。
第一步:改造TSocket,不让直接关闭,而是归还到pool中
package com.zhi.thrift.pool; import org.apache.commons.pool2.ObjectPool; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * 可池化的TSocket连接 * * @author 张远志 * @since 2020年5月27日20:29:39 * */ public class PoolableTSocket extends TSocket { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private ObjectPool<PoolableTSocket> pool; private boolean broken = false; public PoolableTSocket(String host, int port, int timeout) { super(host, port, timeout); } public void setPool(ObjectPool<PoolableTSocket> pool) { this.pool = pool; } /** * 重写close方法,如果使用了池,则不会直接关闭,而是归还到池中 */ @Override public void close() { logger.trace("{}PoolableTSocket对象,{}", pool == null ? "关闭" : (isBroken() ? "销毁" : "归还"), this); if (pool == null) { super.close(); } else if (isBroken()) { try { pool.invalidateObject(this); } catch (Exception e) { throw new RuntimeException(e); } } else { try { pool.returnObject(this); } catch (Exception e) { throw new RuntimeException(e); } } } public boolean isBroken() { return broken; } /** * 真正的关闭 */ protected void reallyClose() { super.close(); } @Override public void flush() throws TTransportException { try { super.flush(); } catch (TTransportException e) { broken = true; throw e; } } }
第二步:创建池化工厂
package com.zhi.thrift.pool; import org.apache.commons.pool2.BasePooledObjectFactory; import org.apache.commons.pool2.ObjectPool; import org.apache.commons.pool2.PooledObject; import org.apache.commons.pool2.impl.DefaultPooledObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** * PoolableTSocket连接池 * * @author 张远志 * @since 2020年5月27日17:12:46 * */ public class PooledTSocketFactory extends BasePooledObjectFactory<PoolableTSocket> { private final Logger logger = LoggerFactory.getLogger(this.getClass()); private volatile ObjectPool<PoolableTSocket> pool; private String host; private int port; private int timeout; public PooledTSocketFactory(String host, int port, int timeout) { this.host = host; this.port = port; this.timeout = timeout; } @Override public PoolableTSocket create() throws Exception { PoolableTSocket socket = new PoolableTSocket(host, port, timeout); socket.open(); socket.setPool(pool); // 保证连接创建成功后才对pool赋值 logger.trace("成功创建PoolableTSocket对象,{},本地端口:{}", socket, socket.getSocket().getLocalPort()); return socket; } @Override public PooledObject<PoolableTSocket> wrap(PoolableTSocket socket) { return new DefaultPooledObject<PoolableTSocket>(socket); } @Override public void destroyObject(PooledObject<PoolableTSocket> p) throws Exception { PoolableTSocket socket = p.getObject(); logger.trace("销毁PoolableTSocket对象,{}", socket); if (socket.isOpen()) { socket.reallyClose(); } p.markAbandoned(); } public synchronized void setPool(final ObjectPool<PoolableTSocket> pool) { if (null != this.pool && pool != this.pool) { try { this.pool.close(); } catch (final Exception e) { } } this.pool = pool; } }
第三步:创建一个池
package com.zhi.thrift.pool; import java.util.NoSuchElementException; import org.apache.commons.pool2.ObjectPool; import org.apache.commons.pool2.impl.GenericObjectPool; /** * PoolableTSocket连接池 * * @author 张远志 * @since 2020年5月30日01:35:02 * */ public class TSocketPool implements ObjectPool<PoolableTSocket> { private final GenericObjectPool<PoolableTSocket> pool; public TSocketPool() { this("127.0.0.1", 9090, 2000); } public TSocketPool(String host, int port, int timeout) { this(new TSocketPoolConfig(), host, port, timeout); } public TSocketPool(TSocketPoolConfig config, String host, int port, int timeout) { PooledTSocketFactory factory = new PooledTSocketFactory(host, port, timeout); pool = new GenericObjectPool<>(factory, config); factory.setPool(pool); } public GenericObjectPool<PoolableTSocket> getPool() { return pool; } @Override public void addObject() throws Exception, IllegalStateException, UnsupportedOperationException { pool.addObject(); } @Override public PoolableTSocket borrowObject() throws Exception, NoSuchElementException, IllegalStateException { return pool.borrowObject(); } @Override public void clear() throws Exception, UnsupportedOperationException { pool.clear(); } @Override public void close() { pool.close(); } @Override public int getNumActive() { return pool.getNumActive(); } @Override public int getNumIdle() { return pool.getNumIdle(); } @Override public void invalidateObject(PoolableTSocket obj) throws Exception { pool.invalidateObject(obj); } @Override public void returnObject(PoolableTSocket obj) throws Exception { pool.returnObject(obj); } }
第四步:定义自己的池配置
package com.zhi.thrift.pool; import org.apache.commons.pool2.impl.GenericObjectPoolConfig; /** * PoolableTSocket连接池参数配置,默认参数配置参照JedisPoolConfig * * @author 张远志 * @since 2020年5月27日21:08:40 * */ public class TSocketPoolConfig extends GenericObjectPoolConfig<PoolableTSocket> { public TSocketPoolConfig() { setMaxTotal(20); // 不能超过thrift服务端线程池设置 setTestWhileIdle(true); setMinEvictableIdleTimeMillis(60000); setTimeBetweenEvictionRunsMillis(30000); setNumTestsPerEvictionRun(-1); } }
简单四步就完成了TSocket的池化改造,现在我们就可以验证功能了
package com.zhi.demo.client; import java.util.Random; import org.apache.thrift.protocol.TBinaryProtocol; import org.apache.thrift.protocol.TProtocol; import org.junit.jupiter.api.MethodOrderer; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.TestInstance; import org.junit.jupiter.api.TestMethodOrder; import org.junit.jupiter.api.TestInstance.Lifecycle; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.zhi.demo.api.HelloWorld; import com.zhi.thrift.pool.PoolableTSocket; import com.zhi.thrift.pool.TSocketPool; /** * Thrift池化连接测试 * * @author 张远志 * @since 2020年5月30日01:32:15 * */ @TestInstance(Lifecycle.PER_CLASS) @TestMethodOrder(MethodOrderer.OrderAnnotation.class) public class MultiClient { private final static Logger logger = LoggerFactory.getLogger(MultiClient.class); private TSocketPool pool; private volatile boolean flag = false; // 用于保证有一个线程已经开启 @Test public void test() throws Exception { pool = new TSocketPool(); int jobCount = 20; for (int i = 0; i < jobCount; i++) { new Job().start(); } while (true) { boolean tem = flag; // 重要,跳出循环不能直接使用flag; int active = pool.getNumActive(), idle = pool.getNumIdle(); logger.info("active={}, idle={}", active, idle); if (tem && active == 0 && idle == pool.getPool().getMinIdle()) { break; } Thread.sleep(1000); }
logger.info("池中资源已完成释放,可以关闭池了"); pool.close(); } class Job extends Thread { @Override public void run() { MultiClient.sleep(new Random().nextInt(5) * 1000); // 让资源申请发生在不同的时间 try (PoolableTSocket socket = pool.borrowObject();) { // 借来的东西务必要还,不然对象一致会被占用,其他线程无法申请 flag = true; TProtocol protocol = new TBinaryProtocol(socket); HelloWorld.Client client = new HelloWorld.Client(protocol); client.sayHello("张三"); MultiClient.sleep(1000); // 模拟占用一段时间的资源 } catch (Exception e) { logger.error("调用Thrift远程接口失败", e); } } } static void sleep(long time) { try { Thread.sleep(time); } catch (InterruptedException e) { } } }
测试结果:
22:22:19.233 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=0 22:22:20.249 [main] INFO com.zhi.demo.client.MultiClient - active=6, idle=0 22:22:21.249 [main] INFO com.zhi.demo.client.MultiClient - active=5, idle=3 22:22:22.250 [main] INFO com.zhi.demo.client.MultiClient - active=3, idle=5 22:22:23.251 [main] INFO com.zhi.demo.client.MultiClient - active=6, idle=2 22:22:24.252 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8 22:22:25.252 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8 ... 22:23:48.300 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=8 22:23:49.301 [main] INFO com.zhi.demo.client.MultiClient - active=0, idle=0 22:23:49.301 [main] INFO com.zhi.demo.client.MultiClient - 池中资源已完成释放,可以关闭池了
效果很好,完美了实现了我的要求。