zoukankan      html  css  js  c++  java
  • Java NIO服务器端开发

    一、NIO类库简介

      1、缓冲区Buffer

      Buffer是一个对象,包含一些要写入和读出的数据。

      在NIO中,所有的数据都是用缓冲区处理的,读取数据时,它是从通道(Channel)直接读到缓冲区中,在写入数据时,也是从缓冲区写入到通道。

      缓冲区实质上是一个数组,通常是一个字节数组(ByteBuffer),也可以是其它类型的数组,此外缓冲区还提供了对数据的结构化访问以及维护读写位置等信息。

      Buffer类的继承关系如下图所示:

      

      2、通道Channel

      Channel是一个通道,网络数据通过Channel读取和写入。通道和流的不同之处在于通道是双向的(通道可以用于读、写后者二者同时进行),流只是在一个方向上移动。

      Channel大体上可以分为两类:用于网络读写的SelectableChannel(ServerSocketChannel和SocketChannel就是其子类)、用于文件操作的FileChannel。

      下面的例子给出通过FileChannel来向文件中写入数据、从文件中读取数据,将文件数据拷贝到另一个文件中:

    public class NioTest
    {
        public static void main(String[] args) throws IOException
        {
            copyFile();
        }
        //拷贝文件
        private static void copyFile()
        {
            FileInputStream in=null;
            FileOutputStream out=null;
            try
            {
                in=new FileInputStream("src/main/java/data/in-data.txt");
                out=new FileOutputStream("src/main/java/data/out-data.txt");
                FileChannel inChannel=in.getChannel();
                FileChannel outChannel=out.getChannel();
                ByteBuffer buffer=ByteBuffer.allocate(1024);
                int bytesRead = inChannel.read(buffer);
                while (bytesRead!=-1)
                {
                    buffer.flip();
                    outChannel.write(buffer);
                    buffer.clear();
                    bytesRead = inChannel.read(buffer);
                }
            }
            catch (FileNotFoundException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }    
        }
        //写文件
        private static void writeFileNio()
        {
            try
            {
                RandomAccessFile fout = new RandomAccessFile("src/main/java/data/nio-data.txt", "rw");
                FileChannel fc=fout.getChannel();
                ByteBuffer buffer=ByteBuffer.allocate(1024);
                buffer.put("hi123".getBytes());
                buffer.flip();
                try
                {
                    fc.write(buffer);
                } catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            } 
            catch (FileNotFoundException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        //读文件
        private static void readFileNio()
        {
            FileInputStream fileInputStream;
            try
            {
                fileInputStream = new FileInputStream("src/main/java/data/nio-data.txt");
                FileChannel fileChannel=fileInputStream.getChannel();//从 FileInputStream 获取通道
                ByteBuffer byteBuffer=ByteBuffer.allocate(1024);//创建缓冲区
                int bytesRead=fileChannel.read(byteBuffer);//将数据读到缓冲区
                while(bytesRead!=-1)
                {
                    /*limit=position
                     * position=0;
                     */
                    byteBuffer.flip();
                    //hasRemaining():告知在当前位置和限制之间是否有元素
                    while (byteBuffer.hasRemaining())
                    {
                        System.out.print((char) byteBuffer.get());
                    }
                    /*
                     * 清空缓冲区
                     * position=0;
                     * limit=capacity;
                     */
                    byteBuffer.clear();
                    bytesRead = fileChannel.read(byteBuffer);
                }
            } catch (FileNotFoundException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

      3、多路复用器Selector

      多路复用器提供选择已经就绪的任务的能力。Selector会不断的轮询注册在其上的Channel,如果某个Channel上面发送读或者写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的集合,进行后续的I/O操作。

      一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用了epoll代替了传统的select实现,所以它没有最大连接句柄1024/2048的限制,意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端。其模型如下图所示:

      

      用单线程处理一个Selector。要使用Selector,得向Selector注册Channel,然后调用它的select()方法。这个方法会一直阻塞到某个注册的通道有事件就绪。一旦这个方法返回,线程就可以处理这些事件,事件的例子有如新连接进来,数据接收等。 

      注:

      1、什么select模型?

      select是事件触发机制,当等待的事件发生就触发进行处理,多用于Linux实现的服务器对客户端的处理。

      可以阻塞地同时探测一组支持非阻塞的IO设备,是否有事件发生(如可读、可写,有高优先级错误输出等),直至某一个设备触发了事件或者超过了指定的等待时间。也就是它们的职责不是做IO,而是帮助调用者寻找当前就绪的设备。

      2、什么是epoll模型?

      epoll的设计思路,是把select/poll单个的操作拆分为1个epoll_create+多个epoll_ctrl+一个wait。此外,内核针对epoll操作添加了一个文件系统”eventpollfs”,每一个或者多个要监视的文件描述符都有一个对应的eventpollfs文件系统的inode节点,主要信息保存在eventpoll结构体中。而被监视的文件的重要信息则保存在epitem结构体中。所以他们是一对多的关系。

    二、NIO服务器端开发

      功能说明:开启服务器端,对每一个接入的客户端都向其发送hello字符串。

      使用NIO进行服务器端开发主要有以下几个步骤:

      1、创建ServerSocketChannel,配置它为非阻塞模式

        serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);

      2、绑定监听,配置TCP参数,如backlog大小

        serverSocketChannel.socket().bind(new InetSocketAddress(8080));

      3、创建一个独立的I/O线程,用于轮询多路复用器Selector

      4、创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听SelectionKey.ACCEPT

      selector=Selector.open();
       serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

      5、启动I/O线程,在循环体内执行Selector.select()方法,轮询就绪的Channel

      while(true)
       {
            try
            {
               //select()阻塞到至少有一个通道在你注册的事件上就绪了
               //如果没有准备好的channel,就在这一直阻塞
               //select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
               selector.select();
            } 
            catch (IOException e)
            {
               // TODO Auto-generated catch block
               e.printStackTrace();
               break;
             }
     }

      6、当轮询到了处于就绪状态的Channel时,需对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端

          //返回已经就绪的SelectionKey,然后迭代执行
                Set<SelectionKey> readKeys=selector.selectedKeys();
                for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)
                {
                    SelectionKey key=it.next();
                    it.remove();
                    try
                    {
                        if(key.isAcceptable())
                        {
                            ServerSocketChannel server=(ServerSocketChannel) key.channel();
                            SocketChannel client=server.accept();
                            client.configureBlocking(false);
                            client.register(selector,SelectionKey.OP_WRITE);
                        }
                        else if(key.isWritable())
                        {
                            SocketChannel client=(SocketChannel) key.channel();
                            ByteBuffer buffer=ByteBuffer.allocate(20);
                            String str="hello";
                            buffer=ByteBuffer.wrap(str.getBytes());
                            client.write(buffer);
                            key.cancel();
                        } 
                    }catch(IOException e)
                    {
                        e.printStackTrace();
                        key.cancel();
                        try
                        {
                            key.channel().close();
                        } catch (IOException e1)
                        {
                            // TODO Auto-generated catch block
                            e1.printStackTrace();
                        }
                        
                    }
                }    

      7、设置新接入的客户端链路SocketChannel为非阻塞模式,配置其他的一些TCP参数

      if(key.isAcceptable())
        {
            ServerSocketChannel server=(ServerSocketChannel) key.channel();
            SocketChannel client=server.accept();
            client.configureBlocking(false);
            ...
        }

      8、将SocketChannel注册到Selector,监听OP_WRITE

      client.register(selector,SelectionKey.OP_WRITE);

      9、如果轮询的Channel为OP_WRITE,则说明要向SockChannel中写入数据,则构造ByteBuffer对象,写入数据包

      else if(key.isWritable())
        {
            SocketChannel client=(SocketChannel) key.channel();
            ByteBuffer buffer=ByteBuffer.allocate(20);
            String str="hello";
            buffer=ByteBuffer.wrap(str.getBytes());
            client.write(buffer);
            key.cancel();
        } 

      完整代码如下:

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class ServerSocketChannelDemo
    {
        public static void main(String[] args)
        {
            ServerSocketChannel serverSocketChannel;
            Selector selector=null;
            try
            {
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.socket().bind(new InetSocketAddress(8080));
                selector=Selector.open();
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            }
            catch (IOException e)
            {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            while(true)
            {
                try
                {
                    //select()阻塞到至少有一个通道在你注册的事件上就绪了
                    //如果没有准备好的channel,就在这一直阻塞
                    //select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。
                    selector.select();
                } 
                catch (IOException e)
                {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                    break;
                }
                //返回已经就绪的SelectionKey,然后迭代执行
                Set<SelectionKey> readKeys=selector.selectedKeys();
                for(Iterator<SelectionKey> it=readKeys.iterator();it.hasNext();)
                {
                    SelectionKey key=it.next();
                    it.remove();
                    try
                    {
                        if(key.isAcceptable())
                        {
                            ServerSocketChannel server=(ServerSocketChannel) key.channel();
                            SocketChannel client=server.accept();
                            client.configureBlocking(false);
                            client.register(selector,SelectionKey.OP_WRITE);
                        }
                        else if(key.isWritable())
                        {
                            SocketChannel client=(SocketChannel) key.channel();
                            ByteBuffer buffer=ByteBuffer.allocate(20);
                            String str="hello";
                            buffer=ByteBuffer.wrap(str.getBytes());
                            client.write(buffer);
                            key.cancel();
                        } 
                    }catch(IOException e)
                    {
                        e.printStackTrace();
                        key.cancel();
                        try
                        {
                            key.channel().close();
                        } catch (IOException e1)
                        {
                            // TODO Auto-generated catch block
                            e1.printStackTrace();
                        }
                        
                    }
                }    
            }
        }
    }
    View Code

      我们用telnet localhost 8080模拟出多个客户端:

      

      程序运行结果如下:

      

    三、参考资料

      1、netty权威指南(李林峰)

      

  • 相关阅读:
    使用dom4j来处理xml的一些常用方法
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
    xgqfrms™, xgqfrms® : xgqfrms's offical website of GitHub!
  • 原文地址:https://www.cnblogs.com/xujian2014/p/5657540.html
Copyright © 2011-2022 走看看