zoukankan      html  css  js  c++  java
  • java NIO编程(转)

    一、概念

    在传统的java网络编程中,都是在服务端创建一个ServerSocket,然后为每一个客户端单独创建一个线程Thread分别处理各自的请求,由于对于CPU而言,线程的开销是很大的,无限创建线程会让操作系统崩溃,因此,比较好的方法是在系统启动的时候创建一个动态的线程池,例如鼎鼎大名的服务器Tomcat,就是采用这种解决方案,然而,这种解决方案在高并发的情况下,情况就不太乐观了,当线程池大小超过CPU瓶颈的时候,相应速度,就极其低下了。

    传统的java网络编程的结构图如下

    NIO单线程编写高性能、高并发服务器 - Seans Blog - Seans Blog
     
     
     
           在JDK1.4后,java引入的NIO的概念,即非阻塞的IO,服务端无需创建多个线程,仅仅只需要1个线程(将读写分别创建线程有利于提高性能)即可以处理全部客户端,解决了在性能和并发的2大问题。

           NIO采用了通道Channel和选择器Selector的核心对象,Select 机制,不用为每一个客户端连接新启线程处理,而是将其注册到特定的Selector 对象上,这就可以在单线程中利用Selector 对象管理大量并发的网络连接,更好的利用了系统资源;采用非阻塞I/O 的通信方式,不要求阻塞等待I/O 操作完成即可返回,从而减少了管理I/O 连接导致的系统开销,大幅度提高了系统性能。

    当有读或写等任何注册的事件发生时,可以从Selector 中获得相应的SelectionKey , 从SelectionKey 中可以找到发生的事件和该事件所发生的具体的SelectableChannel,以获得客户端发送过来的数据。由于在非阻塞网络I/O 中采用了事件触发机制,处理程序可以得到系统的主动通知,从而可以实现底层网络I/O 无阻塞、流畅地读写,而不像在原来的阻塞模式下处理程序需要不断循环等待。使用NIO,可以编写出性能更好、更易扩展的并发型服务器程序。

         NIO的结构如下

    NIO单线程编写高性能、高并发服务器 - Seans Blog - Seans Blog

     由此可见,服务端最少只需要一个线程,既可以处理所有客户端Socket

    NIO的设计原理

           设计原理有点像设计模式中的观察者模式,由Selector去轮流咨询各个SocketChannel通道是否有事件发生,如果有,则选择出所有的Key集合,然后传递给处理程序。我们通过每个key就可以获取客户端的SocketChannel,从而进行通信。

           如果Selector发现所有通道都没有事件发生,则线程进入睡眠状态Sleep,阻塞。等到客户端有事件发生,会自动唤醒wakeup选择器selector,是不是有点类似观察者模式!!!!

    下面以两个例子来说明,工程目录如下:

    虽然是两个例子,但是代码都放在了一个工程里面,下面将分开介绍

    二、例子1

    1、DataPacket类

        该类是服务端和客户端传输的数据包

    package com.nio;
    
    import java.io.Serializable;
    import java.util.Date;
    
    /**
     * 数据包
     * @author Administrator
     *
     */
    public class DataPacket implements Serializable{
        private long id;
        private String content;
        private Date sendTime;
        public long getId() {
            return id;
        }
        public void setId(long id) {
            this.id = id;
        }
        public String getContent() {
            return content;
        }
        public void setContent(String content) {
            this.content = content;
        }
        public Date getSendTime() {
            return sendTime;
        }
        public void setSendTime(Date sendTime) {
            this.sendTime = sendTime;
        }
    
    }

    2、服务端

      NIOServer,服务端,接收客户端发送过来的数据,并将接受到的数据再发送到客户端

    package com.nio;
    
    import java.io.ByteArrayInputStream;
    import java.io.ObjectInputStream;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    public class NIOServer {
        
        private Selector selector;
        private ServerSocketChannel serverSocketChannel;
        private ServerSocket serverSocket;
        private static int PORT;
        private static int BUFFER_SIZE;
        private ByteBuffer buf;
        
        /**
         * 服务器构造
         * @param port
         * @param buffersize
         */
        public NIOServer(int port,int buffersize){
            this.PORT=port;
            this.BUFFER_SIZE=buffersize;
            buf=ByteBuffer.allocate(BUFFER_SIZE);
        }
        /**
         * 启动监听服务
         * @throws Exception
         */
        public void startListen() throws Exception{
            //打开选择器
            selector=Selector.open();
            //打开服务通道
            serverSocketChannel=ServerSocketChannel.open();
            //将服务通道设置为非阻塞
            serverSocketChannel.configureBlocking(false);
            //创建服务端Socket
            serverSocket=serverSocketChannel.socket();
            //服务端socket绑定端口
            serverSocket.bind(new InetSocketAddress(PORT));
            //服务端通道注册链接事件
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("端口注册完毕");
            Iterator<SelectionKey> iterator=null;
            SelectionKey selectionKey=null;
            while(true){
                //选择一批选择键(线程在此阻塞)
                selector.select();
                iterator=selector.selectedKeys().iterator();
                while(iterator.hasNext()){
                    //selectionKey里包含了客户端发送过来的信息
                    selectionKey=iterator.next();
                    this.handleKey(selectionKey);
                    iterator.remove();
                }
            }
        }
        
        /**
         * 处理选择的键
         * @param selectionKey
         * @throws Exception
         */
        @SuppressWarnings("unused")
        private void handleKey(SelectionKey selectionKey)throws Exception{
            //如果是链接事件
            if(selectionKey.isAcceptable()){
                //链接客户端通道(非阻塞)
                SocketChannel socketChannel=this.serverSocketChannel.accept();
                //设置客户端通道(非阻塞)
                socketChannel.configureBlocking(false);
                //注册读事件
                socketChannel.register(selector, SelectionKey.OP_READ);
                System.out.println("有新链接");
            }
            //如果是读信息事件
            else if(selectionKey.isReadable()){
                //获取客户端socket通道
                SocketChannel socketChannel=(SocketChannel)selectionKey.channel();
                //清空缓冲区
                buf.clear();
                //读取数据到缓冲区,并返回读取的字节数
                int a=socketChannel.read(buf);
                if(a>0){
                    //将开始指针指向0;把结束指针指向实际有效位置
                    buf.flip();
                    //得到的b数据组大小
                    byte[] b=new byte[buf.limit()];
                    //取的时实际有效的数据
                    buf.get(b,buf.position(),buf.limit());
                    //ObjectInputStream 不能直接接受byte数组,所以先转换成ByteArrayInputStream
                    ByteArrayInputStream byteIn=new ByteArrayInputStream(b);
                    ObjectInputStream objIn=new ObjectInputStream(byteIn);
                    DataPacket dataPacket=(DataPacket) objIn.readObject();
                    objIn.close();
                    byteIn.close();
                    
                    System.out.println("从客户端发送到服务端:"+dataPacket.getContent());
                    System.out.println("接收时间:"+dataPacket.getSendTime().toLocaleString());
                    
                    buf.flip();
                    //将发过来的数据再发送到客户端
                    socketChannel.write(buf);                
                }
                else{
                    //关闭客户端socket通道
                    socketChannel.close();
                }
            }
        }
    
    }

    3、客户端

      NIOClient,客户端输入提示字符按回车,只要不是null都将信息发送到服务端,并监听客户端传过来的数据

    package com.nio;
    
    import java.io.BufferedReader;
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.InputStreamReader;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SocketChannel;
    import java.util.Date;
    
    /**
     * 
     * @author Administrator
     *
     */
    public class NIOClient {
        
        public static void main(String[] args){
            
            try {
                SocketAddress address=new InetSocketAddress("127.0.0.1",9999);
                //客户端通道打开的时候要指向一个地址和端口
                SocketChannel clientChannel=SocketChannel.open(address);
                clientChannel.configureBlocking(false);
                ByteBuffer buf=ByteBuffer.allocate(1024);
                while(true){
                    buf.clear();
                    System.out.println("请输入发送数据包:");
                    //把输入的字节流转换成字符串
                    String msg=new BufferedReader(new InputStreamReader(System.in)).readLine();
                    if(msg.equals("null")){
                        break;
                    }
                    DataPacket dataPacket=new DataPacket();
                    dataPacket.setContent("I am hzb");
                    dataPacket.setSendTime(new Date());
                    dataPacket.setId(1);
                    ByteArrayOutputStream baos=new ByteArrayOutputStream();
                    ObjectOutputStream oos=new ObjectOutputStream(baos);
                    //把对象写入了oos流里面,但是没有到缓冲
                    oos.writeObject(dataPacket);
                    //把流的数据写入到缓冲区
                    buf.put(baos.toByteArray());
                    buf.flip();
                    //把缓冲区里面的数据写到通道里面
                    clientChannel.write(buf);
                    System.out.println("客户端发送数据:"+msg);
                    while(true){
                        int len=clientChannel.read(buf);
                        if(len>0){
                            buf.flip();
                            byte[] b=new byte[buf.limit()];
                            buf.get(b,buf.position(),buf.limit());
                            //注意:如果想要把服务端传过来的数据还原成对像,需要用
                            //ByteArrayInputStream byteIn=new ByteArrayInputStream(b);
                            //ObjectInputStream objIn=new ObjectInputStream(byteIn);
                            //DataPacket dataPacket=(DataPacket) objIn.readObject();
                            System.out.println("服务端传来数据:"+new String(b,"utf-8"));
                            break;
                        }
                    }
                }
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }

     三、例子2

       功能是,客户端将F:/work/nioSample/fileTest/client/client_send.txt发送给服务端,服务端接收到后存成F:/work/nioSample/fileTest/server/server_receive.txt

    然后,服务端将F:/work/nioSample/fileTest/server/server_send.txt发送给客户端,客户端接收到后存成F:/work/nioSample/fileTest/client/client_receive.txt

    1、服务端

    package com.nio;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SelectableChannel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class NIOFileServer {
        private final static Logger logger = Logger.getLogger(NIOFileServer.class.getName());  
        /**
         * 主方法
         * @param args
         */
         public static void main(String[] args){
             Selector selector=null;
             ServerSocketChannel serverSocketChannel=null;
             try {
                selector=Selector.open();
                serverSocketChannel=ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.socket().setReuseAddress(true);
                serverSocketChannel.socket().bind(new InetSocketAddress(10000));
                //注册链接事件
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                while(selector.select()>0){
                    Iterator<SelectionKey> iterator=selector.selectedKeys().iterator();
                    while(iterator.hasNext()){
                        SelectionKey key=iterator.next();
                        iterator.remove();
                        doiterator((ServerSocketChannel) key.channel());
                    }
                }
                
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
                logger.log(Level.SEVERE, e.getMessage(),e);
            }finally{
                 try {
                    selector.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
                try {
                    serverSocketChannel.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
         }
         /**
          * 
          * @param serverSocketChannel,定义成final保证在方法内部serverSocketChannel的内容不会被改变
          */
         private static  void doiterator(final ServerSocketChannel serverSocketChannel){
             SocketChannel socketChannel=null;
             try {
                socketChannel=serverSocketChannel.accept();
                receiveFile(socketChannel, new File("F:/work/nioSample/fileTest/server/server_receive.txt"));
                sendFile(socketChannel, new File("F:/work/nioSample/fileTest/server/server_send.txt"));
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
         }
         
         /**
          * 接收文件
          * @param socketChannel
          * @param file
          * @throws IOException
          */
         private static void receiveFile(SocketChannel socketChannel,File file) throws IOException{
             FileOutputStream fos=null;
             FileChannel fileChannel=null;
             try{
                 //保存文件要保存的路径
                 fos=new FileOutputStream(file);
                 fileChannel=fos.getChannel();
                 ByteBuffer buffer=ByteBuffer.allocateDirect(1024);
                 int len=0;
                 //把客户端通道socketChannel的文件读到缓冲区,再从缓冲区写到本地文件通道channel的路径下
                 while((len=socketChannel.read(buffer))!=-1){
                     buffer.flip();
                     if(len>0){
                         buffer.limit(len);
                         fileChannel.write(buffer);
                         buffer.clear();
                     }
                 }
             }catch(Exception ex){
                 ex.printStackTrace();
             }finally{
                 fos.close();
                 fileChannel.close();
             }     
         }
         
         /**
          * 发送文件
          * @param socketChannel
          * @param file
          * @throws IOException
          */
         private static void sendFile(SocketChannel socketChannel,File file) throws IOException{
             FileInputStream fis=null;
             FileChannel fileChannel=null;
             try{
                 fis=new FileInputStream(file);
                 fileChannel=fis.getChannel();
                 ByteBuffer buffer=ByteBuffer.allocateDirect(1024);
                 int len=0;
                 while((len=fileChannel.read(buffer))!=-1){
                     //将buffer的游标position指向0
                     buffer.rewind();
                     buffer.limit(len);
                     socketChannel.write(buffer);
                     buffer.clear();
                 }
                 //防止正在发送的过程中又发送一个文件
                 socketChannel.socket().shutdownOutput(); 
             }catch(Exception ex){
                 ex.printStackTrace();
             }finally{
                 fis.close();
                 fileChannel.close();
             }
         }
    }

    2、客户端

    package com.nio;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SocketChannel;
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class NIOFileClient {
    
        private final static Logger logger = Logger.getLogger(NIOFileClient.class.getName());
        
        public static void main(String[] args) throws Exception {
            new Thread(new MyRunnable()).start();
        }
        
        private static final class MyRunnable implements Runnable {
            public void run() {
                SocketChannel socketChannel = null;
                try {
                    socketChannel = SocketChannel.open();
                    SocketAddress socketAddress = new InetSocketAddress("127.0.0.1", 10000);
                    socketChannel.connect(socketAddress);
    
                    sendFile(socketChannel, new File("F:/work/nioSample/fileTest/client/client_send.txt"));
                    receiveFile(socketChannel, new File("F:/work/nioSample/fileTest/client/client_receive.txt"));
                    
                } catch (Exception ex) {
                    logger.log(Level.SEVERE, null, ex);
                } finally {
                    try {
                        socketChannel.close();
                    } catch(Exception ex) {}
                }
            }
    
            private void sendFile(SocketChannel socketChannel, File file) throws IOException {
                FileInputStream fis = null;
                FileChannel channel = null;
                try {
                    fis = new FileInputStream(file);
                    channel = fis.getChannel();
                    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
                    int size = 0;
                    while ((size = channel.read(buffer)) != -1) {
                        buffer.rewind();
                        buffer.limit(size);
                        socketChannel.write(buffer);
                        buffer.clear();
                    }
                    socketChannel.socket().shutdownOutput();
                } finally {
                    try {
                        channel.close();
                    } catch(Exception ex) {}
                    try {
                        fis.close();
                    } catch(Exception ex) {}
                }
            }
    
            private void receiveFile(SocketChannel socketChannel, File file) throws IOException {
                FileOutputStream fos = null;
                FileChannel channel = null;
                
                try {
                    fos = new FileOutputStream(file);
                    channel = fos.getChannel();
                    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
    
                    int size = 0;
                    while ((size = socketChannel.read(buffer)) != -1) {
                        buffer.flip();
                        if (size > 0) {
                            buffer.limit(size);
                            channel.write(buffer);
                            buffer.clear();
                        }
                    }
                } finally {
                    try {
                        channel.close();
                    } catch(Exception ex) {}
                    try {
                        fos.close();
                    } catch(Exception ex) {}
                }
            }
        }
    }
  • 相关阅读:
    Live Writer配置
    protobufnet 学习手记
    好的Sql语句也能提高效率(二)
    关于CodeSmith的输出问题
    [Scrum]12.29
    [scrum] 1.4
    分享 关于c#注释的规范
    [Scrum] 1.3
    分享:将XML(VS提取注释时生成)转换为Chm的一个方法
    【Scrum】2010.12.27
  • 原文地址:https://www.cnblogs.com/boshen-hzb/p/5897230.html
Copyright © 2011-2022 走看看