zoukankan      html  css  js  c++  java
  • NIO基础篇(二)

      Selector(选择器)是Java NIO中能够检测一到多个NIO通道,并能够知晓通道是否为诸如读写事件做好准备的组件。这样,一个单独的线程可以管理多个channel,从而管理多个网络连接。 

      

      传统的Socket IO,服务端为每个客户端连接开启一个线程来处理,这样服务器可以支持的客户端连接数是有限的。当需要处理大量的客户端连接,而每个客户端连接的数据量又不是很大时,使用Socket NIO来实现是可行的。 

      Socket NIO的特点是使用了Selector,使用一个线程处理多个连接的请求,向Selector中注册感兴趣的动作,Selector会在动作发生时通知接收端。

      Socket NIOSocket IO可以建立更多的连接。tomcatNIO的支持可以看看。

      

      下面的例子是客户端从服务器端下载文件,客户端使用了多线程技术模拟同时下载。Selector可以同时处理多个客户端的连接事件并通知服务器端进行响应操作。

      服务器端的代码为:  

    package nio;
    
    import java.io.File;
    import java.io.FileInputStream;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.*;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.concurrent.Callable;
    
    //模拟下载服务
    public class DownloadServer<T> implements Callable<T>{
        //A selector may be created by invoking the open method of this class, which will use the system's default selector provider to create a new selector. 
        //A selector may also be created by invoking the openSelector method of a custom selector provider.
        //A selector remains open until it is closed via its close method. 
        //A selectable channel's registration with a selector is represented by a SelectionKey object. 
        private Selector selector;//创建全局selector
        private Map<SocketChannel, Handle> map = new HashMap<SocketChannel, Handle>();//socketChannel和handle之间的映射
    
        //创建一个服务器serverSocketChannel,并与selector进行注册
        public DownloadServer() throws Exception {
            selector = Selector.open();
            //ServerSocketChannel is a selectable channel for stream-oriented listening sockets. 
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            //if false then it will be placed non-blocking mode 
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(2361));
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //服务端接收客户端连接事件
        }
    
        //对selector.select进行迭代,并依次进行处理
        public T call() throws Exception {
            System.out.println("startTo listen in 2361....");
            for(; ;) {
                //Selects a set of keys whose corresponding channels are ready for I/O operations
                //select() performs a blocking selection operation. It returns only after at least one channel is selected
                selector.select();
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while(keyIterator.hasNext()) {
                    //A selectionkey is a token representing the registration of a SelectableChannel with a Selector. 
                    //A selectionkey is created each time a channel is registered with a selector. 
                    SelectionKey key = keyIterator.next();
                    if(key.isValid())
                        handle(key);
                    keyIterator.remove();
                }
            }
        }
    
        //处理每个key,对于acceptable的key,由主类进行处理,而其他事件,则由内部类进行处理
        private void handle(final SelectionKey key) throws Exception {
            //Tests whether this key's channel is ready to accept a new socket connection.
            if(key.isAcceptable()) {
                ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                //Accepts a connection made to this channel's socket. 
                SocketChannel socketChannel = channel.accept();
                socketChannel.configureBlocking(false);
                socketChannel.register(selector, SelectionKey.OP_READ);//注册读事件
                map.put(socketChannel, new Handle());//把socket和handle进行绑定
            }
            //用map中的handle处理read和write事件,以模拟多个文件同时进行下载
            if(key.isReadable() || key.isWritable()) {
                SocketChannel socketChannel = (SocketChannel) key.channel();
                final Handle handle = map.get(socketChannel);
                if(handle != null)
                    handle.handle(key);
            }
        }
    
        //内部类,模拟一个内部类处理一个文件下载服务,多个类可以处理多个文件下载服务
        private class Handle{
            private StringBuilder message;
            private boolean writeOK = true;
            private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            private FileChannel fileChannel;
            private String fileName;
    
            private void handle(SelectionKey key) throws Exception {
                if(key.isReadable()) {
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    if(writeOK)
                        message = new StringBuilder();
                    while(true) {
                        byteBuffer.clear();
                        int r = socketChannel.read(byteBuffer);
                        if(r == 0)
                            break;
                        if(r == -1) {
                            socketChannel.close();
                            key.cancel();
                            return;
                        }
                        message.append(new String(byteBuffer.array(), 0, r));
                    }
                    //将接收到的信息转化成文件名,以映射到服务器上的指定文件
                    if(writeOK && invokeMessage(message)) {
                        socketChannel.register(selector, SelectionKey.OP_WRITE);
                        writeOK = false;
                    }
                }
                //向客户端写数据
                if(key.isWritable()) { //读方法中的socketChannel和写方法中的socketChannel应该是一个,不需要flip吗?
                    if(!key.isValid())
                        return;
                    SocketChannel socketChannel = (SocketChannel) key.channel();
                    if(fileChannel == null)
                        fileChannel = new FileInputStream(fileName).getChannel();
                    byteBuffer.clear();
                    int w = fileChannel.read(byteBuffer);
                    //如果文件已写完,则关掉key和socket
                    if(w <= 0) {
                        fileName = null;
                        fileChannel.close();
                        fileChannel = null;
                        writeOK = true;
                        socketChannel.close();
                        key.channel();
                        return;
                    }
                    byteBuffer.flip();
                    socketChannel.write(byteBuffer);
                }
            }
    
            //将信息转化成文件名
            private boolean invokeMessage(StringBuilder message) {
                String m = message.toString();
                try {
                    File f = new File(m);
                    if(!f.exists())
                        return false;
                    fileName = m;
                    return true;
                } catch(Exception e) {
                    return false;
                }
            }
    
        }
    
        public static void main(String[] args) throws Exception {
            /*
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            executorService.submit(new DownloadServer<Object>());
            executorService.shutdown();
            */
            new DownloadServer().call();
        }
    }

      客户端的代码为:

    package nio;
    
    import java.io.RandomAccessFile;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    public class SelectorClient<T> implements Callable<T>{
        private FileChannel fileChannel;
        private static Selector selector;
        private ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
        private String serverFileName;//服务器上的文件
        private String localFileName;//下载到客户端的文件名
    
        public SelectorClient(String serverFileName, String localFileName) {
            this.serverFileName = serverFileName;
            this.localFileName = localFileName;
        }
    
        public T call() throws Exception {
            //开启selector,并建立socket到指定端口的连接
            if(selector == null)
                selector = Selector.open();
            SocketChannel channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.connect(new InetSocketAddress("172.16.72.181", 2361));
            channel.register(selector, SelectionKey.OP_CONNECT); //客户端连接服务端事件 
            //进行信息读取
            for(; ;) {
                selector.select();
                Iterator<SelectionKey> keyIterator = selector.selectedKeys().iterator();
                while(keyIterator.hasNext()) {
                    SelectionKey key = keyIterator.next();
                    keyIterator.remove();
                    //连接事件
                    if(key.isConnectable()) {
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        if(socketChannel.isConnectionPending())
                            socketChannel.finishConnect();
                        socketChannel.write(ByteBuffer.wrap(serverFileName.getBytes()));//向服务器发信息,信息中即服务器上的文件名
                        socketChannel.register(selector, SelectionKey.OP_READ);  // 读事件
                    }
                    //读事件
                    if(key.isReadable()) {
                        SocketChannel socketChannel = (SocketChannel) key.channel();
                        byteBuffer.clear();
                        if(!socketChannel.isConnected())
                            return null;
                        //向本机下载文件创建文件channel
                        if(fileChannel == null)
                            fileChannel = new RandomAccessFile(localFileName, "rw").getChannel();
                        int r = socketChannel.read(byteBuffer);
                        //如果文件下载完毕,则关掉channel,同时关掉socketChannel
                        if(r <= 0) {
                            if(fileChannel != null)
                                fileChannel.close();
                            channel.close();
                            key.cancel();
                            return null;
                        }
                        byteBuffer.flip();
                        //写到下载文件中
                        fileChannel.write(byteBuffer);
                    }
                }
            }
        }
    
        //客户端用10个线程向服务器端下载文件,并保存为不同的文件
        public static void main(String[] args) throws Exception {
            ExecutorService executorService = Executors.newSingleThreadExecutor();
            for(int i = 0; i < 10; i++) {
                executorService.submit(new SelectorClient<Object>("test.txt", "d:/down" + i + ".txt"));
            }
            executorService.shutdown();
        }
    }
  • 相关阅读:
    iptables 常用命令解析
    iptables 常用处理动作
    centos7 中iptables、firewalld 和 netfilter 的关系
    iptables 的几个状态
    centos7 中没有service iptables save指令来保存防火墙规则
    iptables 数据走向流程
    数据库PDO简介
    php连接mySql,加密函数
    php数组,常量,遍历等
    php的会话控制
  • 原文地址:https://www.cnblogs.com/lnlvinso/p/4601138.html
Copyright © 2011-2022 走看看