zoukankan      html  css  js  c++  java
  • Apache Thrift系列详解(二)

    前言

    Thrift提供的网络服务模型:单线程、多线程、事件驱动,从另一个角度划分为:阻塞服务模型、非阻塞服务模型。

    • 阻塞服务模型:TSimpleServerTThreadPoolServer

    • 非阻塞服务模型:TNonblockingServerTHsHaServerTThreadedSelectorServer

    TServer类的层次关系:


    正文

    TServer

    TServer定义了静态内部类ArgsArgs继承自抽象类AbstractServerArgsAbstractServerArgs采用了建造者模式,向TServer提供各种工厂:

    工厂属性工厂类型作用
    ProcessorFactory TProcessorFactory 处理层工厂类,用于具体的TProcessor对象的创建
    InputTransportFactory TTransportFactory 传输层输入工厂类,用于具体的TTransport对象的创建
    OutputTransportFactory TTransportFactory 传输层输出工厂类,用于具体的TTransport对象的创建
    InputProtocolFactory TProtocolFactory 协议层输入工厂类,用于具体的TProtocol对象的创建
    OutputProtocolFactory TProtocolFactory 协议层输出工厂类,用于具体的TProtocol对象的创建

    下面是TServer的部分核心代码:

    1.  
      public abstract class TServer {
    2.  
      public static class Args extends org.apache.thrift.server.TServer.AbstractServerArgs<org.apache.thrift.server.TServer.Args> {
    3.  
      public Args(TServerTransport transport) {
    4.  
      super(transport);
    5.  
      }
    6.  
      }
    7.  
       
    8.  
      public static abstract class AbstractServerArgs<T extends org.apache.thrift.server.TServer.AbstractServerArgs<T>> {
    9.  
      final TServerTransport serverTransport;
    10.  
      TProcessorFactory processorFactory;
    11.  
      TTransportFactory inputTransportFactory = new TTransportFactory();
    12.  
      TTransportFactory outputTransportFactory = new TTransportFactory();
    13.  
      TProtocolFactory inputProtocolFactory = new TBinaryProtocol.Factory();
    14.  
      TProtocolFactory outputProtocolFactory = new TBinaryProtocol.Factory();
    15.  
       
    16.  
      public AbstractServerArgs(TServerTransport transport) {
    17.  
      serverTransport = transport;
    18.  
      }
    19.  
      }
    20.  
       
    21.  
      protected TProcessorFactory processorFactory_;
    22.  
      protected TServerTransport serverTransport_;
    23.  
      protected TTransportFactory inputTransportFactory_;
    24.  
      protected TTransportFactory outputTransportFactory_;
    25.  
      protected TProtocolFactory inputProtocolFactory_;
    26.  
      protected TProtocolFactory outputProtocolFactory_;
    27.  
      private boolean isServing;
    28.  
       
    29.  
      protected TServer(org.apache.thrift.server.TServer.AbstractServerArgs args) {
    30.  
      processorFactory_ = args.processorFactory;
    31.  
      serverTransport_ = args.serverTransport;
    32.  
      inputTransportFactory_ = args.inputTransportFactory;
    33.  
      outputTransportFactory_ = args.outputTransportFactory;
    34.  
      inputProtocolFactory_ = args.inputProtocolFactory;
    35.  
      outputProtocolFactory_ = args.outputProtocolFactory;
    36.  
      }
    37.  
       
    38.  
      public abstract void serve();
    39.  
      public void stop() {}
    40.  
       
    41.  
      public boolean isServing() {
    42.  
      return isServing;
    43.  
      }
    44.  
       
    45.  
      protected void setServing(boolean serving) {
    46.  
      isServing = serving;
    47.  
      }
    48.  
      }
    49.  
      复制代码

    TServer的三个方法:serve()stop()isServing()serve()用于启动服务,stop()用于关闭服务,isServing()用于检测服务的起停状态。

    TServer的不同实现类的启动方式不一样,因此serve()定义为抽象方法。不是所有的服务都需要优雅的退出, 因此stop()方法没有被定义为抽象。


    TSimpleServer

    TSimpleServer的工作模式采用最简单的阻塞IO,实现方法简洁明了,便于理解,但是一次只能接收和处理一个socket连接,效率比较低。它主要用于演示Thrift的工作过程,在实际开发过程中很少用到它。

    (一) 工作流程

    (二) 使用入门

    服务端:

    1.  
      ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
    2.  
      TServerSocket serverTransport = new TServerSocket(serverSocket);
    3.  
      HelloWorldService.Processor processor =
    4.  
      new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    5.  
      TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
    6.  
       
    7.  
      TSimpleServer.Args tArgs = new TSimpleServer.Args(serverTransport);
    8.  
      tArgs.processor(processor);
    9.  
      tArgs.protocolFactory(protocolFactory);
    10.  
      // 简单的单线程服务模型 一般用于测试
    11.  
      TServer tServer = new TSimpleServer(tArgs);
    12.  
      System.out.println("Running Simple Server");
    13.  
      tServer.serve();
    14.  
      复制代码

    客户端:

    1.  
      TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
    2.  
      TProtocol protocol = new TBinaryProtocol(transport);
    3.  
      HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    4.  
      transport.open();
    5.  
       
    6.  
      String result = client.say("Leo");
    7.  
      System.out.println("Result =: " + result);
    8.  
      transport.close();
    9.  
      复制代码

    (三) 源码分析

    查看上述流程的源代码,即TSimpleServer.java中的serve()方法如下:

    serve()方法的操作:

    1. 设置TServerSocketlisten()方法启动连接监听。
    2. 以阻塞的方式接受客户端地连接请求,每进入一个连接即为其创建一个通道TTransport对象。
    3. 为客户端创建处理器对象、输入传输通道对象、输出传输通道对象、输入协议对象和输出协议对象。
    4. 通过TServerEventHandler对象处理具体的业务请求。

    ThreadPoolServer

    TThreadPoolServer模式采用阻塞socket方式工作,主线程负责阻塞式监听是否有新socket到来,具体的业务处理交由一个线程池来处理。

    (一) 工作流程

    (二) 使用入门

    服务端:

    1.  
      ServerSocket serverSocket = new ServerSocket(ServerConfig.SERVER_PORT);
    2.  
      TServerSocket serverTransport = new TServerSocket(serverSocket);
    3.  
      HelloWorldService.Processor<HelloWorldService.Iface> processor =
    4.  
      new HelloWorldService.Processor<>(new HelloWorldServiceImpl());
    5.  
       
    6.  
      TBinaryProtocol.Factory protocolFactory = new TBinaryProtocol.Factory();
    7.  
      TThreadPoolServer.Args ttpsArgs = new TThreadPoolServer.Args(serverTransport);
    8.  
      ttpsArgs.processor(processor);
    9.  
      ttpsArgs.protocolFactory(protocolFactory);
    10.  
       
    11.  
      // 线程池服务模型 使用标准的阻塞式IO 预先创建一组线程处理请求
    12.  
      TServer ttpsServer = new TThreadPoolServer(ttpsArgs);
    13.  
      System.out.println("Running ThreadPool Server");
    14.  
      ttpsServer.serve();
    15.  
      复制代码

    客户端:

    1.  
      TTransport transport = new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT);
    2.  
      TProtocol protocol = new TBinaryProtocol(transport);
    3.  
      HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    4.  
       
    5.  
      transport.open();
    6.  
      String result = client.say("ThreadPoolClient");
    7.  
      System.out.println("Result =: " + result);
    8.  
      transport.close();
    9.  
      复制代码

    (三) 源码分析

    ThreadPoolServer解决了TSimpleServer不支持并发和多连接的问题,引入了线程池。实现的模型是One Thread Per Connection。查看上述流程的源代码,先查看线程池的代码片段:

    TThreadPoolServer.java中的serve()方法如下:

    serve()方法的操作:

    1. 设置TServerSocketlisten()方法启动连接监听。
    2. 以阻塞的方式接受客户端的连接请求,每进入一个连接,将通道对象封装成一个WorkerProcess对象(WorkerProcess实现了Runnabel接口),并提交到线程池。
    3. WorkerProcessrun()方法负责业务处理,为客户端创建了处理器对象、输入传输通道对象、输出传输通道对象、输入协议对象和输出协议对象。
    4. 通过TServerEventHandler对象处理具体的业务请求。

    WorkerProcessrun()方法:

    (四) 优缺点

    TThreadPoolServer模式的优点

    拆分了监听线程(Accept Thread)和处理客户端连接的工作线程(Worker Thread),数据读取和业务处理都交给线程池处理。因此在并发量较大时新连接也能够被及时接受。

    线程池模式比较适合服务器端能预知最多有多少个客户端并发的情况,这时每个请求都能被业务线程池及时处理,性能也非常高。

    TThreadPoolServer模式的缺点

    线程池模式的处理能力受限于线程池的工作能力,当并发请求数大于线程池中的线程数时,新请求也只能排队等待。


    TNonblockingServer

    TNonblockingServer模式也是单线程工作,但是采用NIO的模式,借助Channel/Selector机制, 采用IO事件模型来处理。

    所有的socket都被注册到selector中,在一个线程中通过seletor循环监控所有的socket

    每次selector循环结束时,处理所有的处于就绪状态的socket,对于有数据到来的socket进行数据读取操作,对于有数据发送的socket则进行数据发送操作,对于监听socket则产生一个新业务socket并将其注册到selector上。

    注意:TNonblockingServer要求底层的传输通道必须使用TFramedTransport。

    (一) 工作流程

    (二) 使用入门

    服务端:

    1.  
      TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    2.  
      TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
    3.  
       
    4.  
      TNonblockingServer.Args tnbArgs = new TNonblockingServer.Args(tnbSocketTransport);
    5.  
      tnbArgs.processor(tprocessor);
    6.  
      tnbArgs.transportFactory(new TFramedTransport.Factory());
    7.  
      tnbArgs.protocolFactory(new TCompactProtocol.Factory());
    8.  
       
    9.  
      // 使用非阻塞式IO服务端和客户端需要指定TFramedTransport数据传输的方式
    10.  
      TServer server = new TNonblockingServer(tnbArgs);
    11.  
      System.out.println("Running Non-blocking Server");
    12.  
      server.serve();
    13.  
      复制代码

    客户端:

    1.  
      TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
    2.  
      // 协议要和服务端一致
    3.  
      TProtocol protocol = new TCompactProtocol(transport);
    4.  
      HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    5.  
      transport.open();
    6.  
       
    7.  
      String result = client.say("NonBlockingClient");
    8.  
      System.out.println("Result =: " + result);
    9.  
      transport.close();
    10.  
      复制代码

    (三) 源码分析

    TNonblockingServer继承于AbstractNonblockingServer,这里我们更关心基于NIOselector部分的关键代码。

    (四) 优缺点

    TNonblockingServer模式优点

    相比于TSimpleServer效率提升主要体现在IO多路复用上,TNonblockingServer采用非阻塞IO,对accept/read/writeIO事件进行监控和处理,同时监控多个socket的状态变化。

    TNonblockingServer模式缺点

    TNonblockingServer模式在业务处理上还是采用单线程顺序来完成。在业务处理比较复杂、耗时的时候,例如某些接口函数需要读取数据库执行时间较长,会导致整个服务被阻塞住,此时该模式效率也不高,因为多个调用请求任务依然是顺序一个接一个执行。

    THsHaServer

    鉴于TNonblockingServer的缺点,THsHaServer继承于TNonblockingServer,引入了线程池提高了任务处理的并发能力。THsHaServer是半同步半异步(Half-Sync/Half-Async)的处理模式,Half-Aysnc用于IO事件处理(Accept/Read/Write),Half-Sync用于业务handlerrpc的同步处理上。

    注意:THsHaServer和TNonblockingServer一样,要求底层的传输通道必须使用TFramedTransport。

    (一) 工作流程

    (二) 使用入门

    服务端:

    1.  
      TNonblockingServerSocket tnbSocketTransport = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
    2.  
      TProcessor tprocessor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    3.  
      // 半同步半异步
    4.  
      THsHaServer.Args thhsArgs = new THsHaServer.Args(tnbSocketTransport);
    5.  
      thhsArgs.processor(tprocessor);
    6.  
      thhsArgs.transportFactory(new TFramedTransport.Factory());
    7.  
      thhsArgs.protocolFactory(new TBinaryProtocol.Factory());
    8.  
       
    9.  
      TServer server = new THsHaServer(thhsArgs);
    10.  
      System.out.println("Running HsHa Server");
    11.  
      server.serve();
    12.  
      复制代码

    客户端:

    1.  
      TTransport transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
    2.  
      // 协议要和服务端一致
    3.  
      TProtocol protocol = new TBinaryProtocol(transport);
    4.  
      HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    5.  
      transport.open();
    6.  
       
    7.  
      String result = client.say("HsHaClient");
    8.  
      System.out.println("Result =: " + result);
    9.  
      transport.close();
    10.  
      复制代码

    (三) 源码分析

    THsHaServer继承于TNonblockingServer,新增了线程池并发处理工作任务的功能,查看线程池的相关代码:

    任务线程池的创建过程:

    下文的TThreadedSelectorServer囊括了THsHaServer的大部分特性,源码分析可参考TThreadedSelectorServer。

    (四) 优缺点

    THsHaServer的优点

    THsHaServerTNonblockingServer模式相比,THsHaServer在完成数据读取之后,将业务处理过程交由一个线程池来完成,主线程直接返回进行下一次循环操作,效率大大提升。

    THsHaServer的缺点

    主线程仍然需要完成所有socket的监听接收、数据读取和数据写入操作。当并发请求数较大时,且发送数据量较多时,监听socket上新连接请求不能被及时接受。


    TThreadedSelectorServer

    TThreadedSelectorServer是对THsHaServer的一种扩充,它将selector中的读写IO事件(read/write)从主线程中分离出来。同时引入worker工作线程池,它也是种Half-Sync/Half-Async的服务模型。

    TThreadedSelectorServer模式是目前Thrift提供的最高级的线程服务模型,它内部有如果几个部分构成:

    1. 一个AcceptThread线程对象,专门用于处理监听socket上的新连接。
    2. 若干个SelectorThread对象专门用于处理业务socket的网络I/O读写操作,所有网络数据的读写均是有这些线程来完成。
    3. 一个负载均衡器SelectorThreadLoadBalancer对象,主要用于AcceptThread线程接收到一个新socket连接请求时,决定将这个新连接请求分配给哪个SelectorThread线程。
    4. 一个ExecutorService类型的工作线程池,在SelectorThread线程中,监听到有业务socket中有调用请求过来,则将请求数据读取之后,交给ExecutorService线程池中的线程完成此次调用的具体执行。主要用于处理每个rpc请求的handler回调处理(这部分是同步的)。

    (一) 工作流程

    (二) 使用入门

    服务端:

    1.  
      TNonblockingServerSocket serverSocket = new TNonblockingServerSocket(ServerConfig.SERVER_PORT);
    2.  
      TProcessor processor = new HelloWorldService.Processor<HelloWorldService.Iface>(new HelloWorldServiceImpl());
    3.  
      // 多线程半同步半异步
    4.  
      TThreadedSelectorServer.Args ttssArgs = new TThreadedSelectorServer.Args(serverSocket);
    5.  
      ttssArgs.processor(processor);
    6.  
      ttssArgs.protocolFactory(new TBinaryProtocol.Factory());
    7.  
      // 使用非阻塞式IO时 服务端和客户端都需要指定数据传输方式为TFramedTransport
    8.  
      ttssArgs.transportFactory(new TFramedTransport.Factory());
    9.  
       
    10.  
      // 多线程半同步半异步的服务模型
    11.  
      TThreadedSelectorServer server = new TThreadedSelectorServer(ttssArgs);
    12.  
      System.out.println("Running ThreadedSelector Server");
    13.  
      server.serve();
    14.  
      复制代码

    客户端:

    1.  
      for (int i = 0; i < 10; i++) {
    2.  
      new Thread("Thread " + i) {
    3.  
      @Override
    4.  
      public void run() {
    5.  
      // 设置传输通道 对于非阻塞服务 需要使用TFramedTransport(用于将数据分块发送)
    6.  
      for (int j = 0; j < 10; j++) {
    7.  
      TTransport transport = null;
    8.  
      try {
    9.  
      transport = new TFramedTransport(new TSocket(ServerConfig.SERVER_IP, ServerConfig.SERVER_PORT, ServerConfig.TIMEOUT));
    10.  
      TProtocol protocol = new TBinaryProtocol(transport);
    11.  
      HelloWorldService.Client client = new HelloWorldService.Client(protocol);
    12.  
      transport.open();
    13.  
      String result = client.say("ThreadedSelector Client");
    14.  
      System.out.println("Result =: " + result);
    15.  
      transport.close();
    16.  
      } catch (Exception e) {
    17.  
      e.printStackTrace();
    18.  
      } finally {
    19.  
      // 关闭传输通道
    20.  
      transport.close();
    21.  
      }
    22.  
      }
    23.  
      }
    24.  
      }.start();
    25.  
      }
    26.  
      复制代码

    (三) 核心代码

    以上工作流程的三个组件AcceptThreadSelectorThreadExecutorService在源码中的定义如下:

    TThreadedSelectorServer模式中有一个专门的线程AcceptThread用于处理新连接请求,因此能够及时响应大量并发连接请求;另外它将网络I/O操作分散到多个SelectorThread线程中来完成,因此能够快速对网络I/O进行读写操作,能够很好地应对网络I/O较多的情况。

    TThreadedSelectorServer默认参数定义如下:

    • 负责网络IO读写的selector默认线程数(selectorThreads):2
    • 负责业务处理的默认工作线程数(workerThreads):5
    • 工作线程池单个线程的任务队列大小(acceptQueueSizePerThread):4

    创建、初始化并启动AcceptThreadSelectorThreads,同时启动selector线程的负载均衡器(selectorThreads)。

    AcceptThread源码

    AcceptThread继承于Thread,可以看出包含三个重要的属性:非阻塞式传输通道(TNonblockingServerTransport)、NIO选择器(acceptSelector)和选择器线程负载均衡器(threadChooser)。

    查看AcceptThreadrun()方法,可以看出accept线程一旦启动,就会不停地调用select()方法:

    查看select()方法,acceptSelector选择器等待IO事件的到来,拿到SelectionKey即检查是不是accept事件。如果是,通过handleAccept()方法接收一个新来的连接;否则,如果是IO读写事件,AcceptThread不作任何处理,交由SelectorThread完成。

    handleAccept()方法中,先通过doAccept()去拿连接通道,然后Selector线程负载均衡器选择一个Selector线程,完成接下来的IO读写事件。

    接下来继续查看doAddAccept()方法的实现,毫无悬念,它进一步调用了SelectorThreadaddAcceptedConnection()方法,把非阻塞传输通道对象传递给选择器线程做进一步的IO读写操作。

    SelectorThreadLoadBalancer源码

    SelectorThreadLoadBalancer如何创建?

    SelectorThreadLoadBalancer是一个基于轮询算法的Selector线程选择器,通过线程迭代器为新进来的连接顺序分配SelectorThread

    SelectorThread源码

    SelectorThreadAcceptThread一样,是TThreadedSelectorServer的一个成员内部类,每个SelectorThread线程对象内部都有一个阻塞式的队列,用于存放该线程被接收的连接通道。

    阻塞队列的大小可由构造函数指定:

    上面看到,在AcceptThreaddoAddAccept()方法中调用了SelectorThreadaddAcceptedConnection()方法。

    这个方法做了两件事:

    1. 将被此SelectorThread线程接收的连接通道放入阻塞队列中。
    2. 通过wakeup()方法唤醒SelectorThread中的NIO选择器selector

    既然SelectorThread也是继承于Thread,查看其run()方法的实现:

    SelectorThread方法的select()监听IO事件,仅仅用于处理数据读取和数据写入。如果连接有数据可读,读取并以frame的方式缓存;如果需要向连接中写入数据,缓存并发送客户端的数据。且在数据读写处理完成后,需要向NIOselector清空和注销自身的SelectionKey

    • 数据写操作完成以后,整个rpc调用过程也就结束了,handleWrite()方法如下:
    • 数据读操作完成以后,Thrift会利用已读数据执行目标方法,handleRead()方法如下:

    handleRead方法在执行read()方法,将数据读取完成后,会调用requestInvoke()方法调用目标方法完成具体业务处理。requestInvoke()方法将请求数据封装为一个Runnable对象,提交到工作任务线程池(ExecutorService)进行处理。

    select()方法完成后,线程继续运行processAcceptedConnections()方法处理下一个连接的IO事件。

    这里比较核心的几个操作:

    1. 尝试从SelectorThread的阻塞队列acceptedQueue中获取一个连接的传输通道。如果获取成功,调用registerAccepted()方法;否则,进入下一次循环。
    2. registerAccepted()方法将传输通道底层的连接注册到NIO的选择器selector上面,获取到一个SelectionKey
    3. 创建一个FrameBuffer对象,并绑定到获取的SelectionKey上面,用于数据传输时的中间读写缓存。

    总结

    本文对Thrift的各种线程服务模型进行了介绍,包括2种阻塞式服务模型:TSimpleServerTThreadPoolServer,3种非阻塞式服务模型:TNonblockingServerTHsHaServerTThreadedSelectorServer。对各种服务模型的具体用法、工作流程、原理和源码实现进行了一定程度的分析。

  • 相关阅读:
    使用pipenv管理虚拟环境
    使用cookiecutter创建django项目
    Django中ModelViewSet的应用
    Redis添加历史浏览记录
    Django中配置用Redis做缓存和session
    点击即复制
    PostGreSQL数据库安装配置说明
    IntelliJ IDEA 2017.1.4 x64配置说明
    Struts2之2.5.10.1HelloWorld
    Apache Shiro系列(1)
  • 原文地址:https://www.cnblogs.com/sea520/p/12165778.html
Copyright © 2011-2022 走看看