zoukankan      html  css  js  c++  java
  • Thrift 个人实战--Thrift 网络服务模型

    前言:
      Thrift作为Facebook开源的RPC框架, 通过IDL中间语言, 并借助代码生成引擎生成各种主流语言的rpc框架服务端/客户端代码. 不过Thrift的实现, 简单使用离实际生产环境还是有一定距离, 本系列将对Thrift作代码解读和框架扩充, 使得它更加贴近生产环境. 本文主要讲解Thrift的高性能网络框架模型, 讲解各种网络模型的特点和区别.

    Thrift 高性能网络服务模型
    1). TServer类层次体系

    TSimpleServer/TThreadPoolServer是阻塞服务模型
    TNonblockingServer/THsHaServer/TThreadedSelectotServer是非阻塞服务模型(NIO)

    2). TServer抽象类的定义
    内部静态类Args的定义, 用于TServer类用于串联软件栈(传输层, 协议层, 处理层)

    public abstract class TServer {
      public static class Args extends AbstractServerArgs<Args> {
        public Args(TServerTransport transport) {
          super(transport);
        }
      }
    
      public static abstract class AbstractServerArgs<T extends AbstractServerArgs<T>> {
        public AbstractServerArgs(TServerTransport transport);
        public T processorFactory(TProcessorFactory factory);
        public T processor(TProcessor processor);
        public T transportFactory(TTransportFactory factory);
        public T protocolFactory(TProtocolFactory factory);
      }
    }

    TServer类定义的抽象类

    public abstract class TServer {
      public abstract void serve();
      public void stop();
    
      public boolean isServing();
      public void setServerEventHandler(TServerEventHandler eventHandler);
    }

    评注:

      抽象函数serve由具体的TServer实例来实现, 而并非所有的服务都需要优雅的退出, 因此stop没有被定义为抽象

    3). TSimpleServer
    TSimpleServer实现, 正如其名Simple, 其实现非常的简单, 是个单线程阻塞模型, 只适合测试开发使用
    抽象的代码可简单描述如下:

    // *) server socket进行监听
    serverSocket.listen();
    while ( isServing() ) {
      // *) 接受socket链接
      client = serverSocket.accept();
      // *) 封装处理器
      processor = factory.getProcess(client);
      while ( true ) {
        // *) 阻塞处理rpc的输入/输出
        if ( !processor.process(input, output) ) {
          break;	
        }	
      }
    }

    4). ThreadPoolServer
    ThreadPoolServer解决了TSimple不支持并发和多连接的问题, 引入了线程池. 实现的模型是One Thread Per Connection

    线程池代码片段:

      private static ExecutorService createDefaultExecutorService(Args args) {
        SynchronousQueue<Runnable> executorQueue =
          new SynchronousQueue<Runnable>();
        return new ThreadPoolExecutor(args.minWorkerThreads,
                                      args.maxWorkerThreads,
                                      60,
                                      TimeUnit.SECONDS,
                                      executorQueue);
      }

    评注:
      采用同步队列(SynchronousQueue), 线程池采用能线程数可伸缩的模式.
    主线程循环

    setServing(true);
    while (!stopped_) {
      try {
        TTransport client = serverTransport_.accept();
        WorkerProcess wp = new WorkerProcess(client);
        executorService_.execute(wp);
      } catch (TTransportException ttx) {
      }
    }

    评注:
      拆分了监听线程(accept)和处理客户端连接的工作线程(worker), 监听线程每接到一个客户端, 就投给线程池去处理. 这种模型能提高并发度, 但并发数取决于线程数, IO依旧阻塞, 从而限制该服务的服务能力.

    5). TNonblockingServer
    TNonblockingServer采用NIO的模式, 借助Channel/Selector机制, 采用IO事件模型来处理.

    private void select() {
      try {
        selector.select();	// wait for io events.
        // process the io events we received
        Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
        while (!stopped_ && selectedKeys.hasNext()) {
          SelectionKey key = selectedKeys.next();
          selectedKeys.remove();
          if (key.isAcceptable()) {
            handleAccept(); // deal with accept
          } else if (key.isReadable()) {
            handleRead(key);	// deal with reads
          } else if (key.isWritable()) {
            handleWrite(key); // deal with writes
          } 
        }
      } catch (IOException e) {
      }
    }

    评注:

      select代码里对accept/read/write等IO事件进行监控和处理, 唯一可惜的这个单线程处理. 当遇到handler里有阻塞的操作时, 会导致整个服务被阻塞住.

    6). THsHaServer
    鉴于TNonblockingServer的缺点, THsHa引入了线程池去处理, 其模型把读写任务放到线程池去处理.
    HsHa是: Half-sync/Half-async的处理模式, Half-aysnc是在处理IO事件上(accept/read/write io), Half-sync用于handler对rpc的同步处理上.

    7). TThreadedSelectorServer
    TThreadedSelectorServer是最成熟,也是被业界所推崇的RPC服务模型
    TThreadedSelectorServer是对以上NonblockingServer的扩充, 其分离了Accept和Read/Write的Selector线程, 同时引入Worker工作线程池. 它也是种Half-sync/Half-async的服务模型.

    总结:

      MainReactor就是Accept线程, 用于监听客户端连接, SubReactor采用IO事件线程(多个), 主要负责对所有客户端的IO读写事件进行处理. 而Worker工作线程主要用于处理每个rpc请求的handler回调处理(这部分是同步的).

    问题:

      这边提几个小小的问题, 考考读者?
      1). Java NIO中 Selector采用什么方式实现? c++中的select/poll/epool? 如果是epool的话, 采用的是水平触发,还是边缘触发?
      2). 这边非阻塞模型是HsHa, 有没有全异步的模式? 为何通用的模型是采用TThreadedSelectorServer这种模式呢?
      期待你的回答, 也敬请关注后续的文章.

  • 相关阅读:
    c++基础_矩阵乘法
    c++基础_字符串对比
    c++基础_时间转换
    c++基础_特殊回文数
    c++基础_回文数
    c++基础_特殊的数字
    c++基础_杨辉三角形
    c++基础_字母图形
    c++基础_01字串
    java 常用集合类型--以及其特性
  • 原文地址:https://www.cnblogs.com/mumuxinfei/p/3875165.html
Copyright © 2011-2022 走看看