zoukankan      html  css  js  c++  java
  • nio原理和示例代码

    我正在为学习大数据打基础中,为了手撸rpc框架,需要懂得nio的原理,在搞懂nio框架前,我会带着大家手撸一些比较底层的代码,当然今后当我们学会了框架,这些繁琐的代码也就不用写了,但是学一学底层的代码也是有好处的嘛。

    java.nio全称java non-blocking IO(实际上是 new io),是指jdk1.4 及以上版本里提供的新api(New IO) ,为所有的原始类型(boolean类型除外)提供缓存支持的数据容器,使用它可以提供非阻塞式的高伸缩性网络。

    前面我写的socket的服务端与客户端的通信是线程阻塞的,这在实际应用场景中并不竟如人意,我们更多需要的是异步操作,用户无感知,当我们在操作主线程的时候,一些通信相关的线程不应该阻塞我们的主线程。我们需要传送数据,我们只要将请求发送出去,这时候具体的发送细节就应该交由底层的操作系统帮我们完成,我们应该可以操作主线程继续完成其他事情。nio就为我们解决这些事情提供了很好的办法。

    学会nio之前我们需要了解这几个概念:

    Channel:

    Channel是一个对象,可以通过它读取和写入数据。拿 NIO 与原来的 I/O 做个比较,通道就像是流,而且他们面向缓冲区的。
    所有数据都通过 Buffer 对象来处理。您永远不会将字节直接写入通道中,相反,您是将数据写入包含一个或者多个字节的缓冲区。同样,您不会直接从通道中读取字节,而是将数据从通道读入缓冲区,再从缓冲区获取这个字节。
    通道与流的不同之处在于通道是双向的。而流只是在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类), 而 通道 可以用于读、写或者同时用于读写。
    因为它们是双向的,所以通道可以比流更好地反映底层操作系统的真实情况。特别是在 UNIX 模型中,底层操作系统通道是双向的。
    缓冲区:
    是一个固定数据量的指定基本类型的数据容器。除内容之外,缓冲区还具有位置 和界限,其中位置是要读写的下一个元素的索引,界限是第一个应该读写的元素的索引。基本 Buffer 类定义了这些属性以及清除、反转 和重绕 方法,用以标记 当前位置,以及将当前位置重置 为前一个标记处。
    每个非布尔基本类型都有一个缓冲区类。每个类定义了一系列用于将数据移出或移入缓冲区的 get 和 put 方法,用于压缩、复制 和切片 缓冲区的方法,以及用于分的异类或同类二进制数据序列),访问要么是以 big-endian字节顺序进行,要么是以 little-endian 字节顺序进行。
    判断:
    由于客户端断开连接时,服务器端SocketChannel不会立即自动改变连接状态,其仍然可以read()。所以通常以read()返回值进行判断。当read()返回为-1时即判断该连接断开。即当channel读到末尾后仍然没有数据发送,服务器即断开连接。
    我们示例的基本架构如下:

    先定义一个TimeServer

    package com.wenbing.nio;
    
    public class TimeServer {
        public static void main(String[] args) {
            int port = 8085;
            if (args != null && args.length < 0) {
                port = Integer.valueOf(args[0]);
            }
            MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
            new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
        }
    }

    再定义一个MultiplexerTimeServer去实现Runnable接口,每个通信的操作交由这一个线程去完成。

    package com.wenbing.nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.Iterator;
    import java.util.Set;
    
    public class MultiplexerTimeServer implements Runnable {
    
        private Selector selector;
    
        private ServerSocketChannel servChannel;
    
        private volatile boolean stop;
    
        /**
         * 初始化多路复用器、绑定监听端口
         *
         * @param port
         */
        public MultiplexerTimeServer(int port) {
            try {
                selector = Selector.open();
                servChannel = ServerSocketChannel.open();
    //            非阻塞
                servChannel.configureBlocking(false);
    //            绑定端口
                servChannel.socket().bind(new InetSocketAddress(port), 1024);
                servChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("The time server is start in port : " + port);
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    
        public void stop() {
            this.stop = true;
        }
    
        @Override
        public void run() {
            while (!stop) {
                try {
                    selector.select(1000);
    //                查询存在的活跃的key
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
    //                迭代所有活跃的key,进行操作
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    SelectionKey key = null;
                    while (it.hasNext()) {
                        key = it.next();
    //                    拿到某个key后,就将其从迭代器里除去
                        it.remove();
                        try {
                            handleInput(key);
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
    
    //        多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要单个关闭
            if (selector != null) {
                try {
                    selector.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void handleInput(SelectionKey key) throws IOException {
    
            if (key.isValid()) {
    //            处理新接入的请求消息
                if (key.isAcceptable()) {
    //                Accept the new connection
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
    //                Add the new connection to the selector
                    sc.register(selector, SelectionKey.OP_READ);
                }
                if (key.isReadable()) {
    //                Read the data
                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = sc.read(readBuffer);
                    if (readBytes > 0) {
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String body = new String(bytes, "UTF-8");
                        System.out.println("The time server receive order : " + body);
                        //将当前时间发回去
                        String currentTime = "QUERY TIME ORDER"
                                .equalsIgnoreCase(body) ? new java.util.Date(
                                System.currentTimeMillis()).toString() : "BAD ORDER";
                        doWrite(sc, currentTime);
                    } else if (readBytes < 0) {
    //                    对端链路关闭
                        key.cancel();
                        sc.close();
                    } else
                        ; //读到0字节,忽略
                }
            }
        }
    
        private void doWrite(SocketChannel channel, String response) throws IOException{
            if (response != null && response.trim().length() > 0) {
                byte[] bytes = response.getBytes();
                ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
                writeBuffer.put(bytes);
                writeBuffer.flip();
                channel.write(writeBuffer);
            }
        }
    
    }

    定义TimeClient

    package com.wenbing.nio;
    
    public class TimeClient {
    
        /**
         *
         * @param args
         */
        public static void main(String[] args) {
            int port = 8085;
            if (args != null && args.length > 0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            new Thread(new TimeClientHandle("127.0.0.1", port), "TimeClient-001").start();
        }
    }

    定义TimeClientHandle同样继承Runnable接口,与上面的MultiplexerTimeServer作用类似。

    package com.wenbing.nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class TimeClientHandle implements Runnable {
    
        private String host;
        private int port;
        private Selector selector;
        private SocketChannel socketChannel;
        private volatile boolean stop;
    
        public TimeClientHandle(String host, int port) {
            this.host = host == null ? "127.0.0.1" : host;
            this.port = port;
            try {
                selector = Selector.open();
                socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    
        @Override
        public void run() {
    
            try {
                doConnect();
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
            while (!stop) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    SelectionKey key = null;
                    while (it.hasNext()) {
                        key = it.next();
                        it.remove();
                        try {
                            handleInput(key);
                        } catch (Exception e) {
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    System.exit(1);
                }
            }
        }
    
        private void doConnect() throws IOException {
            //如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
            if (socketChannel.connect(new InetSocketAddress(host, port))) {
                socketChannel.register(selector, SelectionKey.OP_READ);
                doWriter(socketChannel);
            } else {
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
            }
        }
    
        private void doWriter(SocketChannel sc) throws IOException {
            byte[] req = "QUERY TIME ORDER".getBytes();
            ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
            writeBuffer.put(req);
            writeBuffer.flip();
            sc.write(writeBuffer);
            if (!writeBuffer.hasRemaining()) {
                System.out.println("Send order 2 server succeed.");
            }
    
        }
    
        private void handleInput(SelectionKey key) throws IOException {
            if (key.isValid()) {
    //            判断连接是否成功
                SocketChannel sc = (SocketChannel) key.channel();
                if (key.isConnectable()) {
                    if (sc.finishConnect()) {
                        sc.register(selector, SelectionKey.OP_READ);
                        doWriter(sc);
                    } else {
                        System.exit(1);//连接失败,进程退出
                    }
                }
                if (key.isReadable()) {
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = sc.read(readBuffer);
                    if (readBytes > 0) {
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String body = new String(bytes, "UTF-8");
                        System.out.println("Now is : " + body);
                        this.stop = true;
                    } else if (readBytes < 0) {
                        //对端链路关闭
                        key.cancel();
                        sc.close();
                    } else
                        ;//读到0字节,忽略
                }
            }
        }
    }
     启动TimeServer和TimeClient的main方法,运行结果如下:
    TimeServer控制台打印如下:
    The time server is start in port : 8085
    The time server receive order : QUERY TIME ORDER
    TimeClient控制台打印如下:
    Send order 2 server succeed.
    Now is : Sun Nov 04 00:10:56 CST 2018
     
    纸上得来终觉浅,绝知此事要躬行,快去动手自己撸一撸吧。
     
  • 相关阅读:
    如何只通过Sandboxed Solution启动一个定时执行的操作
    创建与SharePoint 2010风格一致的下拉菜单 (续) 整合Feature Custom Action框架
    创建与SharePoint 2010风格一致的下拉菜单
    《SharePoint 2010 应用程序开发指南》第二章预览
    SharePoint 2013 App 开发 (1) 什么是SharePoint App?
    使用Jscex增强SharePoint 2010 JavaScript Client Object Model (JSOM)
    搜索范围的管理
    SharePoint 2010 服务应用程序(Service Application)架构(1)
    SharePoint 2010 服务应用程序(Service Application)架构(2)
    SharePoint 2013 App 开发 (2) 建立开发环境
  • 原文地址:https://www.cnblogs.com/wenbinshen/p/9902814.html
Copyright © 2011-2022 走看看