zoukankan      html  css  js  c++  java
  • Nio使用Selector客户端与服务器的通信

    使用NIO的一个最大优势就是客户端于服务器自己的不再是阻塞式的,也就意味着服务器无需通过为每个客户端的链接而开启一个线程。而是通过一个叫Selector的轮循器来不断的检测那个Channel有消息处理。
    简单来讲,Selector会不断地轮询注册在其上的Channel,如果某个Channel上面有新的TCP连接接入、读和写事件,这个Channel就处于就绪状态,会被Selector轮询出来,然后通过SelectionKey可以获取就绪Channel的Set集合,进行后续的I/O操作。
    由于select操作只管对selectedKeys的集合进行添加而不负责移除,所以当某个消息被处理后我们需要从该集合里去掉。

    一个多路复用器Selector可以同时轮询多个Channel,由于JDK使用了epoll()代替传统的select实现,所以它并没有最大连接句柄1024/2048的限制。这也就意味着只需要一个线程负责Selector的轮询,就可以接入成千上万的客户端,这确实是个非常巨大的进步。

    下面,我们通过NIO编程的序列图和源码分析来熟悉相关的概念,以便巩固我们前面所学的NIO基础知识。

    è¿éåå¾çæè¿°

    下面,我们对NIO服务端的主要创建过程进行讲解和说明,作为NIO的基础入门,我们将忽略掉一些在生产环境中部署所需要的一些特性和功能(比如TCP半包等问题)。

    步骤一:打开ServerSocketChannel,用于监听客户端的连接,它是所有客户端连接的父管道,代码示例如下。

    ServerSocketChannel server = ServerSocketChannel.open();
    步骤二:绑定监听端口,设置连接为非阻塞模式,示例代码如下。

    server.socket().bind(new InetSocketAddress(7777),1024);
    // 设置为非阻塞模式, 这个非常重要
    server.configureBlocking(false);
    步骤三:创建Reactor线程,创建多路复用器并启动线程,代码如下。(即 selector)

    Selector selector = Selector.open();
    new Thread(new ReactorTask()).start();
    步骤四:将ServerSocketChannel注册到Reactor线程的多路复用器Selector上,监听ACCEPT事件,代码如下。

    server.register(selector, SelectionKey.OP_ACCEPT);
    步骤五:多路复用器在线程run方法的无限循环体内轮询准备就绪的Key,代码如下。

    while(true){
    selector.select(1000);
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> it = keys.iterator();
    SelectionKey key = null;
    while (it.hasNext()) {
    key = (SelectKey)it.next();
    //处理io
    }
    }
    步骤六:多路复用器监听到有新的客户端接入,处理新的接入请求,完成TCP三次握手,建立物理链路,代码示例如下。

    // 得到与客户端的套接字通道
    SocketChannel channel = ssc.accept();
    步骤七:设置客户端链路为非阻塞模式,示例代码如下。

    channel.configureBlocking(false);
    步骤八:将新接入的客户端连接注册到Reactor线程的多路复用器上,监听读操作,用来读取客户端发送的网络消息,代码如下。

    channel.register(selector, SelectionKey.OP_READ);
    步骤九:异步读取客户端请求消息到缓冲区,示例代码如下。

    int readBytes = channel.read(byteBuffer);
    步骤十:对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排,示例代码如下。

    Object message = null;
    while (buffer.hasRemain()) {
    buffer.mark();
    message = decode(buffer);
    if(message == null){
    buffer.reset();
    break;
    }
    }
    if(!buffer.hasRemain()){
    buffer.clear();
    }else {
    buffer.compact();
    }

    //业务线程处理message
    步骤十一:将POJO对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端,示例代码如下。

    socketChannel.write(byteBuffer);
    注意:如果发送区TCP缓冲区满,会导致写半包,此时,需要注册监听写操作位,循环写,直到整包消息写入TCP缓冲区,此处不赘述,后续会详细分析Netty的处理策略。

    package chanel;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    import java.util.Set;
    
    public class NioService {
        public static void main(String[] args) {
            try {
                NioService server = new NioService();
                server.init();
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        public  void init() throws IOException {
            Charset charset = Charset.forName("UTF-8");
            // 创建一个选择器,可用close()关闭,isOpen()表示是否处于打开状态,他不隶属于当前线程
            Selector selector = Selector.open();
            // 创建ServerSocketChannel,并把它绑定到指定端口上
            ServerSocketChannel server = ServerSocketChannel.open();
            server.socket().bind(new InetSocketAddress(7777),1024);
            // 设置为非阻塞模式, 这个非常重要
            server.configureBlocking(false);
            // 在选择器里面注册关注这个服务器套接字通道的accept事件
            // ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用于SocketChannel
            server.register(selector, SelectionKey.OP_ACCEPT);
    
    
            while (true) {
                //休眠时间为1s,无论是否有读写等事件发生,selector每隔1s都被唤醒一次
                selector.select(1000);
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    //如果key对应的Channel包含客户端的链接请求
                    // OP_ACCEPT 这个只有ServerSocketChannel才有可能触发
                    key=it.next();
                    // 由于select操作只管对selectedKeys进行添加,所以key处理后我们需要从里面把key去掉
                    it.remove();
                    if (key.isAcceptable()) {
                        ServerSocketChannel ssc  = (ServerSocketChannel) key.channel();
                        // 得到与客户端的套接字通道
                        //ServerSocketChannel的accept接收客户端的连接请求并创建SocketChannel实例,完成上述操作后,相当于完成了TCP的三次握手,TCP物理链路正式建立。
                        //我们需要将新创建的SocketChannel设置为异步非阻塞,同时也可以对其TCP参数进行设置,例如TCP接收和发送缓冲区的大小等。此处省掉
                        SocketChannel channel = ssc.accept();
                        channel.configureBlocking(false);
                        channel.register(selector, SelectionKey.OP_READ);
                        //将key对应Channel设置为准备接受其他请求
                        key.interestOps(SelectionKey.OP_ACCEPT);
                    }
                    if (key.isReadable()) {
                        SocketChannel channel = (SocketChannel) key.channel();
                        ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
                        String content = "";
                        try {
                            int readBytes = channel.read(byteBuffer);
                            if (readBytes > 0) {
                                byteBuffer.flip(); //为write()准备
                                byte[] bytes = new byte[byteBuffer.remaining()];
                                byteBuffer.get(bytes);
                                content+=new String(bytes);
                                System.out.println(content);
                                //回应客户端
                                doWrite(channel);
                            }
                            // 写完就把状态关注去掉,否则会一直触发写事件(改变自身关注事件)
                            key.interestOps(SelectionKey.OP_READ);
                        } catch (IOException i) {
                            //如果捕获到该SelectionKey对应的Channel时出现了异常,即表明该Channel对于的Client出现了问题
                            //所以从Selector中取消该SelectionKey的注册
                            key.cancel();
                            if (key.channel() != null) {
                                key.channel().close();
                            }
                        }
                    }
                }
            }
        }
        private  void doWrite(SocketChannel sc) throws IOException{
            byte[] req ="服务器已接受aaa".getBytes();
            ByteBuffer byteBuffer = ByteBuffer.allocate(req.length);
            byteBuffer.put(req);
            byteBuffer.flip();
            sc.write(byteBuffer);
            if(!byteBuffer.hasRemaining()){
                System.out.println("Send 2 Service successed");
            }
        }
    }


    现在我们来看编写客户端的流程:

    è¿éåå¾çæè¿°

    步骤一:打开SocketChannel,绑定客户端本地地址(可选,默认系统会随机分配一个可用的本地地址),示例代码如下。

    SocketChannel channel = SocketChannel.open();
    步骤二:设置SocketChannel为非阻塞模式,同时设置客户端连接的TCP参数,示例代码如下。

    channel.configureBlocking(false);
    步骤三:异步连接服务端,示例代码如下。

    步骤四:判断是否连接成功,如果连接成功,则直接注册读状态位到多路复用器中,如果当前没有连接成功(异步连接,返回false,说明客户端已经发送sync包,服务端没有返回ack包,物理链路还没有建立),示例代码如下。

    if(channel.connect(new InetSocketAddress("127.0.0.1",7777))){
    channel.register(selector, SelectionKey.OP_READ);
    //发送消息
    doWrite(channel, "66666666");
    }else {
    channel.register(selector, SelectionKey.OP_CONNECT);
    }
    步骤五:向Reactor线程的多路复用器注册OP_CONNECT状态位,监听服务端的TCP ACK应答,示例代码如下。

    channel.register(selector, SelectionKey.OP_CONNECT);
    步骤六:创建Reactor线程,创建多路复用器并启动线程,代码如下。

    selector = Selector.open();
    new Thread(new ReactorTask()).start();
    步骤七:多路复用器在线程run方法的无限循环体内轮询准备就绪的Key,代码如下。

    while (!stop){
    selector.select(1000);
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> it = keys.iterator();
    SelectionKey key = null;
    while (it.hasNext()){
    }
    }
    步骤八:接收connect事件进行处理并判断是否链接成功,示例代码如下。

    if (key.isConnectable()){
    if (channel.finishConnect()) {
    }
    }
    步骤九:注册读事件到多路复用器,示例代码如下。

    channel.register(selector, SelectionKey.OP_READ);
    步骤十:异步读客户端请求消息到缓冲区,示例代码如下。

    int readBytes = channel.read(byteBuffer);
    步骤十一:对ByteBuffer进行编解码,如果有半包消息指针reset,继续读取后续的报文,将解码成功的消息封装成Task,投递到业务线程池中,进行业务逻辑编排,示例代码如下。

    Object message = null;
    while (buffer.hasRemain()) {
    buffer.mark();
    message = decode(buffer);
    if(message == null){
    buffer.reset();
    break;
    }
    }
    if(!buffer.hasRemain()){
    buffer.clear();
    }else {
    buffer.compact();
    }

    //业务线程处理message
    步骤十二:将发生对象encode成ByteBuffer,调用SocketChannel的异步write接口,将消息异步发送给客户端,示例代码如下。

    socketChannel.write(byteBuffer);

    package chanel;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    import java.util.Set;
    import java.util.concurrent.ArrayBlockingQueue;
    
    public class NioClient {
        public static void main(String[] args) {
            try {
                NioClient client = new NioClient();
                client.init();
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        // 创建一个套接字通道,注意这里必须使用无参形式
        private Selector selector = null;
        static Charset charset = Charset.forName("UTF-8");
        private volatile boolean stop = false;
        public ArrayBlockingQueue<String> arrayQueue = new ArrayBlockingQueue<String>(8);
    
        public void init() throws IOException {
            selector = Selector.open();
            SocketChannel channel = SocketChannel.open();
            // 设置为非阻塞模式,这个方法必须在实际连接之前调用(所以open的时候不能提供服务器地址,否则会自动连接)
            channel.configureBlocking(false);
            if (channel.connect(new InetSocketAddress("127.0.0.1", 7777))) {
                channel.register(selector, SelectionKey.OP_READ);
                //发送消息
                doWrite(channel, "66666666");
            } else {
                channel.register(selector, SelectionKey.OP_CONNECT);
            }
    
    
            //启动一个接受服务器反馈的线程
            //  new Thread(new ReceiverInfo()).start();
    
            while (!stop) {
                selector.select(1000);
                Set<SelectionKey> keys = selector.selectedKeys();
                Iterator<SelectionKey> it = keys.iterator();
                SelectionKey key = null;
                while (it.hasNext()) {
                    key = it.next();
                    it.remove();
                    SocketChannel sc = (SocketChannel) key.channel();
                    // OP_CONNECT 两种情况,链接成功或失败这个方法都会返回true
                    if (key.isConnectable()) {
                        // 由于非阻塞模式,connect只管发起连接请求,finishConnect()方法会阻塞到链接结束并返回是否成功
                        // 另外还有一个isConnectionPending()返回的是是否处于正在连接状态(还在三次握手中)
                        if (channel.finishConnect()) {
                           /* System.out.println("准备发送数据");
                            // 链接成功了可以做一些自己的处理
                            channel.write(charset.encode("I am Coming"));
                            // 处理完后必须吧OP_CONNECT关注去掉,改为关注OP_READ
                            key.interestOps(SelectionKey.OP_READ);*/
                            sc.register(selector, SelectionKey.OP_READ);
                            //    new Thread(new DoWrite(channel)).start();
                            doWrite(channel, "66666666");
                        } else {
                            //链接失败,进程推出或直接抛出IOException
                            System.exit(1);
                        }
                    }
                    if (key.isReadable()) {
                        //读取服务端的响应
                        ByteBuffer buffer = ByteBuffer.allocate(1024);
                        int readBytes = sc.read(buffer);
                        String content = "";
                        if (readBytes > 0) {
                            buffer.flip();
                            byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
                            content += new String(bytes);
                            stop = true;
                        } else if (readBytes < 0) {
                            //对端链路关闭
                            key.channel();
                            sc.close();
                        }
                        System.out.println(content);
                        key.interestOps(SelectionKey.OP_READ);
                    }
                }
            }
        }
    
        private void doWrite(SocketChannel sc, String data) throws IOException {
            byte[] req = data.getBytes();
            ByteBuffer byteBuffer = ByteBuffer.allocate(req.length);
            byteBuffer.put(req);
            byteBuffer.flip();
            sc.write(byteBuffer);
            if (!byteBuffer.hasRemaining()) {
                System.out.println("Send 2 client successed");
            }
        }
    }

    启动客户端类的init()方法之后就会向服务器发送一个字符串,服务器接受到之后会向客户端回应一个。运行如上代码,结果正确。

    通过源码对比分析,我们发现NIO编程难度确实比同步阻塞BIO大很多,我们的NIO例程并没有考虑“半包读”和“半包写”,如果加上这些,代码将会更加复杂。NIO代码既然这么复杂,为什么它的应用却越来越广泛呢,使用NIO编程的优点总结如下。

    (1)客户端发起的连接操作是异步的,可以通过在多路复用器注册OP_CONNECT等待后续结果,不需要像之前的客户端那样被同步阻塞。

    (2)SocketChannel的读写操作都是异步的,如果没有可读写的数据它不会同步等待,直接返回,这样I/O通信线程就可以处理其他的链路,不需要同步等待这个链路可用。

    3)线程模型的优化:由于JDK的Selector在Linux等主流操作系统上通过epoll实现,它没有连接句柄数的限制(只受限于操作系统的最大句柄数或者对单个进程的句柄限制),这意味着一个Selector线程可以同时处理成千上万个客户端连接,而且性能不会随着客户端的增加而线性下降,因此,它非常适合做高性能、高负载的网络服务器。

    JDK1.7升级了NIO类库,升级后的NIO类库被称为NIO2.0,引人注目的是,Java正式提供了异步文件I/O操作,同时提供了与UNIX网络编程事件驱动I/O对应的AIO,后续我们学习下如何利用NIO2.0编写AIO程序,还是以客户端服务器通信为例进行讲解。

  • 相关阅读:
    无法重用Linq2Entity Query
    The Joel Test
    MSBuilder directly instead of default VSComplie with keyborad shotcut 原创
    客户端缓存(Client Cache)
    关于代码重构和UT的一些想法,求砖头
    ExtJS2.0实用简明教程 应用ExtJS
    Perl information,doc,module document and FAQ.
    使用 ConTest 进行多线程单元测试 为什么并行测试很困难以及如何使用 ConTest 辅助测试
    史上最简单的Hibernate入门简介
    汽车常识全面介绍 传动系统
  • 原文地址:https://www.cnblogs.com/cxhfuujust/p/10458619.html
Copyright © 2011-2022 走看看