zoukankan      html  css  js  c++  java
  • Spring Boot整合Thrift RPC

    【转载】 https://coder4.com/homs_online/spring-boot/sb-thrift.html

    Spring Boot自动配置简介

    在介绍RPC之前,我们先来学习下Spring Boot的自动配置。

    我们前面已经提到:Spring Boot来源于Spring,并且做了众多改进,其中最有用的设计理念是约定优于配置,它通过自动配置功能(大多数开发者平时习惯设置的配置作为默认配置)为开发者快速、准确地构建出标准化的应用。

    以集成MySQL数据库为例,在Spring Boot出现之前,我们要

    1. 配置JDBC驱动依赖
    2. 配置XML文件中数据源
    3. 配置XML中的DataSource Bean
    4. 配置XML中的XXXTemplate Bean
    5. 配置XML中的XXXTransactionManager Bean

    有了Spring Boot的自动配置后,自动配置帮我们生成了各种DataSource、XXXTemplate、XXXTransactionManager,我们所需要做的只有一条,就是激活它

    1. maven中依赖包含自动配置的包
    2. 配置JDBC驱动依赖
    3. yaml文件中定义数据源

    自动配置进行智能检测,只要满足上述3个条件,其他的Bean都会被自动生成并注入到Spring环境中。我们需要使用时只需要@Autowired一下就可以了,是不是非常简单!

    由于篇幅所限,本书不会对自动配置的书写做零起点教学,如果你想了解自动配置的原理,可以参考这篇文章spring boot实战(第十三篇)自动配置原理分析

    在本节的后续部分,我们会以Thrift RPC Server为例,看看自动配置是如何书写的。

    RPC简介

    远程过程调用(remote procedure call或简称RPC),指的是运行于本地(客户端)的程序像调用本地程序一样,直接调用另一台计算机(服务器端)的程序,而程序员无需额外为远程交互做额外的编程。

    RPC极大地简化了分布似乎系统中节点之间网络通信的开发工作量,是微服务架构中的重要组件之一。

    在本书中,我们选用Thrift作为RPC框架。由于篇幅所限,我们不会对Thrift RPC作出详尽的介绍,如果你还不熟悉,可以参考官方的快速入门文档

    Spring Boot整合Thrift RPC服务端

    简要来说,启动一个Thrift RPC的服务端需要如下步骤:

    1. 书写DSL(.thrift文件),定义函数、数据结构等。
    2. 编译并生成桩代码。
    3. 编写Handler(RPC的逻辑入口)。
    4. 基于上述Handler,构造Processor。
    5. 构造Server,Thrift提供了多种服务端供选择,常用的有TThreadPoolServer(多线程服务器)和TNonblockingServer(非阻塞服务器)。
    6. 设置Server的Protocol,类似的,Thrift提供了多种传输协议,最常用的是TBinaryProtocol和TCompactProtocol。
    7. 设置Server的Transport(Factory),用这种方式指定底层的传输协议,常用的有TFramedTransport、TNonBlockingTransport,不同的Transport可以类似Java的IOStreawm方式,相互叠加,以产生更强大的效果。

    上述对Thrift服务器的架构做了简要介绍,如果想更深入了解,可以自行阅读官方源码

    首先,我们来看一下thrift定义(根据上一节的介绍,文件放在lmsia-abc-common包中)

    namespace java com.coder4.lmsia.abc
    
    service lmsiaAbcThrift {
    
        string sayHi()
    }
    

    调用thrift进行编译后,我们也将对应的桩文件放置在lmsia-abc-client下,目录结构可以参见上一节。

    为了更方便的在Spring Boot中集成Thrift服务器,我将相应代码抽取成了公用库lmsia-thrift-server

    
    ├── build.gradle
    ├── gradle
    │   └── wrapper
    │       ├── gradle-wrapper.jar
    │       └── gradle-wrapper.properties
    ├── gradlew
    ├── gradlew.bat
    ├── README.md
    ├── settings.gradle
    └── src
        ├── main
        │   ├── java
        │   │   └── com
        │   │       └── coder4
        │   │           └── lmsia
        │   │               └── thrift
        │   │                   └── server
        │   │                       ├── configuration
        │   │                       │   └── ThriftServerConfiguration.java
        │   │                       └── ThriftServerRunnable.java
        │   └── resources
        │       └── META-INF
        │           └── spring.factories
        └── test
            └── java
    

    简单解析下项目结构: gradle相关: 与前节介绍的类似,只不过这里是单项目功能。 ThriftServerConfiguration: 自动配置,当满足条件后会自动激活,激活后可自动启动Thrift RPC服务。 ThriftServerRunnable: Thrift RPC服务器的构造逻辑、运行线程。 spring.factories: 当我们以类库方式提供自动配置时,需要增加这个spring.factories,让别的项目能"定位到"要检查的自动配置。

    首先,我们来看一下ThriftServerRunnable.java

    
    package com.coder4.lmsia.thrift.server;
    
    import org.apache.thrift.TProcessor;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocolFactory;
    import org.apache.thrift.server.TServer;
    import org.apache.thrift.server.TThreadedSelectorServer;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TNonblockingServerSocket;
    import org.apache.thrift.transport.TNonblockingServerTransport;
    import org.apache.thrift.transport.TTransportException;
    import org.apache.thrift.transport.TTransportFactory;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author coder4
     */
    public class ThriftServerRunnable implements Runnable {
    
        private static final int THRIFT_PORT = 3000;
    
        private static final int THRIFT_TIMEOUT = 5000;
    
        private static final int THRIFT_TCP_BACKLOG = 5000;
    
        private static final int THRIFT_CORE_THREADS = 128;
    
        private static final int THRIFT_MAX_THREADS = 256;
    
        private static final int THRIFT_SELECTOR_THREADS = 16;
    
        private static final TProtocolFactory THRIFT_PROTOCOL_FACTORY = new TBinaryProtocol.Factory();
    
        // 16MB
        private static final int THRIFT_MAX_FRAME_SIZE = 16 * 1024 * 1024;
    
        // 4MB
        private static final int THRIFT_MAX_READ_BUF_SIZE = 4 * 1024 * 1024;
    
        protected ExecutorService threadPool;
    
        protected TServer server;
    
        protected Thread thread;
    
        private TProcessor processor;
    
        private boolean isDestroy = false;
    
        public ThriftServerRunnable(TProcessor processor) {
            this.processor = processor;
        }
    
        public TServer build() throws TTransportException {
            TNonblockingServerSocket.NonblockingAbstractServerSocketArgs socketArgs =
                    new TNonblockingServerSocket.NonblockingAbstractServerSocketArgs();
            socketArgs.port(THRIFT_PORT);
            socketArgs.clientTimeout(THRIFT_TIMEOUT);
            socketArgs.backlog(THRIFT_TCP_BACKLOG);
    
            TNonblockingServerTransport transport = new TNonblockingServerSocket(socketArgs);
    
            threadPool =
                    new ThreadPoolExecutor(THRIFT_CORE_THREADS, THRIFT_MAX_THREADS,
                            60L, TimeUnit.SECONDS,
                            new SynchronousQueue<>());
    
            TTransportFactory transportFactory = new TFramedTransport.Factory(THRIFT_MAX_FRAME_SIZE);
            TThreadedSelectorServer.Args args = new TThreadedSelectorServer.Args(transport)
                    .selectorThreads(THRIFT_SELECTOR_THREADS)
                    .executorService(threadPool)
                    .transportFactory(transportFactory)
                    .inputProtocolFactory(THRIFT_PROTOCOL_FACTORY)
                    .outputProtocolFactory(THRIFT_PROTOCOL_FACTORY)
                    .processor(processor);
    
            args.maxReadBufferBytes = THRIFT_MAX_READ_BUF_SIZE;
    
            return new TThreadedSelectorServer(args);
        }
    
        @Override
        public void run() {
            try {
                server = build();
                server.serve();
            } catch (Exception e) {
                e.printStackTrace();
                throw new RuntimeException("Start Thrift RPC Server Exception");
            }
        }
    
        public void stop() throws Exception {
            threadPool.shutdown();
            server.stop();
        }
    
    }
    

    我们来解释一下:

    • build方法用于构造一个可供运行的Thrift RPC Server
      1. 构造非阻塞Socket,并设置监听端口、超时
      2. 构造非阻塞Transport
      3. 构造线程池,在这里我们的服务器模型是非阻塞线程池RPC服务器。
      4. 构造底层传输协议即TFramedTransport
      5. 构造ThriftServer,并设置前面构造的非阻塞Transport、线程池、协议TBinaryProtocol
    • 整个ThriftServerRunnable类是一个线程Runnablerun,run函数中构造RPC服务,并启动服务(servee)
    • stop服务提供停止服务的方法

    下面我们来看一下自动配置ThriftServerConfiguration.java:

    package com.coder4.lmsia.thrift.server.configuration;
    
    import com.coder4.lmsia.thrift.server.ThriftServerRunnable;
    import org.apache.thrift.TProcessor;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.DisposableBean;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
    import org.springframework.context.annotation.Configuration;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author coder4
     */
    @Configuration
    @ConditionalOnBean(value = {TProcessor.class})
    public class ThriftServerConfiguration implements InitializingBean, DisposableBean {
    
        private Logger LOG = LoggerFactory.getLogger(ThriftServerConfiguration.class);
    
        private static final int GRACEFUL_SHOWDOWN_SEC = 3;
    
        @Autowired
        private TProcessor processor;
    
        private ThriftServerRunnable thriftServer;
    
        private Thread thread;
    
        @Override
        public void destroy() throws Exception {
            LOG.info("Wait for graceful shutdown on destroy(), {} seconds", GRACEFUL_SHOWDOWN_SEC);
            Thread.sleep(TimeUnit.SECONDS.toMillis(GRACEFUL_SHOWDOWN_SEC));
            LOG.info("Shutdown rpc server.");
            thriftServer.stop();
            thread.join();
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            thriftServer = new ThriftServerRunnable(processor);
            thread = new Thread(thriftServer);
            thread.start();
        }
    }
    

    这是我们编写的第一个自动配置,我们稍微详细的解释一下:

    • 启动条件: 仅当服务提供了TProcessor才启用,我们稍后会在lmsia-abc项目中看到,后者封装了RPC的桩入口,提供了TProcessor。
    • InitializingBean: 自动配置实现了InitializingBean,为什么要实现这个接口呢?当这个自动配置被初始化时,所有Autowired的属性被自动注入(即Processor),而前面ThriftServerRunnable中我么已经看到,只有拿到了TProcessor,才能启动RPC服务。因此,我们使用了InitializingBean,它自带了afterPropertiesSet这个回调,会在所有属性被注入完成后,调用这个回调函数。
      • 在这里,我们调用了ThriftServerRunnable实现了Thrift RPC服务器的启动。
    • DisposableBean: 除了InitializingBean,我们还实现了DisposableBean。看名字就可以知道,这是Spring为了服务关闭时清理资源而设计的接口。事实也是如此,当服务关闭时,会依次调用每个自动配置,如果实现了DisposableBean,则回调destroy函数。
      • 在这里,我们先让线程休眠3秒,然后才关闭Thrift RPC服务,这主要是为了Graceful Shutdown而设计的("优雅关闭"),关于这一点,我们会在下一节会做详细讲解。

    最后,我们的自动配置默认是无法被发现的,需要一个配置文件spring.factories:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.coder4.lmsia.thrift.server.configuration.ThriftServerConfiguration
    

    解读完lmsia-thrift-server后,我们看看如何将它整合进lmsia-abc项目中。

    1. 在lmsia-abc-server子项目中的build.gradle中加入:

      compile 'com.github.liheyuan:lmsia-thrift-server:0.0.1'
      
    2. 提供一个TProcessor,如前文所述,这是启用自动配置的必要条件,ThriftProcessorConfiguration: ```java package com.coder4.lmsia.abc.server.configuration;

    import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift; import com.coder4.lmsia.abc.server.thrift.ThriftServerHandler; import org.apache.thrift.TProcessor; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;

    /**

    • @author coder4 */ @Configuration @ConditionalOnProperty(name = "thriftServer.enabled", matchIfMissing = true) public class ThriftProcessorConfiguration {

      @Bean(name = "thriftProcessor") public TProcessor processor(ThriftServerHandler handler) {

       return new LmsiaAbcThrift.Processor(handler);
      

      }

    }

    
    我们简单解释下:
    * 这也是一个自动配置,仅当配置文件中thriftServer.enabled=true时才启用(不配置默认true)
    * 提供的TProcessor,需要依赖ThriftServerHandler,这个就是Thrift生成的桩函数,项目结构分析中已经提到过,这是RPC服务器的逻辑入口。
    
    怎么样,使用了自动配置后,启动一个Thrift 服务器是不是非常简单?
    
    ## Spring Boot整合Thrift RPC客户端
    
    只有服务端是不行的,还需要有客户端。
    
    类似地,为了方便的生成客户端,我们把代码进行了整理和抽象,放到了[lmsia-thrift-client](https://github.com/liheyuan/lmsia-thrift-client)项目中。
    
    首先看一下项目结构:
    ```shell
    ├── build.gradle
    ├── gradle
    │   └── wrapper
    │       ├── gradle-wrapper.jar
    │       └── gradle-wrapper.properties
    ├── gradlew
    ├── gradlew.bat
    ├── README.md
    ├── settings.gradle
    └── src
        ├── main
        │   ├── java
        │   │   └── com
        │   │       └── coder4
        │   │           └── lmsia
        │   │               └── thrift
        │   │                   └── client
        │   │                       ├── ThriftClient.java
        │   │                       ├── AbstractThriftClient.java
        │   │                       ├── EasyThriftClient.java
        │   │                       ├── K8ServiceThriftClient.java
        │   │                       ├── K8ServiceKey.java
        │   │                       ├── builder
        │   │                       │   ├── EasyThriftClientBuilder.java
        │   │                       │   └── K8ServiceThriftClientBuilder.java
        │   │                       ├── func
        │   │                       │   ├── ThriftCallFunc.java
        │   │                       │   └── ThriftExecFunc.java
        │   │                       ├── pool
        │   │                       │   ├── TTransportPoolFactory.java
        │   │                       │   └── TTransportPool.java
        │   │                       └── utils
        │   │                           └── ThriftUrlStr.java
        │   └── resources
        └── test
            └── java
                └── LibraryTest.java
    

    解释下项目结构:

    • gradle相关的与之前类似,不再赘述
    • ThriftClient相关,定义了Thrift的客户端
      1. ThriftClient 抽象了客户端的接口
      2. AbstractThriftClient 实现了除连接外的Thrift Client操作
      3. EasyThriftClient 使用IP和端口直连的Thrift Client
      4. K8ServiceThriftClient 使用Kubernetes服务名字(根据微服务自动发现一节中的介绍,服务名字实际也是Host)和端口的Thrift Client,并内置了连接池。
    • func 函数编程工具类
    • builder 方便快速构造上述两种Thrift Client
    • pool 客户端连接池

    本小节主要对IP、端口直连的客户端即EasyThriftClient进行介绍。关于支持服务自动发现以及连接池功能的K8ServiceThriftClient,将在下一节进行介绍。

    先看一下接口定义,ThriftClient:

    package com.coder4.lmsia.thrift.client;
    
    import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
    import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
    import org.apache.thrift.TServiceClient;
    
    import java.util.concurrent.Future;
    
    /**
     * @author coder4
     */
    public interface ThriftClient<TCLIENT extends TServiceClient> {
    
        /**
         * sync call with return value
         * @param tcall thrift rpc client call
         * @param <TRET> return type
         * @return
         */
        <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall);
    
        /**
         * sync call without return value
         * @param texec thrift rpc client
         */
        void exec(ThriftExecFunc<TCLIENT> texec);
    
        /**
         * async call with return value
         * @param tcall thrift rpc client call
         * @param <TRET>
         * @return
         */
        <TRET> Future<TRET> asyncCall(ThriftCallFunc<TCLIENT, TRET> tcall);
    
    
        /**
         * asnyc call without return value
         * @param texec thrift rpc client call
         */
        <TRET> Future<?> asyncExec(ThriftExecFunc<TCLIENT> texec);
    
    }
    

    这里需要解释一下,上述实际分成了两大类:

    • exec 无返回值的rpc调用
    • call 有返回值的调用

    这里使用了Java 8的函数式编程进行抽象。如果不太熟悉的朋友,可以自行查阅相关资料。

    在函数式编程的帮助下,我们可以将每一个rpc调用都分为同步和异步两种,异步的调用会返回一个Future。

    再来看一下AbstractThriftClient:

    /**
     * @(#)AbstractThriftClient.java, Aug 01, 2017.
     * <p>
     * Copyright 2017 fenbi.com. All rights reserved.
     * FENBI.COM PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
     */
    package com.coder4.lmsia.thrift.client;
    
    import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
    import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
    import org.apache.thrift.TServiceClient;
    import org.apache.thrift.TServiceClientFactory;
    import org.apache.thrift.protocol.TBinaryProtocol;
    import org.apache.thrift.protocol.TProtocol;
    import org.apache.thrift.transport.TTransport;
    
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingDeque;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    
    /**
     * @author coder4
     */
    public abstract class AbstractThriftClient<TCLIENT extends TServiceClient> implements ThriftClient<TCLIENT> {
    
        protected static final int THRIFT_CLIENT_DEFAULT_TIMEOUT = 5000;
    
        protected static final int THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 16;
    
        private Class<?> thriftClass;
    
        private static final TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
    
        private TServiceClientFactory<TCLIENT> clientFactory;
    
        // For async call
        private ExecutorService threadPool;
    
        public void init() {
            try {
                clientFactory = getThriftClientFactoryClass().newInstance();
            } catch (Exception e) {
                throw new RuntimeException();
            }
    
            if (!check()) {
                throw new RuntimeException("Client config failed check!");
            }
    
            threadPool = new ThreadPoolExecutor(
                    10, 100, 0,
                    TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>());
        }
    
        protected boolean check() {
            if (thriftClass == null) {
                return false;
            }
            return true;
        }
    
        @Override
        public <TRET> Future<TRET> asyncCall(ThriftCallFunc<TCLIENT, TRET> tcall) {
            return threadPool.submit(() -> this.call(tcall));
        }
    
        @Override
        public <TRET> Future<?> asyncExec(ThriftExecFunc<TCLIENT> texec) {
            return threadPool.submit(() -> this.exec(texec));
        }
    
        protected TCLIENT createClient(TTransport transport) throws Exception {
            // Step 1: get TProtocol
            TProtocol protocol = protocolFactory.getProtocol(transport);
    
            // Step 2: get client
            return clientFactory.getClient(protocol);
        }
    
        private Class<TServiceClientFactory<TCLIENT>> getThriftClientFactoryClass() {
            Class<TCLIENT> clientClazz = getThriftClientClass();
            if (clientClazz == null) {
                return null;
            }
            for (Class<?> clazz : clientClazz.getDeclaredClasses()) {
                if (TServiceClientFactory.class.isAssignableFrom(clazz)) {
                    return (Class<TServiceClientFactory<TCLIENT>>) clazz;
                }
            }
            return null;
        }
    
        private Class<TCLIENT> getThriftClientClass() {
            for (Class<?> clazz : thriftClass.getDeclaredClasses()) {
                if (TServiceClient.class.isAssignableFrom(clazz)) {
                    return (Class<TCLIENT>) clazz;
                }
            }
            return null;
        }
    
        public void setThriftClass(Class<?> thriftClass) {
            this.thriftClass = thriftClass;
        }
    }
    

    上述抽象的Thrift客户端实现了如下功能:

    1. 客户端线程池,这里主要是为异步调用准备的,与之前构造的服务端的线程池是完全不同的。
      • asyncCall和asyncExec使用了线程池来完成异步调用
    2. thriftClass 存储了Thrift的桩代码了类,不同业务生成的ThriftClass不一样,所以这里存储了class。
    3. createClient提供了共用函数,传入一个transport,即可构造生成一个Thrift Client,特别注意的是,这里设定的通信协议为TBinaryProtocol,必须与服务端保持一致,否则无法成功通信。

    由于call和exec与连接实现较为相关,因此并未在这一层中实现,最后我们来看一下EasyThriftClient:

    package com.coder4.lmsia.thrift.client;
    
    import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
    import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
    import org.apache.thrift.TServiceClient;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    
    /**
     * @author coder4
     */
    public class EasyThriftClient<TCLIENT extends TServiceClient> extends AbstractThriftClient<TCLIENT> {
    
        private static final int EASY_THRIFT_BUFFER_SIZE = 1024 * 16;
    
        protected String thriftServerHost;
    
        protected int thriftServerPort;
    
        @Override
        protected boolean check() {
            if (thriftServerHost == null || thriftServerHost.isEmpty()) {
                return false;
            }
            if (thriftServerPort <= 0) {
                return false;
            }
            return super.check();
        }
    
        private TTransport borrowTransport() throws Exception {
            TSocket socket = new TSocket(thriftServerHost, thriftServerPort, THRIFT_CLIENT_DEFAULT_TIMEOUT);
    
            TTransport transport = new TFramedTransport(
                    socket, THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE);
    
            transport.open();
    
            return transport;
        }
    
        private void returnTransport(TTransport transport) {
            if (transport != null && transport.isOpen()) {
                transport.close();
            }
        }
    
        private void returnBrokenTransport(TTransport transport) {
            if (transport != null && transport.isOpen()) {
                transport.close();
            }
        }
    
        @Override
        public <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall) {
    
            // Step 1: get TTransport
            TTransport tpt = null;
            try {
                tpt = borrowTransport();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
    
            // Step 2: get client & call
            try {
                TCLIENT tcli = createClient(tpt);
                TRET ret = tcall.call(tcli);
                returnTransport(tpt);
                return ret;
            } catch (Exception e) {
                returnBrokenTransport(tpt);
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public void exec(ThriftExecFunc<TCLIENT> texec) {
            // Step 1: get TTransport
            TTransport tpt = null;
            try {
                tpt = borrowTransport();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
    
            // Step 2: get client & exec
            try {
                TCLIENT tcli = createClient(tpt);
                texec.exec(tcli);
                returnTransport(tpt);
            } catch (Exception e) {
                returnBrokenTransport(tpt);
                throw new RuntimeException(e);
            }
        }
    
        public String getThriftServerHost() {
            return thriftServerHost;
        }
    
        public void setThriftServerHost(String thriftServerHost) {
            this.thriftServerHost = thriftServerHost;
        }
    
        public int getThriftServerPort() {
            return thriftServerPort;
        }
    
        public void setThriftServerPort(int thriftServerPort) {
            this.thriftServerPort = thriftServerPort;
        }
    

    简单解释下上述代码

    1. 需要外部传入RPC服务器的主机名和端口 thriftServerHost和thriftServerPort
    2. borrowTransport完成Transport(Thrift中类似Socket的抽象) 的构造,注意这里要使用TFramedTransport,与之前服务端的构造保持一致。
    3. returnTransport关闭Transport
    4. returnBrokenTransport关闭出异常的Transport
    5. call和exec 在拿到Transport后,使用函数式编程的方式,完成rpc调用,如果有异常则关闭连接。

    最后我们来看一下对应的Builder,EasyThriftClientBuilder:

    package com.coder4.lmsia.thrift.client.builder;
    
    import com.coder4.lmsia.thrift.client.EasyThriftClient;
    import org.apache.thrift.TServiceClient;
    
    /**
     * @author coder4
     */
    public class EasyThriftClientBuilder<TCLIENT extends TServiceClient> {
    
        private final EasyThriftClient<TCLIENT> client = new EasyThriftClient<>();
    
        protected EasyThriftClient<TCLIENT> build() {
            client.init();
            return client;
        }
    
        protected EasyThriftClientBuilder<TCLIENT> setHost(String host) {
            client.setThriftServerHost(host);
            return this;
        }
    
        protected EasyThriftClientBuilder<TCLIENT> setPort(int port) {
            client.setThriftServerPort(port);
            return this;
        }
    
        protected EasyThriftClientBuilder<TCLIENT> setThriftClass(Class<?> thriftClass) {
            client.setThriftClass(thriftClass);
            return this;
        }
    }
    

    Builder的代码比较简单,就是以链式调用的方式,通过主机和端口,方便地构造一个EasyThriftClient。

    看了EasyThriftClient后下面我们来看一下如何集成到项目中。

    Gradle子项目划分与微服务的代码结构一节中,我们已经提到,将每个微服务的RPC客户端放在xx-client子工程中,现在我们再来回顾下lmsia-abc-client的目录结构。

    ├── build.gradle
    └── src
        ├── main
        │   ├── java
        │   │   └── com
        │   │       └── coder4
        │   │           └── lmsia
        │   │               └── abc
        │   │                   └── client
        │   │                       ├── configuration
        │   │                       │   └── LmsiaAbcThriftClientConfiguration.java
        │   │                       ├── LmsiaAbcEasyThriftClientBuilder.java
        │   │                       └── LmsiaK8ServiceThriftClientBuilder.java
        │   └── resources
        │       └── META-INF
        │           └── spring.factories
        └── test
    

    我们简单介绍一下:

    1. LmsiaAbcThriftClientConfiguration: 客户端自动配置,当激活时,自动生成lmsia-abc对应的RPC服务的客户端。引用者直接@Autowired一下,就可以使用了。
    2. LmsiaAbcEasyThriftClientBuilder: EasyThriftClient构造器,主要是自动配置需要。
    3. spring.factories: 与服务端的自动配置类似,需要在这个文件中指定自动配置的类路径,才能让Spring Boot自动扫描到自动配置。
    4. 其他K8ServiceThriftClient相关的部分,我们将在下一小节进行介绍。

    LmsiaAbcEasyThriftClientBuilder文件:

    package com.coder4.lmsia.abc.client;
    
    import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift;
    import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client;
    import com.coder4.lmsia.thrift.client.ThriftClient;
    import com.coder4.lmsia.thrift.client.builder.EasyThriftClientBuilder;
    
    /**
     * @author coder4
     */
    public class LmsiaAbcEasyThriftClientBuilder extends EasyThriftClientBuilder<Client> {
    
        public LmsiaAbcEasyThriftClientBuilder(String host, int port) {
            setThriftClass(LmsiaAbcThrift.class);
    
            setHost(host);
            setPort(port);
        }
    
        public static ThriftClient<Client> buildClient(String host, int port) {
            return new LmsiaAbcEasyThriftClientBuilder(host, port).build();
        }
    
    }
    

    上述Builder完成了实际的参数填充,主要有:

    1. ThriftClient的桩代码类设置(LmsiaAbcThrift.class)
    2. 设置主机名和端口

    LmsiaAbcClientConfiguration文件:

    package com.coder4.lmsia.abc.client.configuration;
    
    import com.coder4.lmsia.abc.client.LmsiaAbcEasyThriftClientBuilder;
    import com.coder4.lmsia.abc.client.LmsiaK8ServiceClientBuilder;
    import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift;
    import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client;
    import com.coder4.lmsia.thrift.client.K8ServiceKey;
    import com.coder4.lmsia.thrift.client.ThriftClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
    import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Profile;
    
    @Configuration
    public class LmsiaAbcThriftClientConfiguration {
    
        private Logger LOG = LoggerFactory.getLogger(getClass());
    
        @Bean(name = "lmsiaAbcThriftClient")
        @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient")
        @ConditionalOnProperty(name = {"lmsiaAbcThriftServer.host", "lmsiaAbcThriftServer.port"})
        public ThriftClient<Client> easyThriftClient(
                @Value("${lmsiaAbcThriftServer.host}") String host,
                @Value("${lmsiaAbcThriftServer.port}") int port
        ) {
            LOG.info("######## LmsiaAbcClientConfiguration ########");
            LOG.info("easyClient host = {}, port = {}", host, port);
            return LmsiaAbcEasyThriftClientBuilder.buildClient(host, port);
        }
    
    }
    

    如上所示,满足两个条件时,会自动构造LmsiaAbcEasyThriftClient:

    1. 还没有生成其他的LmsiaAbcEasyThriftClient(ConditionalOnMissingBean)
    2. 配置中指定了lmsiaAbcThriftServer.host和lmsiaAbcThriftServer.port

    根据我们前面的介绍,大家应该能理解,虽然有自动配置,但上述配置是一种很糟糕的方式。试想一下,如果我们的服务依赖了5个其他RPC服务,那么岂不是要分别配置5组IP和端口?此外,这种方式也无法支持节点的负载均衡。

    如何解决这个问题呢?我们将在K8ServiceThriftClient中解决。

    本小节的最后,我们看一下spring.factories:

    org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.coder4.lmsia.abc.client.configuration.LmsiaAbcThriftClientConfiguration
    

    和之前lmsia-abc-server子工程中的文件类似,这里设置了自动配置的详细类路径,方便Spring Boot的自动扫描。

    K8ServiceThriftClient

    在对EasyThriftClient的介绍中,我们发现了一个问题,需要单独配置IP和端口,不支持服务自动发现。

    此外,在这个客户端的实现中,默认每次都要建立新的连接。而对于后端服务而言,RPC的服务端和客户端多数都是在内网环境中,连接情况比较稳定,可以通过连接池的方式减少连接握手开销,从而提升RPC服务的性能。如果你对连接池的原理还不太熟悉,可以参考百科连接池

    为此,我们本将介绍K8ServiceThriftClient,它很好的解决了上述问题。

    首先,我们使用commons-pool2来构建了TTransport层的连接池。

    TTransportPoolFactory:

    package com.coder4.lmsia.thrift.client.pool;
    
    import com.coder4.lmsia.thrift.client.K8ServiceKey;
    import org.apache.commons.pool2.BaseKeyedPooledObjectFactory;
    import org.apache.commons.pool2.PooledObject;
    import org.apache.commons.pool2.impl.DefaultPooledObject;
    import org.apache.thrift.transport.TFramedTransport;
    import org.apache.thrift.transport.TSocket;
    import org.apache.thrift.transport.TTransport;
    
    /**
     * @author coder4
     */
    public class TTransportPoolFactory extends BaseKeyedPooledObjectFactory<K8ServiceKey, TTransport> {
    
        protected static final int THRIFT_CLIENT_DEFAULT_TIMEOUT = 5000;
    
        protected static final int THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE = 1024 * 1024 * 16;
    
        @Override
        public TTransport create(K8ServiceKey key) throws Exception {
            if (key != null) {
                String host = key.getK8ServiceHost();
                int port = key.getK8ServicePort();
                TSocket socket = new TSocket(host, port, THRIFT_CLIENT_DEFAULT_TIMEOUT);
    
                TTransport transport = new TFramedTransport(
                        socket, THRIFT_CLIENT_DEFAULT_MAX_FRAME_SIZE);
    
                transport.open();
    
                return transport;
            } else {
                return null;
            }
        }
    
        @Override
        public PooledObject<TTransport> wrap(TTransport transport) {
            return new DefaultPooledObject<>(transport);
        }
    
        @Override
        public void destroyObject(K8ServiceKey key, PooledObject<TTransport> obj) throws Exception {
            obj.getObject().close();
        }
    
        @Override
        public boolean validateObject(K8ServiceKey key, PooledObject<TTransport> obj) {
            return obj.getObject().isOpen();
        }
    
    }
    

    上述代码主要完成以下功能:

    1. 连接超时配置(5秒)
    2. create, 生成新连接(TTransport),这里与之前的EasyThriftClient非常类似,不再赘述
    3. 验证连接是否有效,通过TTransport的isOpen判断。

    TTransportPool:

    package com.coder4.lmsia.thrift.client.pool;
    
    import com.coder4.lmsia.thrift.client.K8ServiceKey;
    import org.apache.commons.pool2.impl.GenericKeyedObjectPool;
    import org.apache.thrift.transport.TTransport;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @author coder4
     */
    public class TTransportPool extends GenericKeyedObjectPool<K8ServiceKey, TTransport> {
    
        private Logger LOG = LoggerFactory.getLogger(getClass());
    
        private static int MAX_CONN = 1024;
        private static int MIN_IDLE_CONN = 8;
        private static int MAX_IDLE_CONN = 32;
    
        public TTransportPool(TTransportPoolFactory factory) {
            super(factory);
    
            setTimeBetweenEvictionRunsMillis(45 * 1000);
            setNumTestsPerEvictionRun(5);
            setMaxWaitMillis(30 * 1000);
    
            setMaxTotal(MAX_CONN);
            setMaxTotalPerKey(MAX_CONN);
            setMinIdlePerKey(MIN_IDLE_CONN);
            setMaxTotalPerKey(MAX_IDLE_CONN);
    
            setTestOnCreate(true);
            setTestOnBorrow(true);
            setTestWhileIdle(true);
        }
    
        @Override
        public TTransportPoolFactory getFactory() {
            return (TTransportPoolFactory) super.getFactory();
        }
    
        public void returnBrokenObject(K8ServiceKey key, TTransport transport) {
            try {
                invalidateObject(key, transport);
            } catch (Exception e) {
                LOG.warn("return broken key " + key);
                e.printStackTrace();
            }
        }
    }
    

    上述代码主要是完成连接池的配置,比较直观:

    1. 设置最大连接数1024
    2. 设置最大空闲数32,最小空闲数8,每间隔45秒尝试更改维护连接池中的连接数量。
    3. 当每次"创建"、从池子中"借用"、"空闲"时,检查连接是否有效。

    下面我们来看一下如何在K8ServiceThriftClient中使用:

    package com.coder4.lmsia.thrift.client;
    
    import com.coder4.lmsia.thrift.client.func.ThriftCallFunc;
    import com.coder4.lmsia.thrift.client.func.ThriftExecFunc;
    import com.coder4.lmsia.thrift.client.pool.TTransportPool;
    import com.coder4.lmsia.thrift.client.pool.TTransportPoolFactory;
    import org.apache.thrift.TServiceClient;
    import org.apache.thrift.transport.TTransport;
    
    public class K8ServiceThriftClient<TCLIENT extends TServiceClient>
            extends AbstractThriftClient<TCLIENT> {
    
        private K8ServiceKey k8ServiceKey;
    
        private TTransportPool connPool;
    
        @Override
        public void init() {
            super.init();
            // check
            if (k8ServiceKey == null) {
                throw new RuntimeException("invalid k8ServiceName or k8Serviceport");
            }
            // init pool
            connPool = new TTransportPool(new TTransportPoolFactory());
        }
    
        @Override
        public <TRET> TRET call(ThriftCallFunc<TCLIENT, TRET> tcall) {
    
            // Step 1: get TTransport
            TTransport tpt = null;
            K8ServiceKey key = getConnBorrowKey();
            try {
                tpt = connPool.borrowObject(key);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
    
            // Step 2: get client & call
            try {
                TCLIENT tcli = createClient(tpt);
                TRET ret = tcall.call(tcli);
                returnTransport(key, tpt);
                return ret;
            } catch (Exception e) {
                returnBrokenTransport(key, tpt);
                throw new RuntimeException(e);
            }
        }
    
        @Override
        public void exec(ThriftExecFunc<TCLIENT> texec) {
            // Step 1: get TTransport
            TTransport tpt = null;
            K8ServiceKey key = getConnBorrowKey();
            try {
    
                // borrow transport
                tpt = connPool.borrowObject(key);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
    
            // Step 2: get client & exec
            try {
                TCLIENT tcli = createClient(tpt);
                texec.exec(tcli);
                returnTransport(key, tpt);
            } catch (Exception e) {
                returnBrokenTransport(key, tpt);
                throw new RuntimeException(e);
            }
        }
    
        private K8ServiceKey getConnBorrowKey() {
            return k8ServiceKey;
        }
    
        private void returnTransport(K8ServiceKey key, TTransport transport) {
            connPool.returnObject(key, transport);
        }
    
        private void returnBrokenTransport(K8ServiceKey key, TTransport transport) {
            connPool.returnBrokenObject(key, transport);
        }
    
        public K8ServiceKey getK8ServiceKey() {
            return k8ServiceKey;
        }
    
        public void setK8ServiceKey(K8ServiceKey k8ServiceKey) {
            this.k8ServiceKey = k8ServiceKey;
        }
    
    }
    

    上述大部分代码和EasyThriftClient非常接近,有差异的部分主要是与连接的"借用"、"归还"相关的:

    1. 在call和exec中,借用连接
      • getConnBorrowKey先构造一个key,包含了主机名和端口。这里的主机名是微服务的自动发现中提到的Kubernetes服务,如果你对相关原理不太熟悉,可以自行回顾对应章节。
      • 从connPool中借用一个连接(TTransport)
      • 剩余发起rpc调用的步骤就和EasyThriftClient相同了,不再赘述。
    2. 当rpc调用结束后
      • 正常结束,调用connPool.returnObject将TTransport归还到连接池中。
      • 非正常结束,调用connPool.returnBrokenTransport,让连接池销毁这个连接,以防后续借用到这个可能出错的TTransport。

    类似的,我们也配套了对应的Builder:

    package com.coder4.lmsia.thrift.client.builder;
    
    import com.coder4.lmsia.thrift.client.EasyThriftClient;
    import org.apache.thrift.TServiceClient;
    
    /**
     * @author coder4
     */
    public class EasyThriftClientBuilder<TCLIENT extends TServiceClient> {
    
        private final EasyThriftClient<TCLIENT> client = new EasyThriftClient<>();
    
        protected EasyThriftClient<TCLIENT> build() {
            client.init();
            return client;
        }
    
        protected EasyThriftClientBuilder<TCLIENT> setHost(String host) {
            client.setThriftServerHost(host);
            return this;
        }
    
        protected EasyThriftClientBuilder<TCLIENT> setPort(int port) {
            client.setThriftServerPort(port);
            return this;
        }
    
        protected EasyThriftClientBuilder<TCLIENT> setThriftClass(Class<?> thriftClass) {
            client.setThriftClass(thriftClass);
            return this;
        }
    }
    

    上述Builder主要是设置所需的两个参数,Host和Port,看起来和EasyThriftClient并没有什么不同?

    别着急,我们继续看一下lmsia-abc-client中的集成:

    package com.coder4.lmsia.abc.client;
    
    import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift;
    import com.coder4.lmsia.abc.thrift.LmsiaAbcThrift.Client;
    import com.coder4.lmsia.thrift.client.K8ServiceKey;
    import com.coder4.lmsia.thrift.client.ThriftClient;
    import com.coder4.lmsia.thrift.client.builder.K8ServiceThriftClientBuilder;
    
    /**
     * @author coder4
     */
    public class LmsiaK8ServiceThriftClientBuilder extends K8ServiceThriftClientBuilder<Client> {
    
        public LmsiaK8ServiceThriftClientBuilder(K8ServiceKey k8ServiceKey) {
            setThriftClass(LmsiaAbcThrift.class);
    
            setK8ServiceKey(k8ServiceKey);
        }
    
        public static ThriftClient<Client> buildClient(K8ServiceKey k8ServiceKey) {
            return new LmsiaK8ServiceThriftClientBuilder(k8ServiceKey).build();
        }
    
    }
    

    在集成的时候,我们需要传入一个key,可以手动制定,也可以自动配置

    我们看一下完整的自动配置代码,LmsiaAbcThriftClientConfiguration:

    public class LmsiaAbcThriftClientConfiguration {
    
        @Bean(name = "lmsiaAbcThriftClient")
        @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient")
        @ConditionalOnProperty(name = {"lmsiaAbcThriftServer.host", "lmsiaAbcThriftServer.port"})
        public ThriftClient<Client> easyThriftClient(
                @Value("${lmsiaAbcThriftServer.host}") String host,
                @Value("${lmsiaAbcThriftServer.port}") int port
        ) {
            LOG.info("######## LmsiaAbcThriftClientConfiguration ########");
            LOG.info("easyThriftClient host = {}, port = {}", host, port);
            return LmsiaAbcEasyThriftClientBuilder.buildClient(host, port);
        }
    
        @Bean(name = "lmsiaAbcThriftClient")
        @ConditionalOnMissingBean(name = "lmsiaAbcThriftClient")
        public ThriftClient<LmsiaAbcThrift.Client> k8ServiceThriftClient() {
            LOG.info("######## LmsiaAbcThriftClientConfiguration ########");
            K8ServiceKey k8ServiceKey = new K8ServiceKey(K8_SERVICE_NAME, K8_SERVICE_PORT);
            LOG.info("k8ServiceThriftClient key:" + k8ServiceKey);
            return LmsiaK8ServiceThriftClientBuilder.buildClient(k8ServiceKey);
        }
    
        //...
    
    }
    

    对比easyThriftClient和k8ServiceThriftClient不难发现,K8ServiceThriftClient的参数,是通过常量直接写死的。也就是我们在微服务的自动发现与负载均衡中提到的,约定好服务的命名规则。

    看下常量定义:

    public class LmsiaAbcConstant {
    
        // ......
    
        public static final String PROJECT_NAME = "lmsia-abc";
    
        public static final String K8_SERVICE_NAME = PROJECT_NAME + "-server";
    
        public static final int K8_SERVICE_PORT = 3000;
    
        // ......
    }
    

    这样以来,一旦确定了项目名,那么Kubernetes中的服务名字也确定了。因此,k8ServiceThriftClient自动配置会被自动激活,即只要引用了lmsia-abc-client这个包,就会自动配置好一个RPC客户端,是不是非常方便?

    我们来看一下具体的使用例子:

    import com.coder4.lmsia.thrift.client.ThriftClient;
    
    public class LmsiaAbctProxy {
    
        @Autowired
        private ThriftClient<Client> client;
    
        public String hello() {
            return client.call(cli -> cli.sayHi());
        }
    

    至此,我们已经完成了在Spring Boo中集成Thrift RPC的服务端、客户端的工作。

    • 服务端,我们通过ThriftServerConfiguration、ThriftProcessorConfiguration自动配置了Thrift RPC服务端。
    • 客户端,通过Kubernetes的服务功能,自动配置了带服务发现功能的Thrift RPC客户端K8ServiceThriftClient。该客户端同时内置了连接池,用于节省连接开销。
  • 相关阅读:
    织梦DEDEcms首页调用文档整篇内容
    dedecms专题列表页不显示标题的解决办法
    怎么让织梦文章按照权重排序
    Codeforces274B
    HDU5693
    HDU2476
    POJ3613
    「LibreOJ NOIP Round #1」旅游路线
    Educational Codeforces Round 48
    组合博弈学习笔记
  • 原文地址:https://www.cnblogs.com/exmyth/p/13738347.html
Copyright © 2011-2022 走看看