zoukankan      html  css  js  c++  java
  • JAVA NIO Socket通道

     

    DatagramChannel和SocketChannel都实现定义读写功能,ServerSocketChannel不实现,只负责监听传入的连接,并建立新的SocketChannel,本身不传输数据。

    Socket通道被实例化时都会创建一个对等的socket,通过此方式创建的socket都会有关联的通道,通过getChannel()获取。

    继承于 SelectableChannel,所以socket可以在非阻塞模式下运行:

     

    Readiness Selection:就绪选择,查询通道的机制,该机制可以判断通道是否准备好执行下一个目标操作(读,写...),其价值在于潜在的大量通道可以同时进行就绪检查,真正的就绪选择需要由操作系统来做,处理IO请求,并通知各个线程数据准备情况。

    Selector选择器:提供了这种抽象(抽象接口),是的Java代码能够以可移植的方式,请求底层操作系统提供这种服务。

    Selector选择器类:管理着一个被注册的通道集合的信息和他们的状态,通道和选择器是一起被注册的,并且使用选择器来更新通道状态。 

    一个通道可以被注册到多个选择器上,但在每个选择器上,只能注册一次。

    SelectionKey选择键:封装了通道和选择器的注册关系,选择键被SelectableChannel.register()返回并提供标识这种注册关系的标记。

    通道在被注册到选择器之前必须设置为noblocking模式,正常状态。

     

    chanel.register(selector, keystate):通道注册选择器。

    selector.select():阻塞操作,直到某一个channel的keystate发生。

    selectionKey.cancel(),取消注册关系。 

    通道关闭,相关的注册键会自动取消,选择器关闭,则所有注册到该选择器的通道都将被注销,并且相关的键会立刻失效。

    selectionkey包含两个以整数型式进行编码的比特掩码,一个用于指示那些通道和选择器组合所关心的操作,另一个表示通道准备好要执行的操作。当前的interest集合可以通过调用见对象的interestOps()方法来获取,且永远不会被选择器改变,但可以调用interestOps()方法,传入一个新的比特码来改变。

    readyOpts()获取相关通道的已就绪的操作,ready集合是interest集合的子集,表示从上次调用select()之后已经就绪的操作。如下:

    if((key.readOps() & SelctionKey.OP_READ) != 0){

        buffer.clear();

        key.channel().read(buffer);

        do()....

    }

    附加参数:attach()

    SelectionKey key = SelectableChannel.register(Selector, SelectionKey.OP_XXX, paramObj);

    等价:

    SelectionKey key = SelectableChannel.register(Selector, SelectionKey.OP_XXX);

    key.attach(paramObj);

    SelectionKey 多线程应用同步问题。

    选择器:

    Selector上的已注册键集合中,会存在失效键、null,keys()返回,不可修改。

    已选择键集合,selectedKeys()返回,已经准备好的键集合,可能为空。

    核心:选择过程,是对select(),poll(),epoll()等本地调用(native call)或者类似的操作系统的本地调用的包装(抽象),期间,将执行以下过程:

    1. 已取消的键的集合将会被检查,如果非空,则会被从其它两个集合(已注册、已选择)移除,相关通道将会被注销,键被清空。

    2. 已注册键的集合的键的interest集合将会被检查,就绪条件确定,底层操作系统对通道所关心的操作的就绪状态进行检查,如果没有,则阻塞当前(超时值)。

      对于已经就绪的通道执行:

      a. 如果通道的键未在已选择的键集合中,那么键的reay集合将会被清空,然后设置当前准备好的比特掩码。

      b. 如果通道的键已在已选择的键集合中,键的ready集合更新。不再就绪的状态不会被清除。

    3. select返回的是从上一次select()调用之后进入就绪状态的通道数量,之前的调用中已经就绪的,并且本次调用中仍然就绪的不会被计入。

    使用内部已取消的键的集合来延迟注销,防止线程在取消键时阻塞及与正在进行的选择操作冲突的优化,

    三种形式的select: select(), select(timeout),selectNow()(非阻塞,立刻返回当前状况)。

    调用 Selector 对象的 wakeup( )方法将使得选择器上的第一个还没有返回的选择操作立即回。如果当前没有在进行中的选择,那么下一次对 select( )方法的一种形式的调用将立即返回。后续的选择操作将正常进行。在选择操作之间多次调用 wakeup( )方法与调用它一次没有什么不同。有时这种延迟的唤醒行为并不是您想要的。您可能只想唤醒一个睡眠中的线程,而使得后续的

    选择继续正常地进行。您可以通过在调用 wakeup( )方法后调用 selectNow( )方法来绕过这个问题。 

    通常的做法是在选择器上调用一次 select 操作(这将更新已选择的键的集合),然后遍历 selectKeys( )方法返回的键的集合。在按顺序进行检查每个键的过程中,相关的通道也根据键的就绪集合进行处理。然后键将从已选择的键的集合中被移除(通过在 Iterator对象上调用 remove( )方法),然后检查下一个键。完成后,通过再次调用 select( )方法重复这个循环。如下:

    package org.windwant.nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * Created by windwant on 2016/10/27.
     */
    public class SocketChannelOpt {
    
        private static final String HOST = "localhost";
        private static final int PORT = 8888;
    
        private static ExecutorService read = Executors.newFixedThreadPool(5);
        private static ExecutorService write = Executors.newFixedThreadPool(5);
    
        public static void main(String[] args){
            ServerSocketChannel serverSocketChannel = null;
            ServerSocket serverSocket = null;
            Selector selector = null;
            try {
                serverSocketChannel = ServerSocketChannel.open();//工厂方法创建ServerSocketChannel
                serverSocket = serverSocketChannel.socket(); //获取channel对应的ServerSocket
                serverSocket.bind(new InetSocketAddress(HOST, PORT)); //绑定地址
                serverSocketChannel.configureBlocking(false); //设置ServerSocketChannel非阻塞模式
                selector = Selector.open();//工厂方法创建Selector
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注册选择器,接受连接就绪状态。
                while (true){//循环检查
                    if(selector.select() == 0){//阻塞检查,当有就绪状态发生,返回键集合
                        continue;
                    }
    
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //获取就绪键遍历对象。
                    while (it.hasNext()){
                        SelectionKey selectionKey = it.next();
                        //处理就绪状态
                        if (selectionKey.isAcceptable()){
                            ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只负责监听,阻塞,管理,不发送、接收数据
                            SocketChannel socketChannel = schannel.accept();//就绪后的操作,刚到达的socket句柄
                            if(null == socketChannel){
                                continue;
                            }
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ); //告知选择器关心的通道,准备好读数据
                        }else if(selectionKey.isReadable()){
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);
    
                            StringBuilder result = new StringBuilder();
                            while (socketChannel.read(byteBuffer) > 0){//确保读完
                                byteBuffer.flip();
                                result.append(new String(byteBuffer.array()));
                                byteBuffer.clear();//每次清空 对应上面flip()
                            }
    
                            System.out.println("server receive: " + result.toString());
                            socketChannel.register(selector, SelectionKey.OP_WRITE);
    
                        }else if(selectionKey.isWritable()){
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            String sendStr = "server send data: " + Math.random();
                            ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
                            while (send.hasRemaining()){
                                socketChannel.write(send);
                            }
                            socketChannel.register(selector, SelectionKey.OP_READ);
                            System.out.println(sendStr);
                        }
                        it.remove();
                    }
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    Selector多线程执行,同步需求。

    一个线程监控通道的就绪状态,一个线程池处理业务需求。线程池也可以扩展为不同的业务处理线程池,如日志、业务、心跳。

    package org.windwant.nio;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.ServerSocket;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    
    /**
     * 线程处理读取,写出
     * Created by windwant on 2016/10/27.
     */
    public class TSocketChannelOpt {
    
        private static final String HOST = "localhost";
        private static final int PORT = 8888;
    
        private static ExecutorService read = Executors.newFixedThreadPool(5);
        private static ExecutorService write = Executors.newFixedThreadPool(5);
    
        public static void main(String[] args){
            ServerSocketChannel serverSocketChannel = null;
            ServerSocket serverSocket = null;
            Selector selector = null;
            try {
                serverSocketChannel = ServerSocketChannel.open();//工厂方法创建ServerSocketChannel
                serverSocket = serverSocketChannel.socket(); //获取channel对应的ServerSocket
                serverSocket.bind(new InetSocketAddress(HOST, PORT)); //绑定地址
                serverSocketChannel.configureBlocking(false); //设置ServerSocketChannel非阻塞模式
                selector = Selector.open();//工厂方法创建Selector
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);//通道注册选择器,接受连接就绪状态。
                while (true){//循环检查
                    if(selector.select() == 0){//阻塞检查,当有就绪状态发生,返回键集合
                        continue;
                    }
    
                    Iterator<SelectionKey> it = selector.selectedKeys().iterator(); //获取就绪键遍历对象。
                    while (it.hasNext()){
                        SelectionKey selectionKey = it.next();
                        it.remove();
                        //处理就绪状态
                        if (selectionKey.isAcceptable()){
                            ServerSocketChannel schannel = (ServerSocketChannel) selectionKey.channel();//只负责监听,阻塞,管理,不发送、接收数据
                            SocketChannel socketChannel = schannel.accept();//就绪后的操作,刚到达的socket句柄
                            if(null == socketChannel){
                                continue;
                            }
                            socketChannel.configureBlocking(false);
                            socketChannel.register(selector, SelectionKey.OP_READ); //告知选择器关心的通道,准备好读数据
                        }else if(selectionKey.isReadable()){
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            read.execute(new MyReadRunnable(socketChannel));
    
    //                        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    //                        ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);
    //
    //                        StringBuilder result = new StringBuilder();
    //                        while (socketChannel.read(byteBuffer) > 0){//确保读完
    //                            byteBuffer.flip();
    //                            result.append(new String(byteBuffer.array()));
    //                            byteBuffer.clear();//每次清空 对应上面flip()
    //                        }
    //
    //                        System.out.println("server receive: " + result.toString());
                            socketChannel.register(selector, SelectionKey.OP_WRITE);
    
                        }else if(selectionKey.isWritable()){
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            write.execute(new MyWriteRunnable(socketChannel));
    //                        String sendStr = "server send data: " + Math.random();
    //                        ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
    //                        while (send.hasRemaining()){
    //                            socketChannel.write(send);
    //                        }
    //                        System.out.println(sendStr);
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }
                    }
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        static class MyReadRunnable implements Runnable {
    
            private SocketChannel channel;
    
            public MyReadRunnable(SocketChannel channel){
                this.channel = channel;
            }
    
            @Override
            public synchronized void  run() {
                ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);
    
                StringBuilder result = new StringBuilder();
                try {
                    while (channel.read(byteBuffer) > 0){//确保读完
                        byteBuffer.flip();
                        result.append(new String(byteBuffer.array()));
                        byteBuffer.clear();//每次清空 对应上面flip()
                    }
                    System.out.println("server receive: " + result.toString());
                } catch (IOException e) {
                    e.printStackTrace();
                }
    
    
            }
        }
    
        static class MyWriteRunnable implements Runnable {
    
            private SocketChannel channel;
    
            public MyWriteRunnable(SocketChannel channel){
                this.channel = channel;
            }
    
            @Override
            public void run() {
                String sendStr = "server send data: " + Math.random();
                ByteBuffer send = ByteBuffer.wrap(sendStr.getBytes());
                try {
                    while (send.hasRemaining()) {
                        channel.write(send);
                    }
                    System.out.println(sendStr);
                }catch (Exception e){
                    e.printStackTrace();
                }
    
            }
        }
    }
     
  • 相关阅读:
    (转)两千行PHP学习笔记
    PHP语言 -- 基础
    数据库-T-SQL 语句-高级查询
    数据库-T-SQL 语句-简单查询
    数据库-T-SQL 语句-创建表,删除表,CRUD操作的添加数据,修改数据,删除数据
    PHP 数组
    PHP函数
    PhP 基础
    无缝循环
    l两张图片轮播
  • 原文地址:https://www.cnblogs.com/niejunlei/p/6003896.html
Copyright © 2011-2022 走看看