zoukankan      html  css  js  c++  java
  • netty简单NIO模型



    首先是使用java原生nio类库编写的例子,开发一套nio框架不简单,所以选择了netty,该例完成后,是netty举例。

    package com.smkj.netty;
    
    public class TimeServer {
        public static void main(String[] args) {
            int port = 8080;
            if(args!=null&&args.length!=0) {
                try {
                    port = Integer.valueOf(args[0]);
                } catch (NumberFormatException e) {
                    //采用默认值
                }
            }
            
            MultiplexerTimeServer timeServer = new MultiplexerTimeServer(port);
            new Thread(timeServer,"NIO-MultiplexerTimeServer-001").start();
        }
    }
    package com.smkj.netty;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class MultiplexerTimeServer implements Runnable {
        private Selector selector;
        private ServerSocketChannel servChannel;
        private volatile boolean stop;
    
        /**
         * 初始化多路复用器,绑定监听端口
         * 
         * @param port
         */
        public MultiplexerTimeServer(int port) {
            try {
                selector = Selector.open();
                servChannel = ServerSocketChannel.open();
                servChannel.configureBlocking(false);
                servChannel.socket().bind(new InetSocketAddress(port), 1024);
                servChannel.register(selector, SelectionKey.OP_ACCEPT);
                System.out.println("The time server is start in port:" + port);
            } catch (IOException e) {
                e.printStackTrace();
                System.exit(1);
            }
        }
    
        public void stop() {
            this.stop = true;
        }
    
        @Override
        public void run() {
            while (!stop) {
                try {
                    selector.select(1000);
                    Set<SelectionKey> selectedKeys = selector.selectedKeys();
                    Iterator<SelectionKey> it = selectedKeys.iterator();
                    SelectionKey key = null;
                    while (it.hasNext()) {
                        key = it.next();
                        it.remove();
                        try {
                            handleInput(key); // 处理key
                        } catch (Exception e) {
                            // TODO: handle exception
                            if (key != null) {
                                key.cancel();
                                if (key.channel() != null) {
                                    key.channel().close();
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    // TODO: handle exception
                    e.printStackTrace();
                }
            }
            // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册,所以不需要重复释放资源
            if (selector != null) {
                try {
                    selector.close();
                } catch (Exception e) {
                    // TODO: handle exception
                    e.printStackTrace();
                }
            }
    
        }
    
        private void handleInput(SelectionKey key) throws IOException {
            if (key.isValid()) {
                // 处理新接入的请求消息
                if (key.isAcceptable()) {
    
                    // Accept the new connection
                    ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
    
                    // Add the new connection to the selector
                    sc.register(selector, SelectionKey.OP_READ);
                }
                if (key.isReadable()) {
                    // Read the Data
                    SocketChannel sc = (SocketChannel) key.channel();
                    ByteBuffer readBuffer = ByteBuffer.allocate(1024);
                    int readBytes = sc.read(readBuffer);
                    if (readBytes > 0) {
                        readBuffer.flip();
                        byte[] bytes = new byte[readBuffer.remaining()];
                        readBuffer.get(bytes);
                        String body = new String(bytes, "UTF-8");
                        System.out.println("the time server receive order:" + body);
    
                        String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)
                                ? new java.util.Date(System.currentTimeMillis()).toString()
                                : "Bad Order";
                                doWrite(sc,currentTime);
                    }else if(readBytes<0) {
                        //对端链路关闭
                        key.cancel();
                        sc.close();
                    }else {
                        //读到0字节 忽略
                        ;
                    }
                }
            }
        }
        
        private void doWrite(SocketChannel channel,String response) throws IOException {
            
            if(response!=null&&response.trim().length()>0) {
                byte[] bytes = response.getBytes();
                ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
                writeBuffer.put(bytes);
                writeBuffer.flip();
                channel.write(writeBuffer);
                   if (!writeBuffer.hasRemaining())
    
                        System.out.println("Send response to client succeed.");
    
                    }
            }
            
    }
    package com.smkj.netty;
    
    public class TimeClient {
    public static void main(String[] args) {
        int port = 8080;
        if(args!=null&&args.length!=0) {
            try {
                port = Integer.valueOf(args[0]);
            } catch (NumberFormatException e) {
                //采用默认值
            }
        }
        
        new Thread(new TimeClientHandle("127.0.0.1",port),"TimeClient-001").start();
    }
    }
    package com.smkj.netty;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.Set;
    
    public class TimeClientHandle implements Runnable {
    
        private String host;
    
        private int port;
    
        private Selector selector;
    
        private SocketChannel socketChannel;
    
        private volatile boolean stop;
    
     
        public TimeClientHandle(String host, int port) {
    
        this.host = host == null ? "127.0.0.1" : host;
    
        this.port = port;
    
        try {
    
            selector = Selector.open();
    
            socketChannel = SocketChannel.open();
    
            socketChannel.configureBlocking(false);
    
        } catch (IOException e) {
    
            e.printStackTrace();
    
            System.exit(1);
    
        }
        }
        /*
    022
         * (non-Javadoc)
    023
         *
    024
         * @see java.lang.Runnable#run()
    025
         */
        @Override
        public void run() {
        try {
            doConnect();
        } catch (IOException e) {
            e.printStackTrace();
    
            System.exit(1);
    
        }
    
        while (!stop) {
    
            try {
    
            selector.select(1000);
    
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
    
            Iterator<SelectionKey> it = selectedKeys.iterator();
    
            SelectionKey key = null;
    
            while (it.hasNext()) {
    
                key = it.next();
    
                it.remove();
    
                try {
    
                handleInput(key);
    
                } catch (Exception e) {
    
                if (key != null) {
    
                    key.cancel();
    
                    if (key.channel() != null)
    
                    key.channel().close();
    
                }
    
                }
    
            }
    
            } catch (Exception e) {
    
            e.printStackTrace();
    
            System.exit(1);
    
            }
    
        }
    
     
    
        // 多路复用器关闭后,所有注册在上面的Channel和Pipe等资源都会被自动去注册并关闭,所以不需要重复释放资源
    
        if (selector != null)
    
            try {
    
            selector.close();
    
            } catch (IOException e) {
    
            e.printStackTrace();
    
            }
    
        }
    
     
    
        private void handleInput(SelectionKey key) throws IOException {
    
     
    
        if (key.isValid()) {
    
            // 判断是否连接成功
    
            SocketChannel sc = (SocketChannel) key.channel();
    
            if (key.isConnectable()) {
    
            if (sc.finishConnect()) {
    
                sc.register(selector, SelectionKey.OP_READ);
    
                doWrite(sc);
    
            } else
    
                System.exit(1);// 连接失败,进程退出
    
            }
    
            if (key.isReadable()) {
    
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    
            int readBytes = sc.read(readBuffer);
    
            if (readBytes > 0) {
    
                readBuffer.flip();
    
                byte[] bytes = new byte[readBuffer.remaining()];
    
                readBuffer.get(bytes);
    
                String body = new String(bytes, "UTF-8");
    
                System.out.println("Now is : " + body);
    
                this.stop = true;
    
            } else if (readBytes < 0) {
    
                // 对端链路关闭
    
                key.cancel();
    
                sc.close();
        } else
    
                ; // 读到0字节,忽略
    
            }
    
        }
    
     
    
        }
        private void doConnect() throws IOException {
    
        // 如果直接连接成功,则注册到多路复用器上,发送请求消息,读应答
    
        if (socketChannel.connect(new InetSocketAddress(host, port))) {
    
            socketChannel.register(selector, SelectionKey.OP_READ);
    
            doWrite(socketChannel);
    
        } else
    
            socketChannel.register(selector, SelectionKey.OP_CONNECT);
    
        }
    
    
        private void doWrite(SocketChannel sc) throws IOException {
    
        byte[] req = "QUERY TIME ORDER".getBytes();
    
        ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
    
        writeBuffer.put(req);
    
        writeBuffer.flip();
    
        sc.write(writeBuffer);
    
        if (!writeBuffer.hasRemaining())
    
            System.out.println("Send order 2 server succeed.");
    
        }
    
    }

    可以发现服务端的最后进行了remove()操作,将SelectionKey从迭代器中删除了,博主一开始总觉得很纳闷,SelectionKey中可是记录了相关的channel信息,如果将SelectionKey删除了,那不就代表着将通道信息也抹除了吗,那后续还怎么继续获取通道,说来惭愧,这问题问的确实缺乏水准。

    后来博主理了理selector的思路,要知道,一码事归一码事,channel是注册在selector中的,在后面的轮询中,是先将已准备好的channel挑选出来,即selector.select(),再通过selectedKeys()生成的一个SelectionKey迭代器进行轮询的,一次轮询会将这个迭代器中的每个SelectionKey都遍历一遍,每次访问后都remove()相应的SelectionKey,但是移除了selectedKeys中的SelectionKey不代表移除了selector中的channel信息(这点很重要),注册过的channel信息会以SelectionKey的形式存储在selector.keys()中,也就是说每次select()后的selectedKeys迭代器中是不能还有成员的,但keys()中的成员是不会被删除的(以此来记录channel信息)。

    那么为什么要删除呢,要知道,迭代器如果只需要访问的话,直接访问就好了,完全没必要remove()其中的元素啊,查询了相关资料,一致的回答是为了防止重复处理(大雾),后来又有信息说明:每次循环调用remove()是因为selector不会自己从已选择集合中移除selectionKey实例,必须在处理完通道时自己移除,这样,在下次select时,会将这个就绪通道添加到已选择通道集合中,其实到这里就已经可以理解了,selector不会自己删除selectedKeys()集合中的selectionKey,那么如果不人工remove(),将导致下次select()的时候selectedKeys()中仍有上次轮询留下来的信息,这样必然会出现错误,假设这次轮询时该通道并没有准备好,却又由于上次轮询未被remove()的原因被认为已经准备好了,这样能不出错吗?

    即selector.select()会将准备好的channel以SelectionKey的形式放置于selector的selectedKeys()中供使用者迭代,使用的过程中需将selectedKeys清空,这样下次selector.select()时就不会出现错误了。

  • 相关阅读:
    Java 实现 蓝桥杯 生兔子问题
    Java实现 蓝桥杯 基因牛的繁殖
    Java实现 蓝桥杯 基因牛的繁殖
    Java实现 蓝桥杯 基因牛的繁殖
    Java实现 LeetCode 33 搜索旋转排序数组
    Java实现 LeetCode 33 搜索旋转排序数组
    Java实现 LeetCode 33 搜索旋转排序数组
    深入探究VC —— 资源编译器rc.exe(3)
    深入探究VC —— 编译器cl.exe(2)
    深入探究VC —— 编译器cl.exe(1)
  • 原文地址:https://www.cnblogs.com/fengwenzhee/p/10407587.html
Copyright © 2011-2022 走看看