zoukankan      html  css  js  c++  java
  • NIO 包及工作原理收藏

    NIO 包及工作原理


    针对传统I/O 工作模式的不足,NIO 工具包提出了基于Buffer(缓冲区)、Channel(通
    道)、Selector(选择器)的新模式;Selector(选择器)、可选择的Channel(通道)和
    SelectionKey(选择键)配合起来使用,可以实现并发的非阻塞型I/O 能力。

    NIO 工具包的成员


     Buffer(缓冲器)


    Buffer 类是一个抽象类,它有7 个子类分别对应于七种基本的数据类型:ByteBuffer、
    CharBuffer、DoubleBuffer、FloatBuffer、IntBuffer、LongBuffer 和ShortBuffer。每一个Buffer
    对象相当于一个数据容器,可以把它看作内存中的一个大的数组,用来存储和提取所有基本
    类型(boolean 型除外)的数据。Buffer 类的核心是一块内存区,可以直接对其执行与内存有关
    的操作,利用操作系统特性和能力提高和改善Java 传统I/O 的性能。


     Channel(通道)


    Channel 被认为是NIO 工具包的一大创新点,是(Buffer)缓冲器和I/O 服务之间的通道,
    具有双向性,既可以读入也可以写出,可以更高效的传递数据。我们这里主要讨论
    ServerSocketChannel 和SocketChannel,它们都继承了SelectableChannel,是可选择的通道,
    分别可以工作在同步和异步两种方式下(这里的可选择不是指可以选择两种工作方式,而是
    指可以有选择的注册自己感兴趣的事件)。当通道工作在同步方式时,它的功能和编程方法
    与传统的ServerSocket、Socket 对象相似;当通道工作在异步工作方式时,进行输入输出处
    理不必等到输入输出完毕才返回,并且可以将其感兴趣的(如:接受操作、连接操作、读出
    操作、写入操作)事件注册到Selector 对象上,与Selector 对象协同工作可以更有效率的支

    持和管理并发的网络套接字连接。


    Selector(选择器)和SelectionKey(选择键)


    各类 Buffer 是数据的容器对象;各类Channel 实现在各类Buffer 与各类I/O 服务间传输
    数据。Selector 是实现并发型非阻塞I/O 的核心,各种可选择的通道将其感兴趣的事件注册
    到Selector 对象上,Selector 在一个循环中不断轮循监视这各些注册在其上的Socket 通道。
    SelectionKey 类则封装了SelectableChannel 对象在Selector 中的注册信息。当Selector 监测
    到在某个注册的SelectableChannel 上发生了感兴趣的事件时,自动激活产生一个SelectionKey
    对象,在这个对象中记录了哪一个SelectableChannel 上发生了哪种事件,通过对被激活的
    SelectionKey 的分析,外界可以知道每个SelectableChannel 发生的具体事件类型,进行相应的

    处理。

    NIO 工作原理


    通过上面的讨论,我们可以看出在并发型服务器程序中使用NIO,实际上是通过网络事
    件驱动模型实现的。我们应用Select 机制,不用为每一个客户端连接新启线程处理,而是将
    其注册到特定的Selector 对象上,这就可以在单线程中利用Selector 对象管理大量并发的网
    络连接,更好的利用了系统资源;采用非阻塞I/O 的通信方式,不要求阻塞等待I/O 操作完
    成即可返回,从而减少了管理I/O 连接导致的系统开销,大幅度提高了系统性能。

    当有读或写等任何注册的事件发生时,可以从Selector 中获得相应的
    SelectionKey , 从SelectionKey 中可以找到发生的事件和该事件所发生的具体的
    SelectableChannel,以获得客户端发送过来的数据。由于在非阻塞网络I/O 中采用了事件触
    发机制,处理程序可以得到系统的主动通知,从而可以实现底层网络I/O 无阻塞、流畅地读
    写,而不像在原来的阻塞模式下处理程序需要不断循环等待。使用NIO,可以编写出性能更
    好、更易扩展的并发型服务器程序。

    服务器端程序:

    public class HelloWorldServer {    

      

        static int BLOCK = 1024;    

        static String name = "";    

        protected Selector selector;    

        protected ByteBuffer clientBuffer = ByteBuffer.allocate(BLOCK);    

        protected CharsetDecoder decoder;    

        static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();    

      

        public HelloWorldServer(int port) throws IOException {    

            selector = this.getSelector(port);    

            Charset charset = Charset.forName("GB2312");    

            decoder = charset.newDecoder();    

        }    

      

        // 获取Selector    

        protected Selector getSelector(int port) throws IOException {    

            ServerSocketChannel server = ServerSocketChannel.open();    

            Selector sel = Selector.open();    

            server.socket().bind(new InetSocketAddress(port));    

            server.configureBlocking(false);    

            server.register(sel, SelectionKey.OP_ACCEPT);    

            return sel;    

        }    

      

        // 监听端口    

        public void listen() {    

            try {    

                for (;;) {    

                    selector.select();    

                    Iterator iter = selector.selectedKeys().iterator();    

                    while (iter.hasNext()) {    

                        SelectionKey key = (SelectionKey) iter.next();    

                        iter.remove();    

                        process(key);    

                    }    

                }    

            } catch (IOException e) {    

                e.printStackTrace();    

            }    

        }    

      

        // 处理事件    

        protected void process(SelectionKey key) throws IOException {    

            if (key.isAcceptable()) { // 接收请求    

                ServerSocketChannel server = (ServerSocketChannel) key.channel();    

                SocketChannel channel = server.accept();    

                //设置非阻塞模式    

                channel.configureBlocking(false);    

                channel.register(selector, SelectionKey.OP_READ);    

            } else if (key.isReadable()) { // 读信息    

                SocketChannel channel = (SocketChannel) key.channel();    

                int count = channel.read(clientBuffer);    

                if (count > 0) {    

                    clientBuffer.flip();    

                    CharBuffer charBuffer = decoder.decode(clientBuffer);    

                    name = charBuffer.toString();    

                    // System.out.println(name);    

                    SelectionKey sKey = channel.register(selector,    

                            SelectionKey.OP_WRITE);    

                    sKey.attach(name);    

                } else {    

                    channel.close();    

                }    

      

                clientBuffer.clear();    

            } else if (key.isWritable()) { // 写事件    

                SocketChannel channel = (SocketChannel) key.channel();    

                String name = (String) key.attachment();    

                    

                ByteBuffer block = encoder.encode(CharBuffer    

                        .wrap("Hello !" + name));    

                    

      

                channel.write(block);    

      

                //channel.close();    

      

            }    

        }    

      

        public static void main(String[] args) {    

            int port = 8888;    

            try {    

                HelloWorldServer server = new HelloWorldServer(port);    

                System.out.println("listening on " + port);    

                    

                server.listen();    

                    

            } catch (IOException e) {    

                e.printStackTrace();    

            }    

        }    

    }   

    客户端程序:

    public class HelloWorldClient {    

      

        static int SIZE = 10;    

        static InetSocketAddress ip = new InetSocketAddress("localhost", 8888);    

        static CharsetEncoder encoder = Charset.forName("GB2312").newEncoder();    

      

        static class Message implements Runnable {    

            protected String name;    

            String msg = "";    

      

            public Message(String index) {    

                this.name = index;    

            }    

      

            public void run() {    

                try {    

                    long start = System.currentTimeMillis();    

                    //打开Socket通道    

                    SocketChannel client = SocketChannel.open();    

                    //设置为非阻塞模式    

                    client.configureBlocking(false);    

                    //打开选择器    

                    Selector selector = Selector.open();    

                    //注册连接服务端socket动作    

                    client.register(selector, SelectionKey.OP_CONNECT);    

                    //连接    

                    client.connect(ip);    

                    //分配内存    

                    ByteBuffer buffer = ByteBuffer.allocate(8 * 1024);    

                    int total = 0;    

      

                    _FOR: for (;;) {    

                        selector.select();    

                        Iterator iter = selector.selectedKeys().iterator();    

      

                        while (iter.hasNext()) {    

                            SelectionKey key = (SelectionKey) iter.next();    

                            iter.remove();    

                            if (key.isConnectable()) {    

                                SocketChannel channel = (SocketChannel) key    

                                        .channel();    

                                if (channel.isConnectionPending())    

                                    channel.finishConnect();    

                                channel    

                                        .write(encoder    

                                                .encode(CharBuffer.wrap(name)));    

      

                                channel.register(selector, SelectionKey.OP_READ);    

                            } else if (key.isReadable()) {    

                                SocketChannel channel = (SocketChannel) key    

                                        .channel();    

                                int count = channel.read(buffer);    

                                if (count > 0) {    

                                    total += count;    

                                    buffer.flip();    

      

                                    while (buffer.remaining() > 0) {    

                                        byte b = buffer.get();    

                                        msg += (char) b;    

                                            

                                    }    

      

                                    buffer.clear();    

                                } else {    

                                    client.close();    

                                    break _FOR;    

                                }    

                            }    

                        }    

                    }    

                    double last = (System.currentTimeMillis() - start) * 1.0 / 1000;    

                    System.out.println(msg + "used time :" + last + "s.");    

                    msg = "";    

                } catch (IOException e) {    

                    e.printStackTrace();    

                }    

            }    

        }    

      

        public static void main(String[] args) throws IOException {    

            

            String names[] = new String[SIZE];    

      

            for (int index = 0; index < SIZE; index++) {    

                names[index] = "jeff[" + index + "]";    

                new Thread(new Message(names[index])).start();    

            }    

            

        }    

    }   

  • 相关阅读:
    devel包
    Tomcat性能调优
    详述Oracle RAC的五大优势及其劣势
    Oracle实例内存(SGA和PGA)调整
    ubuntu upstart启动流程分析
    Python爬虫示例
    Tcp连接的七次握手浅析
    Apache的prefork模式和worker模式
    减少mysql主从数据同步延迟
    Ubuntu14.04 64bit安装Android-Studio
  • 原文地址:https://www.cnblogs.com/xinzhuangzi/p/4100469.html
Copyright © 2011-2022 走看看