zoukankan      html  css  js  c++  java
  • JAVA BIO,NIO,Reactor模式总结

    传统同步阻塞I/O(BIO)

    在NIO之前编写服务器使用的是同步阻塞I/O(Blocking I/O)。下面是一个典型的线程池客服端服务器示例代码,这段代码在连接数急剧上升的情况下,这个服务器代码就会不好使了,因为serverSocket.accept(),以及IO的read(),write()方法都是同步阻塞的,虽然通过线程池,避免频繁创建线程开销,但是该系统过于依赖线程,一个是线程的创建和销毁很耗时,再者线程的切换开销很大,尤其是在高并发的情况下系统压力不堪设想。

    BIO线程池客服端服务器示例代码

    /**
     * BIO服务器
     * @author monkjavaer
     * @date 2019/7/17 13:55
     */
    public class BioServer {
        public static final int PORT = 8888;
        public static void main(String[] args) {
            ServerSocket serverSocket = null;
            try {
                serverSocket = new ServerSocket(PORT);
                Socket socket = null;
                ThreadFactory namedThreadFactory = new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setName("demo-pool-%d");
                        return t;
                    }
                };
                //通过线程池,避免频繁创建线程开销
                ExecutorService singleThreadPool = new ThreadPoolExecutor(1, 1,
                        0L, TimeUnit.MILLISECONDS,
                        new LinkedBlockingQueue<Runnable>(1024), namedThreadFactory, new ThreadPoolExecutor.AbortPolicy());
    
                //主线程死循环等待新连接到来
                while(!Thread.currentThread().isInterrupted()){
                    socket = serverSocket.accept();
                    singleThreadPool.execute(new BioServerHandler(socket));
                }
            } catch (IOException e) {
                //TODO异常处理
            } finally {
             //TODO关闭资源
            }
        }
    }
    
    /**
     * BIO服务器事件处理方法
     * @author monkjavaer
     * @date 2019/7/17 14:00
     */
    public class BioServerHandler implements Runnable {
        private Socket socket;
        public BioServerHandler(Socket socket) {
            this.socket = socket;
        }
        @Override
        public void run() {
            try {
                byte[] input = new byte[1024];
                //服务器接收的数据
                socket.getInputStream().read(input);
                byte[] output = "服务器返回数据".getBytes();
                socket.getOutputStream().write(output);
           } catch (IOException e) {
                //TODO异常处理
            } finally {
             //TODO关闭资源
            }
        }
    }
    
    /**
     * 服务端
     * @author monkjavaer
     * @date 2019/7/17 15:06
     */
    public class BioClient {
        public static final int PORT = 8888;
        public static final String IP = "127.0.0.1";
        public static void main(String[] args) {
            Socket socket = null;
            PrintWriter printWriter = null;
            try {
                socket = new Socket(IP,PORT);
                socket.setSoTimeout(5000);
                printWriter = new PrintWriter(socket.getOutputStream());
                printWriter.println("客户端发送数据");
                printWriter.flush();
            } catch (IOException e) {
                //TODO异常处理
            } finally {
             //TODO关闭资源
            }
        }
    }
    

    NIO(非阻塞I/O)

    NIO就是非阻塞I/O(Non-blocking I/O)

    NIO重要组件回顾

    • 缓冲区(Buffer):一个Buffer对象是固定数量的数据的容器。其作用是一个存储器,或者分段运输区,在这里数据可被存储并在之后用于检索。ByteBuffer、IntBuffer、CharBuffer、LongBuffer、DoubleBuffer、FloatBuffer、ShortBuffer都是其实现类。
    • 通道(Channel):Channel 用于在字节缓冲区和位于通道另一侧的实体(通常是一个文件或套接字)之间有效地传输数据。 Channel是全双工的。
    • 选择器(Selector):Selector是NIO的多路复用器。Selector会不断轮询注册在它上面的通道Channel,找出就绪状态的Channel(Channel通道发生读、写事件)。Selector是基于底层操作系统机制,不同模式、不同版本都存在区别。Linux 上依赖于epoll;所以没有最大句柄的限制,因此一个线程做Selector轮询就能接入大量的客户端连接。

    NIO服务器示例代码

    NIO实现服务器代码步骤非常多,比较繁杂,所以推荐使用成熟的NIO框架Netty等。

    public class NioServer implements Runnable {
        private static Logger LOGGER = LoggerFactory.getLogger(NioServer.class);
        @Override
        public void run() {
            try {
                //1、打开ServerSocketChannel,监听客户端的链接
                ServerSocketChannel serverSocket = ServerSocketChannel.open();
                //2、绑定监听端口,设置backlog(默认50):请求传入连接队列的最大长度
                serverSocket.socket().bind(new InetSocketAddress(9011), 1024);
                //3、false,设置为非阻塞模式
                serverSocket.configureBlocking(false);
                //4、创建Selector,Selector是NIO的多路复用器,Selector会不断轮询注册在它上面的通道Channel,
                //找出就绪状态的Channel(Channel通道发生读、写事件)。
                Selector selector = Selector.open();
                //5、注册通道Channel到多路复用器Selector,并说明关注点SelectionKey.OP_ACCEPT,监听ACCEPT事件
                serverSocket.register(selector, SelectionKey.OP_ACCEPT);
                LOGGER.info("Listening on port {}" , 9011);
    
                //6、Selector轮询就绪的Channel
                while (true) {
                    // 阻塞等待就绪的 Channel,这是关键点之一
                    //selector1秒被唤醒
                    int n = selector.select(1000);
                    if (n == 0) {
                        continue;
                    }
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> iter = selectedKeys.iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        if (key.isValid()) {
    
                            if (key.isAcceptable()) {
                                //SelectionKey可以获取就绪状态的Channel
                                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                                //7、多路复用器Selector监听到有新的客户端连接,完成TCP三次握手建立连接。
                                SocketChannel clientSocketChannel = serverSocketChannel.accept();
                                //8、设置客户端SocketChannel为非阻塞模式
                                clientSocketChannel.configureBlocking(false);
                                //9、注册加入新的通道OP_READ
                                clientSocketChannel.register(selector, SelectionKey.OP_READ);
                            }
    
                            //读取客户端数据
                            //if(key.isReadable())等价于if((key.readyOps( ) & SelectionKey.OP_READ) != 0)
                            if (key.isReadable()) {
                                SocketChannel socketChannel = (SocketChannel) key.channel();
                                //创建buffer
                                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                                int readPosition = socketChannel.read(readBuffer);
                                if (readPosition > 0) {
                                    //flip()方法,Buffer从写模式切换到读模式,将limit设置为position,position设为0。
                                    readBuffer.flip();
                                    byte[] bytes = new byte[readBuffer.remaining()];
                                    //从可读buffer中读取数据
                                    readBuffer.get(bytes);
                                    LOGGER.info("接收客户端发送消息:{}" , new String(bytes, StandardCharsets.UTF_8));
    
                                    byte[] sendBytes = "server 收到".getBytes();
                                    ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
                                    writeBuffer.flip();
                                    //put 向buffer添加元素
                                    writeBuffer.put(sendBytes);
                                    socketChannel.write(writeBuffer);
                                }
    
                                if (readPosition < 0) {
                                    // Close channel on EOF, invalidates the key
                                    key.cancel();
                                    socketChannel.close();
                                }
                            }
                        }
                        iter.remove();
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            new Thread(new NioServer()).start();
        }
    }
    
    public class NioClient {
        private static Logger LOGGER = LoggerFactory.getLogger(NioClient.class);
        private static int PORT = 9011;
        private static String[] messages = {"这是服务器"};
    
        public static void main(String[] args) {
            try {
                SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress(InetAddress.getLocalHost(), PORT));
                for (String msg : messages) {
                    ByteBuffer myBuffer = ByteBuffer.allocate(1024);
                    myBuffer.put(msg.getBytes());
                    myBuffer.flip();
                    socketChannel.write(myBuffer);
                }
                LOGGER.info("Closing Client connection...");
                socketChannel.close();
            } catch (IOException e) {
                LOGGER.error(e.getMessage());
            }
        }
    
    }
    

    Reactor模式

    Reactor模式首先是事件驱动的,有一个或多个并发输入源,有一个Service Handler,有多个Request Handlers;这个Service Handler会同步的将输入的请求(Event)多路复用的分发给相应的Request Handler。是一种为处理并发服务请求,并将请求提交到一个或者多个服务处理程序的事件设计模式。

    Reactor模式模块组成

    http://www.blogjava.net/DLevin/archive/2015/09/02/427045.html

    Scalable IO in Java原文和翻译

    http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
    https://www.cnblogs.com/luxiaoxun/archive/2015/03/11/4331110.html

    Reactor An Object Behavioral Pattern for Demultiplexing and Dispatching Handles for Synchronous Events

    http://www.dre.vanderbilt.edu/~schmidt/PDF/reactor-siemens.pdf

  • 相关阅读:
    感觉每天打开自己的博客园, 想编程的心情就多了起来~~~
    算法图解相关代码整理
    github cli
    What's WebFlux ? And how to use it ? 一股有咖喱味的WebFlux简介
    style
    gradle 1
    gradle打包可运行jar
    外面下着雨
    天晴朗 看花儿多多开放
    Full Stack Reactive with React and Spring WebFlux
  • 原文地址:https://www.cnblogs.com/monkjavaer/p/11209141.html
Copyright © 2011-2022 走看看