zoukankan      html  css  js  c++  java
  • Java NIO使用及原理分析(4) 来自网上资料整理

    在上一篇文章中介绍了关于缓冲 区的一些细节内容,现在终于可以进入NIO中最有意思的部分非阻塞I/O。通常在进行同步I/O操作时,如果读取数据,代码会阻塞直至有 可供读取的数据。同样,写入调用将会阻塞直至数据能够写入。传统的Server/Client模式会基于TPR(Thread per Request),服务器会为每个客户端请求建立一个线程,由该线程单独负责处理一个客户请求。这种模式带来的一个问题就是线程数量的剧增,大量的线程会 增大服务器的开销。大多数的实现为了避免这个问题,都采用了线程池模型,并设置线程池线程的最大数量,这由带来了新的问题,如果线程池中有200个线程, 而有200个用户都在进行大文件下载,会导致第201个用户的请求无法及时处理,即便第201个用户只想请求一个几KB大小的页面。传统的 Server/Client模式如下图所示:

    NIO中非阻塞I/O采用了基 于Reactor模式的工作方式,I/O调用不会被阻塞,相反是注册感兴趣的特定I/O事件,如可读数据到达,新的套接字连接等等,在发生特定事件时,系 统再通知我们。NIO中实现非阻塞I/O的核心对象就是Selector,Selector就是注册各种I/O事件地 方,而且当那些事件发生时,就是这个对象告诉我们所发生的事件,如下图所示:

    从图中可以看出,当有读 或写等任何注册的事件发生时,可以从Selector中获得相应的SelectionKey,同时从 SelectionKey中可以找到发生的事件和该事件所发生的具体的SelectableChannel,以获得客户端发送过来的数据。关于 SelectableChannel的可以参考Java NIO使用及原理分析(一)

    使用NIO中非阻塞I/O编写服务器处理程序,大体上可以分为下面三个步骤:

    1. 向Selector对象注册感兴趣的事件
    2. 从Selector中获取感兴趣的事件
    3. 根据不同的事件进行相应的处理

    接下来我们用一个简单的示例来说明整个过程。首先是向Selector对象注册感兴趣的事件:

     

        /* 
         * 注册事件 
         * */  
        protected Selector getSelector() throws IOException {  
            // 创建Selector对象  
            Selector sel = Selector.open();  
              
            // 创建可选择通道,并配置为非阻塞模式  
            ServerSocketChannel server = ServerSocketChannel.open();  
            server.configureBlocking(false);  
              
            // 绑定通道到指定端口  
            ServerSocket socket = server.socket();  
            InetSocketAddress address = new InetSocketAddress(port);  
            socket.bind(address);  
              
            // 向Selector中注册感兴趣的事件  
            server.register(sel, SelectionKey.OP_ACCEPT);   
            return sel;  
        }  

     

     

     

    创 建了ServerSocketChannel对象,并调用configureBlocking()方法,配置为非阻塞模式,接下来的三行代码把该通道绑定 到指定端口,最后向Selector中注册事件,此处指定的是参数是OP_ACCEPT,即指定我们想要监听accept事件,也就是新的连接发 生时所产生的事件,对于ServerSocketChannel通道来说,我们唯一可以指定的参数就是OP_ACCEPT。

     

    从Selector中获取感兴趣的事件,即开始监听,进入内部循环:

     

    /* 
     * 开始监听 
     * */   
    public void listen(Selector selector) {   
        System.out.println("listen on " + port);  
        try {   
            while(true) {   
                // 该调用会阻塞,直到至少有一个事件发生  
                selector.select();   
                Set<SelectionKey> keys = selector.selectedKeys();  
                Iterator<SelectionKey> iter = keys.iterator();  
                while (iter.hasNext()) {   
                    SelectionKey key = (SelectionKey) iter.next();   
                    iter.remove();   
                    process(key);   
                }   
            }   
        } catch (IOException e) {   
            e.printStackTrace();  
        }   
    } 

     

     

     

    在非阻塞I/O中,内部循环模式基本都是遵循这种方式。首先调用select()方法,该方法会阻塞,直到至少有一个事件发生,然后再使用selectedKeys()方法获取发生事件的SelectionKey,再使用迭代器进行循环。

    最后一步就是根据不同的事件,编写相应的处理代码:

     

    /* 
     * 根据不同的事件做处理 
     * */  
    private CharsetDecoder decoder;//解码器
    private CharsetEncoder encoder;//编码器 protected void process(SelectionKey key) throws IOException{ // 接收请求 if (key.isAcceptable()) { ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel channel = server.accept(); channel.configureBlocking(false); channel.register(selector, SelectionKey.OP_READ); } // 读信息 else if (key.isReadable()) { SocketChannel channel = (SocketChannel) key.channel(); int count = channel.read(buffer); if (count > 0) { buffer.flip(); CharBuffer charBuffer = decoder.decode(buffer); name = charBuffer.toString(); SelectionKey sKey = channel.register(selector, SelectionKey.OP_WRITE); sKey.attach(name); } else { channel.close(); } buffer.clear(); } // 写事件 else if (key.isWritable()) { SocketChannel channel = (SocketChannel) key.channel(); String name = (String) key.attachment(); ByteBuffer block = encoder.encode(CharBuffer.wrap("Hello " + name)); if(block != null) { channel.write(block); } else { channel.close(); } } }

     

     

    另外招来的资料

    package com.vista.Server;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Set;
    
    public class SelectorServer 
    {
        private static int DEFAULT_SERVERPORT = 6018;//默认端口
        private static int DEFAULT_BUFFERSIZE = 1024;//默认缓冲区大小为1024字节
        private ServerSocketChannel channel;
        private LinkedList<SocketChannel> clients;
        private Selector readSelector;
        private ByteBuffer buffer;//字节缓冲区
        private int port;
        
        public SelectorServer(int port) throws IOException
        {
            this.port = port;
            this.clients = new LinkedList<SocketChannel>();
            this.channel = null;
            this.readSelector = Selector.open();//打开选择器
            this.buffer = ByteBuffer.allocate(DEFAULT_BUFFERSIZE);
        }
         // 服务器程序在服务循环中调用sericeClients()方法为已接受的客户服务
        public void serviceClients()throws IOException
        {
            Set keys;
            Iterator it;
            SelectionKey key;
            SocketChannel client;
            // 在readSelector上调用select()方法,参数1代表如果调用select的时候 那么阻塞最多1秒钟等待可用的客户端连接
            if(readSelector.select(1) > 0)
            {
                keys = readSelector.selectedKeys(); // 取得代表端通道的键集合
                it = keys.iterator();
               // 遍历,为每一个客户服务 
                while(it.hasNext()) 
                {
                   key = (SelectionKey)it.next();
                   if(key.isReadable())
                   { // 如果通道可读,那么读此通道到buffer中
                      int bytes;
                      client = (SocketChannel)key.channel();// 取得键对应的通道
                      buffer.clear(); // 清空缓冲区中的内容,设置好position,limit,准备接受数据
                      bytes = client.read(buffer); // 从通道中读数据到缓冲中,返回读取得字节数
                      if(bytes >= 0) 
                      {
                         buffer.flip(); // 准备将缓冲中的数据写回到通道中
                         client.write(buffer);  // 数据写回到通道中
                      } 
                      else if(bytes < 0) 
                      { // 如果返回小于零的值代表读到了流的末尾
                         clients.remove(client);
                      // 通道关闭时,选择键也被取消
                         client.close();
                      }
                   }
                }
             }
        }
        
        public void registerClient(SocketChannel client) throws IOException
        {// 配置和注册代表客户连接的通道对象
            client.configureBlocking(false);  // 设置此通道使用非阻塞模式    
            client.register(readSelector, SelectionKey.OP_READ); // 将这个通道注册到选择器上
            clients.add(client); //保存这个通道对象
        }
        public void listen() throws IOException
        { //服务器开始监听端口,提供服务
            ServerSocket socket;
            SocketChannel client;
            channel = ServerSocketChannel.open(); // 打开通道
            socket = channel.socket();   //得到与通到相关的socket对象
            socket.bind(new InetSocketAddress(port), 10);   //将scoket榜定在制定的端口上
            //配置通到使用非阻塞模式,在非阻塞模式下,可以编写多道程序同时避免使用复杂的多线程
            channel.configureBlocking(false);    
            try 
            {
                while(true) 
                {//     与通常的程序不同,这里使用channel.accpet()接受客户端连接请求,而不是在socket对象上调用accept(),这里在调用accept()方法时如果通道配置为非阻塞模式,那么accept()方法立即返回null,并不阻塞
                    client = channel.accept();    
                    if(client != null)
                    {
                        registerClient(client); // 注册客户信息
                    }
                    serviceClients();  // 为以连接的客户服务
                }
            } 
            finally 
            {
                socket.close(); // 关闭socket,关闭socket会同时关闭与此socket关联的通道
            }
        }
        public static void main(String[] args) throws IOException 
        {
            System.out.println("服务器启动");
            SelectorServer server = new SelectorServer(SelectorServer.DEFAULT_SERVERPORT);
            server.listen(); //服务器开始监听端口,提供服务
    
            
        }
    
    }
    package com.vista.Server;
    
    import java.io.BufferedWriter;
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.io.OutputStreamWriter;
    import java.io.PrintWriter;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.nio.charset.CharsetDecoder;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.Set;
    
    public class SelectorServer 
    {
        private static int DEFAULT_SERVERPORT = 6018;//默认端口
        private static int DEFAULT_BUFFERSIZE = 1024;//默认缓冲区大小为1024字节
        private static String DEFAULT_CHARSET = "GB2312";//默认码集
        private static String DEFAULT_FILENAME = "bigfile.dat";
        private ServerSocketChannel channel;
        private LinkedList<SocketChannel> clients;
        private Selector selector;//选择器
        private ByteBuffer buffer;//字节缓冲区
        private int port;
        private Charset charset;//字符集
        private CharsetDecoder decoder;//解码器
        
        
        public SelectorServer(int port) throws IOException
        {
            this.port = port;
            this.clients = new LinkedList<SocketChannel>();
            this.channel = null;
            this.selector = Selector.open();//打开选择器
            this.buffer = ByteBuffer.allocate(DEFAULT_BUFFERSIZE);
            this.charset = Charset.forName(DEFAULT_CHARSET);
            this.decoder = this.charset.newDecoder();
            
        }
        
         private class HandleClient 
         {
             private String strGreeting = "welcome to VistaQQ";
             public HandleClient() throws IOException 
             {
             }
             public String readBlock() 
             {//读块数据
                 return this.strGreeting;
             }
             public void close() 
             {
                 
             }
        }
    
        protected void handleKey(SelectionKey key) throws IOException
        {//处理事件
              if (key.isAcceptable()) 
              { // 接收请求
                  ServerSocketChannel server = (ServerSocketChannel) key.channel();//取出对应的服务器通道
                  SocketChannel channel = server.accept();
                  channel.configureBlocking(false);
                  channel.register(selector, SelectionKey.OP_READ);//客户socket通道注册读操作
              }
              else if (key.isReadable()) 
              { // 读信息
                  SocketChannel channel = (SocketChannel) key.channel();
                  int count = channel.read(this.buffer);
                  if (count > 0) 
                  {
                    this.buffer.flip();
                    CharBuffer charBuffer = decoder.decode(this.buffer);
                    System.out.println("Client >>" + charBuffer.toString());
                    SelectionKey wKey = channel.register(selector,
                        SelectionKey.OP_WRITE);//为客户sockt通道注册写操作
                    wKey.attach(new HandleClient());
                  } 
                  else
                  {//客户已经断开
                    channel.close();
                  }
                  this.buffer.clear();//清空缓冲区
             }
             else if (key.isWritable()) 
             { // 写事件
                  SocketChannel channel = (SocketChannel) key.channel();
                  HandleClient handle = (HandleClient) key.attachment();//取出处理者
                  ByteBuffer block = ByteBuffer.wrap(handle.readBlock().getBytes());
                  channel.write(block);
                 // channel.socket().getInputStream().(block);
    //              PrintWriter out = new PrintWriter(new BufferedWriter(new OutputStreamWriter(
    //                        channel.socket().getOutputStream())), true);
    //              out.write(block.toString());
    
            }
    
        }
        public void listen() throws IOException
        { //服务器开始监听端口,提供服务
            ServerSocket socket;
            channel = ServerSocketChannel.open(); // 打开通道
            socket = channel.socket();   //得到与通到相关的socket对象
            socket.bind(new InetSocketAddress(port));   //将scoket榜定在制定的端口上
            //配置通到使用非阻塞模式,在非阻塞模式下,可以编写多道程序同时避免使用复杂的多线程
            channel.configureBlocking(false);    
            channel.register(selector, SelectionKey.OP_ACCEPT);
            try 
            {
                while(true) 
                {//     与通常的程序不同,这里使用channel.accpet()接受客户端连接请求,而不是在socket对象上调用accept(),这里在调用accept()方法时如果通道配置为非阻塞模式,那么accept()方法立即返回null,并不阻塞
                    this.selector.select();
                    Iterator iter = this.selector.selectedKeys().iterator();
                    while(iter.hasNext())
                    {
                        SelectionKey key = (SelectionKey)iter.next();
                        iter.remove();
                        this.handleKey(key);
                        
                    }
                }
            } 
            catch(IOException ex)
            {
                ex.printStackTrace();
            }
        }
        public static void main(String[] args) throws IOException 
        {
            System.out.println("服务器启动");
            SelectorServer server = new SelectorServer(SelectorServer.DEFAULT_SERVERPORT);
            server.listen(); //服务器开始监听端口,提供服务
        }
    
    }

    此处分别判断是接受请求、读数据还是写事件,分别作不同的处理。

     

    到这里关于Java NIO使用及原理分析的四篇文章就全部完成了。Java NIO提供了通道、缓冲区、选择器这样一组抽象概念,极大的简化了我们编写高性能并发型服务器程序,后面有机会我会继续谈谈使用Java NIO的一些想法。

     

     

     

     

  • 相关阅读:
    分享点干货(此this非彼this)this的详细解读
    程序员需要掌握的排序算法之希尔排序(最小增量排序)
    JAVA基础学习笔记
    简单的时间日期格式化(未封装成控件)
    面试造航母,工作拧螺丝
    浅谈jquery插件开发模式
    Relative与Absolute组合使用
    表单
    个人介绍
    用计数法解决数组排序问题
  • 原文地址:https://www.cnblogs.com/dongfengyuxy/p/5813548.html
Copyright © 2011-2022 走看看