zoukankan      html  css  js  c++  java
  • SocketChannel / ServerSocketChannel / Selector


    Channel
     
    Channel用来操作数据块,写入buffer或者从IO中读到buffer。
     

    SocketChannel
     
    创建
     
    // 创建,并阻塞等待连接
    SocketChannel sc = SocketChannel.open(new InetSocketAddress("abc.com",80));
     
    // 创建,使用connect()连接
    SocketChannel sc = SocketChannel.open();
    sc.connect(new InetSocketAddress("abc.com",80)); // blocking
     
    // 创建,非阻塞
    SocketChannel sc = SocketChannel.open();
    sc.configureBlocking(false);     // set no-block
    sc.connect(new InetSocketAddress("abc.com",80)); // no-blocking
     
    // 非阻塞模式的一般处理方式——轮询!
    while(true)
    {
         if(sc.finishConnect() /* 立即返回,已经连接返回true,否则false */){
              break; 
              /* connected,start write */
         }
    }
     
    .isConnected()
    .isConnectionPending() // 正在setup,还未open
     
     
     
    int sc.read(ByteBuffer dst)
    尽可能多的读数据到dst中。
    返回读取的字节数量,如果读取完毕,返回-1。
     
    while(buffer.hasRemaining() && channel.read(buffer)!=-1 )
    {
    }
     
    Scatter 读取并复制到多个Buffer
     
    ByteBuffer[] buffers = new ByteBuffer[2];
    buffers[0] = ByteBuffer.allocate(1000);
    buffers[1] = ByteBuffer.allocate(1000);
    while(buffer[1].hasRemaining() && channel.read(buffer)!=-1 )
    {
    }
     
     
     
    while(buffer.hasRemaining() && channel.write(buffer)!=-1 )
    {
    }
     
    gather 从多个buffer读取,写入到同一个socket中
     
    channel.write(ByteBuffer[] dsts);
     
     
    关闭
     
    if( channel.isOpen() )
    {
         channel.close();
    }
     


    ServerSocketChannel
     
     
    创建
     
    ServerSocketChannel ssc = ServerSocketChannel.open(); // open并非打开,而是创建
    ServerSocket ss = ssc.socket();
    ss.bind(new InetSocketAddress(80));
     
    监听
     
    .appcept();
    默认是 blocking 模式
    non-blocking模式下,.accept()在没有连接的情况下立刻返回null。需要使用 Selector 处理。
     
     


    Selector
     
    Selector selector = Selector.open();
     
    向selector注册channel
    ServerSocketChannel ssc = ... ;
    ssc.register(selector,SelectionKey.OP_ACCEPT/* 监听事件 */|SelectionKey.OP_CONNECT);
    ssc.register(selector,OP,object state);
     
    事件类型
    SelectionKey.OP_ACCEPT
    SocketChannel无本事件,ServerSocketChannel有
              
    SelectionKey.OP_CONNECT          
    SelectionKey.OP_READ
    SelectionKey.OP_WRITE
     
    各个channel注册到selector后,随时可以轮询哪些channel有了事件需要处理
    selector.selectNow() // no-blocking ,没有则返回0
    selector.select()     // blocing,直到至少有一个事件
    selector.select(long timeout)
     
    有事件准备好处理之后,获取对应的channel
    Set selector.selectedKeys();
     
    关闭
    selector.close()
     
     
    SelectionKey
     
    当要处理selectedKey的时候,先判断是哪个事件
    SelectionKey key = selector.selectedKeys().get(0);
    if(key.isAcceptable()){}
    if(key.isConnectionable()){}
     
    // 获取对应的channel
    SelectableChannel c = key.channel();
     
    // 获取附加状态
    key.attachment()
     
    // 不再跟踪
    key.cancel()
     

    DEMO:
    阻塞模式下的SocketChannel与阻塞模式下的ServerSocketChannel交互(无需用到Selector)
     
        /**
        * 阻塞模式下的ServerSocketChannel
        */
        public static void startBlockingServerChannel() {
            new Thread(new Runnable() {
                    @Override
                    public void run() {
                        ServerSocketChannel serverSocketChannel = null;
    
                        try {
                            while (true) {
                                serverSocketChannel = ServerSocketChannel.open();
                                serverSocketChannel.configureBlocking(true);
                                serverSocketChannel.bind(new InetSocketAddress(90));
    
                                System.out.println("[server]accepting...");
    
                                SocketChannel client = serverSocketChannel.accept();
    
                                ByteBuffer dst = ByteBuffer.allocate(1024);
                                client.read(dst);
    
                                String result = new String(dst.array(), "UTF-8");
    
                                System.out.println("->" + result);
                                serverSocketChannel.close();
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        } finally {
                            serverSocketChannel.close();
                        }
                    }
                }).start();
        }
    
        /**
        * 阻塞SocketChanel调用ServerSocketChannel
        */
        public static void startBlockingClinetUsingSocketChannel() {
            new Thread(new Runnable() {
                    @Override
                    public void run() {
                        SocketChannel s = null;
    
                        try {
                            System.out.println("[client]start a client socket:");
                            s = SocketChannel.open();
                            //s.configureBlocking(true);
                            System.out.println("[client]try connect...");
                            s.connect(new InetSocketAddress("127.0.0.1", 90));
                            System.out.println("[client]after connect");
    
                            ByteBuffer src = ByteBuffer.allocate(20);
                            src.put("Hello".getBytes("UTF-8"));
                            src.flip(); // 为读准备好
                            s.write(src);
                            s.close();
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                s.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start();
        }
     
    DEMO:
    非阻塞模式下的SocketChannel作为客户端
     
        public static void startNoBlocingSocketChannelAsClinet() {
            SocketChannel socketChannel = null;
    
            try {
                System.out.println("[client]start a client socket:");
                s = SocketChannel.open();
                s.configureBlocking(false);
                s.connect(new InetSocketAddress("127.0.0.1", 90));
    
                while (true) {
                    if (s.finishConnect()) {
                        System.out.println("[client]connected...");
    
                        ByteBuffer src = ByteBuffer.allocate(100);
                        src.put("Hello".getBytes("UTF-8"));
                        src.flip();
                        s.write(src);
    
                        break;
                    }
                }
            } catch (Exception e) {
            } finally { /*close*/
            }
        }
     
    DEMO:
    非阻塞模式下的ServerSocketChannel + Selector 实现服务器端(单线程多通道)
     
       public static void startNoBlockingServerChannel() {
            new Thread(new Runnable() {
                    @Override
                    public void run() {
                        ServerSocketChannel serverSocketChannel = null;
                        Selector selector = null;
    
                        try {
                            selector = Selector.open();
    
                            serverSocketChannel = ServerSocketChannel.open();
                            serverSocketChannel.configureBlocking(false);
                            serverSocketChannel.socket()
                                               .bind(new InetSocketAddress(90));
    
                            // register to selector
                            serverSocketChannel.register(selector,
                                SelectionKey.OP_ACCEPT);
    
                            // 轮询selector
                            while (true) {
                                System.out.println(
                                    "[server] Selector select and blocking ...");
                                selector.select(); // blocking,wait until 1 event
    
                                Set<SelectionKey> keys = selector.selectedKeys(); // not .keys()
                                Iterator iter = keys.iterator();
    
                                while (iter.hasNext()) {
                                    SelectionKey key = (SelectionKey) iter.next();
                                    iter.remove();
    
                                    // 以下判断最好使用 if else
                                    if (key.isValid() == false) {
                                        continue;
                                    } else if (key.isAcceptable()) {
                                        SocketChannel client = ((ServerSocketChannel) key.channel()).accept();
                                        client.configureBlocking(false); // 继续设置为非阻塞模式(也可以是阻塞,直接处理)
                                        client.register(selector,
                                            SelectionKey.OP_READ); // 继续向Selector注册
                                    } else if (key.isReadable()) {
                                        // start read
                                        SocketChannel client = (SocketChannel) key.channel();
    
                                        ByteBuffer bf = ByteBuffer.allocate(1000);
                                        int count = client.read(bf);
                                        System.out.println("-->" +
                                            new String(bf.array(), "UTF-8"));
                                        // bf.flip();
                                        key.cancel();
                                    } else if (key.isWritable()) {
                                        // 不做处理
                                    }
                                }
                            }
                        } catch (IOException e) {
                            e.printStackTrace();
                        } finally { /* close selector and ServerSocketChannel*/
                        }
                    }
                }).start();
        }
     
  • 相关阅读:
    Kotlin使用常见问题汇总
    浅谈Kotlin(五): 静态变量&静态方法
    浅谈Kotlin(八):空安全、空类型
    浅谈Kotlin(七):lateinit、by lazy 使用
    浅谈Kotlin(六):data class的使用
    实例:([Flappy Bird Qlearning]
    强化学习之MountainCarContinuous(注册自己的gym环境)
    seq2seq之双向解码
    AwesomeVisualCaptioning
    VUE hash路由和history路由的区别
  • 原文地址:https://www.cnblogs.com/caca/p/3585295.html
Copyright © 2011-2022 走看看