zoukankan      html  css  js  c++  java
  • Reactor模式

    本文内容主要转载于也谈Reactor模式,点击可以查看原文。

    什么是Reactor模式

    反应器设计模式(Reactor pattern)是一种为处理并发服务请求,并将请求提交到一个或者多个服务处理程序的事件设计模式。当客户端请求抵达后,服务处理程序使用多路分配策略,由一个非阻塞的线程来接收所有的请求,然后派发这些请求至相关的工作线程进行处理。

    对于高并发系统,常会使用Reactor模式,其代替了常用的多线程处理方式,节省系统的资源,提高系统的吞吐量(并不能提升响应速度)。像我们平时常用的Tomcat服务器和Netty框架中都有Reactor模式的实现。

    这边博客就来介绍下Reactor模式这个抽象的概念。

    从BIO模式到Reactor模式

    BIO模式

    Java1.4(2002年)以前,IO都是Blocking的,也就是常说的BIO,它在等待请求、读、写(返回)三个环节都是阻塞的。在等待请求阶段,系统无法知道请求何时到达,因此需要一个主线程一直守着,当有请求进来时,将请求分发给读写线程。如图:

    img

    代码如下:

        ExecutorService executor = Excutors.newFixedThreadPollExecutor(100);//线程池
        ServerSocket serverSocket = new ServerSocket();
        serverSocket.bind(8088); 
        while(!Thread.currentThread.isInturrupted()){//主线程死循环等待新连接到来        
            Socket socket = serverSocket.accept();
            executor.submit(new ConnectIOnHandler(socket));//为新的连接创建新的线程 
        }
    
        class ConnectIOnHandler extends Thread{
            private Socket socket; 
            public ConnectIOnHandler(Socket socket){ this.socket = socket; }     
            public void run(){ 
                while(!Thread.currentThread.isInturrupted()&&!socket.isClosed()){//死循环处理读写事件 
                    String someThing = socket.read()....//读取数据 
                    if(someThing!=null){ 
               ......//处理数据 
               socket.write()....//写数据 
                } 
            }
        }            
    

    需知,请求进来(accept),并不表示数据马上达到了,可能隔一段时间才会传进来,这个时候socket.read()也是一直阻塞的状态。socket.write()也同理,当向磁盘或其它socket写数据时,也要等对方准备好才能写入,在对方准备阶段,socket.write()也是阻塞的。这两个环节可能的无效阻塞导致读写线程的低效。

    NIO模式

    Java1.4开始,引入了NIO。NIO有三个概念:Selector、Buffer、Channel。与BIO的区别是,请求进来后,并不会马上分派IO线程,而是依靠操作系统底层的多路复用机制(select/poll/epoll等),在监听到socket读写就绪之后,再分配IO线程(实际可由当前线程[使用Buffer和Channel]直接读写,因为读写本身的效率很高),这就避免了线程等待。且与BIO多线程方式相比,使用I/O多路复用技术,系统不必创建和维护庞大的线程池,从而大大减小了开销。这部分工作是NIO的核心,由Selector负责,本质上是多路复用的Java封装。而Buffer和Channel又封装了一层socket的读写,应该为的是将IO与业务代码彻底分离。以下图示为本人理解:

    img

    如图示,与BIO中监听线程职责不同,Selector监听的不只是连接请求,还有读写就绪事件,当某个事件发生时,即通知注册了该事件的Channel,由Channel操作socket读写Buffer。虚线表示需要具体的NIO框架或业务代码自己处理,比如Channel如何注册以及注册何种事件,Channel处理IO的方式(如在当前线程处理还是新开线程,若新开线程,则可看作是AIO模式)等。NIO只是提供了一套机制,具体使用还是需要编程实现(Reactor模式就是OO的一种实现)。

    示例代码(摘自Java NIO详解

    服务端:

    package cn.blog.test.NioTest;
    
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    import java.util.Set;
    
    
    public class MyNioServer {
        private Selector selector;          //创建一个选择器
        private final static int port = 8686;
        private final static int BUF_SIZE = 10240;
    
        private void initServer() throws IOException {
            //创建通道管理器对象selector
            this.selector=Selector.open();
    
            //创建一个通道对象channel
            ServerSocketChannel channel = ServerSocketChannel.open();
            channel.configureBlocking(false);       //将通道设置为非阻塞
            channel.socket().bind(new InetSocketAddress(port));       //将通道绑定在8686端口
    
            //将上述的通道管理器和通道绑定,并为该通道注册OP_ACCEPT事件
            //注册事件后,当该事件到达时,selector.select()会返回(一个key),如果该事件没到达selector.select()会一直阻塞
            SelectionKey selectionKey = channel.register(selector,SelectionKey.OP_ACCEPT);
    
            while (true){       //轮询
                selector.select();          //这是一个阻塞方法,一直等待直到有数据可读,返回值是key的数量(可以有多个)
                Set keys = selector.selectedKeys();         //如果channel有数据了,将生成的key访入keys集合中
                Iterator iterator = keys.iterator();        //得到这个keys集合的迭代器
                while (iterator.hasNext()){             //使用迭代器遍历集合
                    SelectionKey key = (SelectionKey) iterator.next();       //得到集合中的一个key实例
                    iterator.remove();          //拿到当前key实例之后记得在迭代器中将这个元素删除,非常重要,否则会出错
                    if (key.isAcceptable()){         //判断当前key所代表的channel是否在Acceptable状态,如果是就进行接收
                        doAccept(key);
                    }else if (key.isReadable()){
                        doRead(key);
                    }else if (key.isWritable() && key.isValid()){
                        doWrite(key);
                    }else if (key.isConnectable()){
                        System.out.println("连接成功!");
                    }
                }
            }
        }
    
        public void doAccept(SelectionKey key) throws IOException {
            ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
            System.out.println("ServerSocketChannel正在循环监听");
            SocketChannel clientChannel = serverChannel.accept();
            clientChannel.configureBlocking(false);
            clientChannel.register(key.selector(),SelectionKey.OP_READ);
        }
    
        public void doRead(SelectionKey key) throws IOException {
            SocketChannel clientChannel = (SocketChannel) key.channel();
            ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
            long bytesRead = clientChannel.read(byteBuffer);
            while (bytesRead>0){
                byteBuffer.flip();
                byte[] data = byteBuffer.array();
                String info = new String(data).trim();
                System.out.println("从客户端发送过来的消息是:"+info);
                byteBuffer.clear();
                bytesRead = clientChannel.read(byteBuffer);
            }
            if (bytesRead==-1){
                clientChannel.close();
            }
        }
    
        public void doWrite(SelectionKey key) throws IOException {
            ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
            byteBuffer.flip();
            SocketChannel clientChannel = (SocketChannel) key.channel();
            while (byteBuffer.hasRemaining()){
                clientChannel.write(byteBuffer);
            }
            byteBuffer.compact();
        }
    
        public static void main(String[] args) throws IOException {
            MyNioServer myNioServer = new MyNioServer();
            myNioServer.initServer();
        }
    }
    

    客户端:

    package cn.blog.test.NioTest;
    
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    public class MyNioClient {
        private Selector selector;          //创建一个选择器
        private final static int port = 8686;
        private final static int BUF_SIZE = 10240;
        private static ByteBuffer byteBuffer = ByteBuffer.allocate(BUF_SIZE);
    
        private void  initClient() throws IOException {
            this.selector = Selector.open();
            SocketChannel clientChannel = SocketChannel.open();
            clientChannel.configureBlocking(false);
            clientChannel.connect(new InetSocketAddress(port));
            clientChannel.register(selector, SelectionKey.OP_CONNECT);
            while (true){
                selector.select();
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    if (key.isConnectable()){
                        doConnect(key);
                    }else if (key.isReadable()){
                        doRead(key);
                    }
                }
            }
        }
    
        public void doConnect(SelectionKey key) throws IOException {
            SocketChannel clientChannel = (SocketChannel) key.channel();
            if (clientChannel.isConnectionPending()){
                clientChannel.finishConnect();
            }
            clientChannel.configureBlocking(false);
            String info = "服务端你好!!";
            byteBuffer.clear();
            byteBuffer.put(info.getBytes("UTF-8"));
            byteBuffer.flip();
            clientChannel.write(byteBuffer);
            //clientChannel.register(key.selector(),SelectionKey.OP_READ);
            clientChannel.close();
        }
    
        public void doRead(SelectionKey key) throws IOException {
            SocketChannel clientChannel = (SocketChannel) key.channel();
            clientChannel.read(byteBuffer);
            byte[] data = byteBuffer.array();
            String msg = new String(data).trim();
            System.out.println("服务端发送消息:"+msg);
            clientChannel.close();
            key.selector().close();
        }
    
        public static void main(String[] args) throws IOException {
            MyNioClient myNioClient = new MyNioClient();
            myNioClient.initClient();
        }
    }
    

    在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型实现,是基于IO复用技术的非阻塞IO,不是异步IO。在JDK1.5 update10和linux core2.6以上版本,sun优化了Selctor的实现,底层使用epoll替换了select/poll。另据说Buffer指向的并非堆内内存,NIO使用 Native 函数库直接分配堆外内存,然后通过一个存储在 Java 堆的 DirectByteBuffer 对象作为这块内存的引用进行操作,避免了在 Java 堆和 Native 堆中来回复制数据。

    NIO的实现解析可参看:深入浅出NIO Socket实现机制

    Reactor模式

    NIO为实现Reactor模式提供了基础,上面的NIO图示其实就是Reactor模式的雏形,只是Reactor以OO的方式抽象出了几个概念,使得职责划分更加明确。

    Reactor将服务器端的整个处理过程分成若干个事件,例如分为接收事件、读事件、写事件、执行事件等。Reactor通过事件检测机制将这些事件分发给不同处理器去处理。若干客户端连接访问服务器端,Reactor负责检测各种事件并分发到处理器,这些处理器包括接收连接的accept处理器、读数据的read处理器、写数据的write处理器以及执行逻辑的process处理器。在整个过程中只要有待处理的事件存在,即可以让Reactor线程不断往下执行,而不会阻塞在某处,所以处理效率很高。

    在实际应用中一般会对Reactor模式做相应的改进。常见的有两种方式:一种是在耗时的process处理器中引入多线程,如使用线程池;另一种是直接使用多个Reactor实例,每个Reactor实例对应一个线程。

    上图整体结构基本上与单线程的Reactor类似,只是引入了一个线程池。由于对连接的接收、对数据的读取和对数据的写入等操作基本上都耗时较少,因此把它们都放到Reactor线程中处理(图中可能画的不是非常准确,应该把accept、read和write的线程和reactor的线程画在一起)。然而,对于逻辑处理可能比较耗时的工作,可以在process处理器中引入线程池,process处理器自己不执行任务,而是交给线程池,从而在Reactor线程中避免了耗时的操作。将耗时的操作转移到线程池中后,尽管Reactor只有一个线程,它也能保证Reactor的高效。

    Reactor模式的另一种改进方式如上图。其中有多个Reactor实例,每个Reactor实例对应一个线程。因为接收事件是相对于服务器端而言的,所以客户端的连接接收工作统一由一个accept处理器负责,accept处理器会将接收的客户端连接均匀分配给所有Reactor实例,每个Reactor实例负责处理分配到该Reactor上的客户端连接,包括连接的读数据、写数据和逻辑处理。

    参考

  • 相关阅读:
    Chrome截屏-截取当前页
    SecureCRT 工具分享
    mongodb在shutdown时报错:shutdown must run from localhost when running db without auth
    gdb如何实现info vtbl命令
    aspose.word 替换图片
    字节跳动校招+社招
    Flink日志输出配置
    Kafka高可用及高性能原因
    基于SAAS模式的客服云平台落地实践
    代码Recode
  • 原文地址:https://www.cnblogs.com/54chensongxia/p/13203538.html
Copyright © 2011-2022 走看看