zoukankan      html  css  js  c++  java
  • JAVA NIO 之 Selector 组件

    NIO 重要功能就是实现多路复用。Selector是SelectableChannel对象的多路复用器。一些基础知识:

    选择器(Selector):选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。

    可选择通道(SelectableChannel):这个抽象类提供了实现通道的可选择性所需要的公共方法。它是所有支持就绪检查的通道类的

    父类。例如:ServerSocketChannel、SocketChannel。可选择通道可以被注册到选择器上。

    选择键(SelectionKey):选择键封装了特定的通道与特定的选择器的注册关系。

    前面的一篇文章NIO简介中介绍了传统io的同步阻塞服务器实现,现在来看看NIO多路复用服务器的实现。NIO 利用单线程轮询事件机制,定位就绪的Channel,决定执行什么,

    仅仅 select()方法阶段是阻塞的。这样一个选择器避免了之前的多个客服端时切换线程的问题。下面的一张图能描述这种场景:

     代码实现:

    服务器server:

    public class SelectSockets {
    
        private static int PORT_NUMBER = 9011;
    
        /**
         * allocateDirect(1024) 此方法创建的buffer无法调用array();直接内存
         */
        private ByteBuffer buffer = ByteBuffer.allocate(1024);
    
        public static void main(String[] argv) throws Exception {
            new SelectSockets().go(argv);
        }
    
        public void go(String[] argv) throws Exception {
            System.out.println("Listening on port " + PORT_NUMBER);
            // 创建ServerSocketChannel
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            // 获得ServerSocket
            ServerSocket serverSocket = serverChannel.socket();
            // 创建Selector
            Selector selector = Selector.open();
            // 绑定
            serverSocket.bind(new InetSocketAddress(PORT_NUMBER));
            // false设置为非阻塞模式
            serverChannel.configureBlocking(false);
            // 注册通道
            ////ServerSocketChannel只能注册SelectionKey.OP_ACCEPT;register(Selector sel, int ops)的ops参数可以通过serverSocketChannel.validOps()获取。
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                //选择器select有三种方式,这种带时间的表示,没有连接阻塞10秒后继续或者有连接进来时继续
                int n = selector.select(10000);
                if (n == 0) {
                    continue;
                }
                //selectedKeys()已选择的键
                Iterator it = selector.selectedKeys().iterator();
                while (it.hasNext()) {
                    SelectionKey key = (SelectionKey) it.next();
                    //检查是否有效
                    if (!key.isValid()) {
                        continue;
                    }
                    //accept
                    if (key.isAcceptable()) {
                        ServerSocketChannel server = (ServerSocketChannel) key.channel();
                        SocketChannel channel = server.accept();
                        System.out.println ("Incoming connection from: "+ channel.socket().getRemoteSocketAddress( ));
                        registerChannel(selector, channel, SelectionKey.OP_READ);
                        buffer.clear();
                        buffer.put("你好,我是服务器!
    ".getBytes());
                        buffer.flip();
                        channel.write(buffer);
                    }
                    //if(key.isReadable())等价于if((key.readyOps( ) & SelectionKey.OP_READ) != 0)
                    if (key.isReadable()) {
                        readHandler(key);
                    }
                    it.remove();
                }
            }
        }
    
        /**
         * 设置感兴趣的通道属性
         * @param selector
         * @param channel
         * @param ops
         * @throws Exception
         */
        protected void registerChannel(Selector selector, SelectableChannel channel, int ops) throws Exception {
            if (channel == null) {
                return;
            }
            channel.configureBlocking(false);
            channel.register(selector, ops);
        }
    
        /**
         * 处理读取数据
         * @param key
         * @throws Exception
         */
        protected void readHandler(SelectionKey key) throws Exception {
            SocketChannel socketChannel = (SocketChannel) key.channel();
            int count;
            StringBuilder sb = new StringBuilder();
            ByteBuffer tmpByteBuffer = ByteBuffer.allocate(1024);
            //读取客服端消息
            while ((count = socketChannel.read(tmpByteBuffer)) > 0) {
                tmpByteBuffer.flip();
                sb.append(new String(tmpByteBuffer.array()));
                // 这里可以回写给客服端
                while (tmpByteBuffer.hasRemaining()) {
                    socketChannel.write(tmpByteBuffer);
                }
                tmpByteBuffer.clear();
            }
            System.out.println("客服端"+socketChannel.socket().getRemoteSocketAddress()+"说:"+sb.toString());
    
            if (count < 0) {
                // Close channel on EOF, invalidates the key
                socketChannel.close();
            }
        }
    
    }
    

      客服端:

    /**
     * @author monkjavaer
     * @date 2018/10/23 22:23
     */
    public class Client {
    
        private static final int BUFFER_SIZE = 1024;
        private static  int PORT = 9011;
        private static String[] messages =
                {"今天读到一句话,觉得很好:但行好事,莫问前程。"};
    
        public static void main(String[] args) {
            try {
                InetAddress inetAddress = InetAddress.getLocalHost();
                InetSocketAddress address =new InetSocketAddress(inetAddress, PORT);
                SocketChannel socketChannel = SocketChannel.open(address);
    
                for (String msg: messages) {
                    ByteBuffer myBuffer=ByteBuffer.allocate(BUFFER_SIZE);
                    myBuffer.put(msg.getBytes());
                    myBuffer.flip();
                    int bytesWritten = socketChannel.write(myBuffer);
                    logger(String.format("Sending Message...: %s
    bytesWritten...: %d",msg, bytesWritten));
                }
                logger("Closing Client connection...");
                socketChannel.close();
            } catch (IOException e) {
                logger(e.getMessage());
                e.printStackTrace();
            }
        }
    
        public static void logger(String msg) {
            System.out.println(msg);
        }
    
    }
    

      也可以用telnet命令测试:

        telnet 127.0.0.1 9011

  • 相关阅读:
    阿里巴巴的云原生应用开源探索与实践
    Helm 3 发布 | 云原生生态周报 Vol. 27
    带你上手一款下载超 10 万次的 IDEA 插件
    最强CP!阿里云联手支付宝小程序如何助力双11?
    媲美5G的Wifi网速、“备战”资产一键领……揭秘双11小二背后的保障力量
    dubbo-go 的开发、设计与功能介绍
    饿了么交付中心语言栈转型总结
    数据一致性检测的应用场景与最佳实践
    2684亿!阿里CTO张建锋:不是任何一朵云都撑得住双11
    《DNS稳定保障系列3--快如闪电,域名解析秒级生效》
  • 原文地址:https://www.cnblogs.com/monkjavaer/p/9839687.html
Copyright © 2011-2022 走看看