zoukankan      html  css  js  c++  java
  • Reactor模式和NIO

    本文可看成是对Doug Lea Scalable IO in Java一文的翻译。

    当前分布式计算 Web Services盛行天下,这些网络服务的底层都离不开对socket的操作。他们都有一个共同的结构:
    1. Read request
    2. Decode request
    3. Process service
    4. Encode reply
    5. Send reply

    经典的网络服务的设计如下图,在每个线程中完成对数据的处理:

    但这种模式在用户负载增加时,性能将下降非常的快。我们需要重新寻找一个新的方案,保持数据处理的流畅,很显然,事件触发机制是最好的解决办法,当有事件发生时,会触动handler,然后开始数据的处理。

    Reactor模式类似于AWT中的Event处理:

    Reactor模式参与者

    1.Reactor 负责响应IO事件,一旦发生,广播发送给相应的Handler去处理,这类似于AWT的thread
    2.Handler 是负责非堵塞行为,类似于AWT ActionListeners;同时负责将handlers与event事件绑定,类似于AWT addActionListener

    如图:

    Java的NIO为reactor模式提供了实现的基础机制,它的Selector当发现某个channel有数据时,会通过SlectorKey来告知我们,在此我们实现事件和handler的绑定。

    我们来看看Reactor模式代码:


    public class Reactor implements Runnable{

      final Selector selector;
      final ServerSocketChannel serverSocket;

      Reactor(int port) throws IOException {
        selector = Selector.open();
        serverSocket = ServerSocketChannel.open();
        InetSocketAddress address = new InetSocketAddress(InetAddress.getLocalHost(),port);
        serverSocket.socket().bind(address);

        serverSocket.configureBlocking(false);
        //向selector注册该channel
         SelectionKey sk =serverSocket.register(selector,SelectionKey.OP_ACCEPT);

        logger.debug("-->Start serverSocket.register!");

        //利用sk的attache功能绑定Acceptor 如果有事情,触发Acceptor
        sk.attach(new Acceptor());
        logger.debug("-->attach(new Acceptor()!");
      }


      public void run() { // normally in a new Thread
        try {
        while (!Thread.interrupted())
        {
          selector.select();
          Set selected = selector.selectedKeys();
          Iterator it = selected.iterator();
          //Selector如果发现channel有OP_ACCEPT或READ事件发生,下列遍历就会进行。
          while (it.hasNext())
            //来一个事件 第一次触发一个accepter线程
            //以后触发SocketReadHandler
            dispatch((SelectionKey)(it.next()));
            selected.clear();
          }
        }catch (IOException ex) {
            logger.debug("reactor stop!"+ex);
        }
      }

      //运行Acceptor或SocketReadHandler
      void dispatch(SelectionKey k) {
        Runnable r = (Runnable)(k.attachment());
        if (r != null){
          // r.run();

        }
      }

      class Acceptor implements Runnable { // inner
        public void run() {
        try {
          logger.debug("-->ready for accept!");
          SocketChannel c = serverSocket.accept();
          if (c != null)
            //调用Handler来处理channel
            new SocketReadHandler(selector, c);
          }
        catch(IOException ex) {
          logger.debug("accept stop!"+ex);
        }
        }
      }
    }

    以上代码中巧妙使用了SocketChannel的attach功能,将Hanlder和可能会发生事件的channel链接在一起,当发生事件时,可以立即触发相应链接的Handler。

    再看看Handler代码:

    public class SocketReadHandler implements Runnable {

      public static Logger logger = Logger.getLogger(SocketReadHandler.class);

      private Test test=new Test();

      final SocketChannel socket;
      final SelectionKey sk;

       static final int READING = 0, SENDING = 1;
      int state = READING;

      public SocketReadHandler(Selector sel, SocketChannel c)
        throws IOException {

        socket = c;

        socket.configureBlocking(false);
         sk = socket.register(sel, 0);

        //将SelectionKey绑定为本Handler 下一步有事件触发时,将调用本类的run方法。
        //参看dispatch(SelectionKey k)
        sk.attach(this);

        //同时将SelectionKey标记为可读,以便读取。
        sk.interestOps(SelectionKey.OP_READ);
        sel.wakeup();
      }

      public void run() {
        try{
        // test.read(socket,input);
          readRequest() ;
        }catch(Exception ex){
        logger.debug("readRequest error"+ex);
        }
      }


    /**
    * 处理读取data
    * @param key
    * @throws Exception
    */
    private void readRequest() throws Exception {

      ByteBuffer input = ByteBuffer.allocate(1024);
      input.clear();
      try{

        int bytesRead = socket.read(input);

        ......

        //激活线程池 处理这些request
        requestHandle(new Request(socket,btt));

        .....


      }catch(Exception e) {
      }

    }

    注意在Handler里面又执行了一次attach,这样,覆盖前面的Acceptor,下次该Handler又有READ事件发生时,将直接触发Handler.从而开始了数据的读 处理 写 发出等流程处理。

    将数据读出后,可以将这些数据处理线程做成一个线程池,这样,数据读出后,立即扔到线程池中,这样加速处理速度:

    更进一步,我们可以使用多个Selector分别处理连接和读事件。

    一个高性能的Java网络服务机制就要形成,激动人心的集群并行计算即将实现。

    Scalable IO in Java原文

    NIO原理与应用

    用NIO开发一个高性能聊天系统

    Socket打造高性能服务器

    更多NIO专题系列讨论....

    服务器后端性能大比拼

    事件驱动编程

    并发模型

    Rx (Reactive Extensions)介绍

    Reactive编程

    EDA

  • 相关阅读:
    MVC ORM 架构
    Kubernetes 第八章 Pod 控制器
    Kubernetes 第七章 Configure Liveness and Readiness Probes
    Kubernetes 第六章 pod 资源对象
    Kubernetes 第五章 YAML
    Kubernetes 核心组件
    Kubernetes 架构原理
    Kubernetes 第四章 kubectl
    Kubernetes 第三章 kubeadm
    yum 配置及yum 源配置
  • 原文地址:https://www.cnblogs.com/jrmy/p/14316326.html
Copyright © 2011-2022 走看看