zoukankan      html  css  js  c++  java
  • Thrift 个人实战--Thrift 网络服务模型

    前言:
      Thrift作为Facebook开源的RPC框架, 通过IDL中间语言, 并借助代码生成引擎生成各种主流语言的rpc框架服务端/客户端代码. 不过Thrift的实现, 简单使用离实际生产环境还是有一定距离, 本系列将对Thrift作代码解读和框架扩充, 使得它更加贴近生产环境. 本文主要讲解Thrift的高性能网络框架模型, 讲解各种网络模型的特点和区别.

    Thrift 高性能网络服务模型
    1). TServer类层次体系

    TSimpleServer/TThreadPoolServer是阻塞服务模型
    TNonblockingServer/THsHaServer/TThreadedSelectotServer是非阻塞服务模型(NIO)

    2). TServer抽象类的定义
    内部静态类Args的定义, 用于TServer类用于串联软件栈(传输层, 协议层, 处理层)

    public abstract class TServer {
      public static class Args extends AbstractServerArgs<Args> {
        public Args(TServerTransport transport) {
          super(transport);
        }
      }
    
      public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
        public AbstractServerArgs(TServerTransport transport);
        public T processorFactory(TProcessorFactory factory);
        public T processor(TProcessor processor);
        public T transportFactory(TTransportFactory factory);
        public T protocolFactory(TProtocolFactory factory);
      }
    }

    TServer类定义的抽象类

    public abstract class TServer {
      public abstract void serve();
      public void stop();
    
      public boolean isServing();
      public void setServerEventHandler(TServerEventHandler eventHandler);
    }

    评注:

      抽象函数serve由具体的TServer实例来实现, 而并非所有的服务都需要优雅的退出, 因此stop没有被定义为抽象

    3). TSimpleServer
    TSimpleServer实现, 正如其名Simple, 其实现非常的简单, 是个单线程阻塞模型, 只适合测试开发使用
    抽象的代码可简单描述如下:

    // *) server socket进行监听
    serverSocket.listen();
    while ( isServing() ) {
      // *) 接受socket链接
      client = serverSocket.accept();
      // *) 封装处理器
      processor = factory.getProcess(client);
      while ( true ) {
        // *) 阻塞处理rpc的输入/输出
        if ( !processor.process(input, output) ) {
          break;	
        }	
      }
    }

    4). ThreadPoolServer
    ThreadPoolServer解决了TSimple不支持并发和多连接的问题, 引入了线程池. 实现的模型是One Thread Per Connection

    线程池代码片段:

      private static ExecutorService createDefaultExecutorService(Args args) {
        SynchronousQueue<Runnable> executorQueue =
          new SynchronousQueue<Runnable>();
        return new ThreadPoolExecutor(args.minWorkerThreads,
                                      args.maxWorkerThreads,
                                      60,
                                      TimeUnit.SECONDS,
                                      executorQueue);
      }

    评注:
      采用同步队列(SynchronousQueue), 线程池采用能线程数可伸缩的模式.
    主线程循环

    setServing(true);
    while (!stopped_) {
      try {
        TTransport client = serverTransport_.accept();
        WorkerProcess wp = new WorkerProcess(client);
        executorService_.execute(wp);
      } catch (TTransportException ttx) {
      }
    }

    评注:
      拆分了监听线程(accept)和处理客户端连接的工作线程(worker), 监听线程每接到一个客户端, 就投给线程池去处理. 这种模型能提高并发度, 但并发数取决于线程数, IO依旧阻塞, 从而限制该服务的服务能力.

    5). TNonblockingServer
    TNonblockingServer采用NIO的模式, 借助Channel/Selector机制, 采用IO事件模型来处理.

    private void select() {
      try {
        selector.select();	// wait for io events.
        // process the io events we received
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();
          if (key.isAcceptable()) {
            handleAccept(); // deal with accept
          } else if (key.isReadable()) {
            handleRead(key);	// deal with reads
          } else if (key.isWritable()) {
            handleWrite(key); // deal with writes
          } 
        }
      } catch (IOException e) {
      }
    }

    评注:

      select代码里对accept/read/write等IO事件进行监控和处理, 唯一可惜的这个单线程处理. 当遇到handler里有阻塞的操作时, 会导致整个服务被阻塞住.

    6). THsHaServer
    鉴于TNonblockingServer的缺点, THsHa引入了线程池去处理, 其模型把读写任务放到线程池去处理.
    HsHa是: Half-sync/Half-async的处理模式, Half-aysnc是在处理IO事件上(accept/read/write io), Half-sync用于handler对rpc的同步处理上.

    7). TThreadedSelectorServer
    TThreadedSelectorServer是最成熟,也是被业界所推崇的RPC服务模型
    TThreadedSelectorServer是对以上NonblockingServer的扩充, 其分离了Accept和Read/Write的Selector线程, 同时引入Worker工作线程池. 它也是种Half-sync/Half-async的服务模型.

    总结:

      MainReactor就是Accept线程, 用于监听客户端连接, SubReactor采用IO事件线程(多个), 主要负责对所有客户端的IO读写事件进行处理. 而Worker工作线程主要用于处理每个rpc请求的handler回调处理(这部分是同步的).

    问题:

      这边提几个小小的问题, 考考读者?
      1). Java NIO中 Selector采用什么方式实现? c++中的select/poll/epool? 如果是epool的话, 采用的是水平触发,还是边缘触发?
      2). 这边非阻塞模型是HsHa, 有没有全异步的模式? 为何通用的模型是采用TThreadedSelectorServer这种模式呢?
      期待你的回答, 也敬请关注后续的文章.

  • 相关阅读:
    【Azure 应用服务】由 Azure Functions runtime is unreachable 的错误消息推导出 ASYNC(异步)和 SYNC(同步)混用而引起ThreadPool耗尽问题
    【Azure API 管理】是否可以将Swagger 的API定义导入导Azure API Management中
    【Azure 应用服务】Azure Function 不能被触发
    【Azure 环境】Azure Key Vault (密钥保管库)中所保管的Keys, Secrets,Certificates是否可以实现数据粒度的权限控制呢?
    【Azure 事件中心】为应用程序网关(Application Gateway with WAF) 配置诊断日志,发送到事件中心
    【Azure 事件中心】azure-spring-cloud-stream-binder-eventhubs客户端组件问题, 实践消息非顺序可达
    【Azure API 管理】Azure API Management通过请求中的Path来限定其被访问的频率(如1秒一次)
    【Azure 环境】前端Web通过Azure AD获取Token时发生跨域问题(CORS Error)
    【Azure 应用服务】记一次Azure Spring Cloud 的部署错误 (az spring-cloud app deploy -g dev -s testdemo -n demo -p ./hellospring-0.0.1-SNAPSHOT.jar --->>> Failed to wait for deployment instances to be ready)
    【Azure 应用服务】App Service中抓取 Web Job 的 DUMP 办法
  • 原文地址:https://www.cnblogs.com/mumuxinfei/p/3875165.html
Copyright © 2011-2022 走看看