zoukankan      html  css  js  c++  java
  • BIO NIO编程

    首先明确Socket通信模型

     Socket通信实现步骤

    1. 创建ServerSocket和Socket
    2. 打开连接到的Socket的输入/输出流
    3. 按照协议对Socket进行读/写操作
    4. 关闭输入输出流,以及Socket

    BIO编程流程

    服务端:

    1. 创建ServerSocket实例
    2. 绑定端口
    3. 通过accept来监听客户端的连接,有客户端连接会返回socket实例
    4. 进行读写操作
    5. 关闭资源

    客户端:

    1. 创建socket实例
    2. 通过connect并指定服务端的IP+端口连接服务端
    3. 进行读写操作
    4. 关闭资源

    客户端发送一个消息,服务端回复一个同样的消息的实现

    服务端:

    public class BIOServer {
        public static void main(String[] args) throws IOException {
            ServerSocket ssocket = null;
            BufferedReader bufferedReader = null;
            OutputStream outputStream = null;
            Socket accept = null;
            try {
                ssocket = new ServerSocket();
                //端口绑定
                ssocket.bind(new InetSocketAddress(8888));
                System.out.println("服务端启动了!");
                //监听
                accept = ssocket.accept();
                System.out.println("客户端:"+accept.getRemoteSocketAddress()+"连接上了");
                while (true) {
                    //读取客户端消息
                    bufferedReader = new BufferedReader(new InputStreamReader(accept.getInputStream()));
                    String msg = bufferedReader.readLine();
                    if(msg.equals("exit")) break;
                    System.out.println("客户端" + accept.getRemoteSocketAddress() + "发来消息:" + msg);
                    //给客户端恢复消息
                    outputStream = accept.getOutputStream();
                    outputStream.write(("回复:" + msg + "
    ").getBytes());
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                //释放资源
                if(ssocket != null) ssocket.close();
                if(accept != null) accept.close();
                if(bufferedReader != null) bufferedReader.close();
                if(outputStream != null) outputStream.close();
                System.out.println("服务端关闭");
            }
    
        }
    }

    客户端:

    public class BIOClient {
        public static void main(String[] args) throws IOException {
            Socket socket = null;
            OutputStream outputStream = null;
            BufferedReader bufferedReader = null;
            try {
                //绑定端口和地址
                socket = new Socket();
                socket.connect(new InetSocketAddress("127.0.0.1",8888));
                System.out.println("连接服务端成功!");
                while (true) {
                    //给客户端发消息
                    outputStream = socket.getOutputStream();
                    Scanner scanner = new Scanner(System.in);
                    System.out.println("请输入消息:");
                    String msg = scanner.nextLine();
                    if(msg.equals("exit"))break;
                    outputStream.write((msg+ "
    ").getBytes());
                    //刷出缓存
                    outputStream.flush();
    
                    //读取客户端消息
                    bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    System.out.println("服务端" + bufferedReader.readLine());
                }
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                //释放资源
                if(socket != null) socket.close();
                if(bufferedReader != null) bufferedReader.close();
                if(outputStream != null) outputStream.close();
                System.out.println("客户端端关闭");
            }
        }
    }

    BIO是同步阻塞模型,其阻塞体现在:

    • accept():阻塞接收客户端的连接
    • read() /write()
    • connect():和服务端建立连接(三次握手),连接的过程中connect会阻塞

    BIO的多个客户端的处理(多线程):

     客户端无需改变,服务端如下:

    public class TServer {
        public static void main(String[] args) throws IOException {
            ServerSocket ssocket = null;
            Socket accept = null;
            try {
                ssocket = new ServerSocket();
                //端口绑定
                ssocket.bind(new InetSocketAddress(8888));
                System.out.println("服务端启动了!");
                while (true) {
                //监听,等待多个客户端连接
                accept = ssocket.accept();
                System.out.println("客户端:"+accept.getRemoteSocketAddress()+"连接上了");
                //将Socket实例交给子线程处理
                new TServer().new ServerHanlder(accept).start();
                }
    
            } catch (IOException e) {
                e.printStackTrace();
            }finally {
                //释放资源
                if(ssocket != null) ssocket.close();
                if(accept != null) accept.close();
                System.out.println("服务端关闭");
            }
    
        }
        class ServerHanlder extends Thread{
            private Socket socket;
            private BufferedReader bufferedReader = null;
            private OutputStream outputStream = null;
            public ServerHanlder(Socket socket){
                this.socket = socket;
            }
            @Override
            public void run() {
    
                //读取客户端消息
                try {
                    while (true) {
                    bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                    String msg = bufferedReader.readLine();
                        //程序结束条件
                        if("exit".equals(msg)) break;
                        System.out.println("客户端" + socket.getRemoteSocketAddress() + "发来消息:" + msg);
                        //给客户端恢复消息
                        outputStream = socket.getOutputStream();
                        outputStream.write(("回复:" + msg + "
    ").getBytes());
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }finally {
                    if (bufferedReader != null) {
                        try {
                            bufferedReader.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    if(outputStream != null) {
                        try {
                            outputStream.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }
                    System.out.println("客户端:"+socket.getRemoteSocketAddress()+"关闭了");
                }
    
            }
        }
    }

          在多个用户的连接情况下,为了达到服务端处理的并发量,就需要对每一个用户的连接要分配一个新线程来处理,但是线程资源时有限的,不能无限制的创建出新的资源,那么对于高并发的支持也就大打折扣了。

    NIO编程流程

    服务端:

    1. 实例化通道:ServerSocketChannel
    2. 绑定端口:通过ServerSocketChannel实例调用bindI()方法绑定端口
    3. 将ServerSocketChannel设置为非阻塞
    4. 实例化选择器(IO复用器)Selector
    5. 将ServerSocketChannel注册给选择器,并且关注accept事件
    6. 监听事件是否完成,selector.select,如果事件未完成则一直阻塞直到事件完成
    7. 获取已完成事件的集合并遍历,判断是否是accept事件,是,则调用accept方法,获取SocketChannel通道
    8. 设置SocketChannel为非阻塞,并将SocketChannel注册到选择器Selector,并关注read事件
    9. 监听事件是否完成,若有事件完成,则判断是否是read读事件
    10. 通过SocketChannel通道读取数据(Buffer中),读完数据循环事件监听,即步骤6
    11. 关闭资源:ServerSocketChannel,SocketChannel,Selector

    客户端:

    1. 实例化通道:SocketChannel
    2. 设置SocketChannel为非阻塞
    3. 实例化复用器:Selector
    4. 连接服务器connect()(该方法不会阻塞直接返回结果,返回为Boolean,是否连接成功)
    5. 若返回为false,则将SocketChannel注册到复用器中,并监听connect可读事件
    6. 监听复用器事件是否完成(Selector.select),判断完成集合中是否有可连接事件,将可连接事件完成(channel.finishConnet())
    7. 给服务端发送消息,channel.write()操作
    8. 关闭资源:selector、SocketChannel

    服务端:

    public class NIOServer {
        public static void main(String[] args) {
            ServerSocketChannel serverSocketChannel = null;
            try {
                //创建ServerSocketChannel通道实例
                serverSocketChannel = ServerSocketChannel.open();
    
                //绑定端口
                serverSocketChannel.bind(new InetSocketAddress(9998));
                System.out.println("服务端启动了");
    
                //将serverSocketChannel设置为非阻塞  configureBlocking设置阻塞非阻塞 false:非阻塞  true:阻塞
                serverSocketChannel.configureBlocking(false);
    
                //创建selector选择器
                Selector selector = Selector.open();
    
                //将通道serverSocketChannel注册到选择器selector,关注可接受事件
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
                //等待监听结果,调用选择器的select阻塞等待,直到有事件发生才返回
                while (selector.select() > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        //是否是可接受事件
                        if (selectionKey.isAcceptable()) {
                            System.out.println("可接受事件");
                            //有新用户连接
                            ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();
    
                            //接受客户端的连接,通过accept(不在阻塞)接受一个SocketChannel通道
                            SocketChannel socketChannel = serverSocketChannel1.accept();
    
                            //设置socketChannel为非阻塞
                            socketChannel.configureBlocking(false);
    
                            //将socketChannel注册到选择器selector选择器,关注可读事件
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }
    
                        //是否是可读事件
                        if (selectionKey.isReadable()) {
                            System.out.println("可读事件");
    
                            //获取SocketChannel通道
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    
                            //创建Buffer
                            ByteBuffer buffer = allocate(100);
                            //进行读取操作
                            socketChannel.read(buffer);
                            //进行读写模式的切换
                            buffer.flip();
                            //将数据从Buffer中读取
                            byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
                            String msg = new String(bytes, 0, bytes.length);
    
                            //打印结果
                            System.out.println("客户端:"+socketChannel.getRemoteAddress()+msg);
                            if(msg.equals("")){
                                socketChannel.close();
                            }
    
                        }
                    }
                }
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                //关闭资源
                try {
                    if (serverSocketChannel != null) {
                        serverSocketChannel.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }

    客户端:

    public class NIOClient {
        public static void main(String[] args) {
            SocketChannel socketChannel = null;
            try {
                //创建SocketChannel通道
                socketChannel = SocketChannel.open();
    
                //设置socketChannel为非阻塞
                socketChannel.configureBlocking(false);
    
                //创建Selector选择器
                Selector selector = Selector.open();
    
                //主动的进行连接,connect操作不在会阻塞,会直接返回,如果连接成功返回true ,连接还未完成返回false
                if (!socketChannel.connect(new InetSocketAddress("127.0.0.1",9998))) {
                    //当前连接操作未完成
                    //将SocketChannel注册到选择器,并关注可连接事件
                    socketChannel.register(selector, SelectionKey.OP_CONNECT);
    
                    //等待连接完成
                    selector.select();
    
                    Iterator <SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
    
                        //是否是可连接事件
                        if (selectionKey.isConnectable()) {
                            //可连接事件完成
                            SocketChannel channel = (SocketChannel) selectionKey.channel();
                            //连接操作完成
                            channel.finishConnect();
                        }
                    }
                }
    
                //连接成功,给服务端发送消息
                ByteBuffer buffer = ByteBuffer.allocate(100);
                //将发送的数据写到Buffer中
                buffer.put("hello
    ".getBytes());
                //读写模式的切换
                buffer.flip();
                socketChannel.write(buffer);
    
                //关闭资源
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (socketChannel != null) {
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    客户端发送一个消息,服务端回复一个同样的消息的实现

     服务端:

    public class NIOServer {
        public static void main(String[] args) {
            ServerSocketChannel serverSocketChannel = null;
            try {
                //创建ServerSocketChannel通道实例
                serverSocketChannel = ServerSocketChannel.open();
    
                //绑定端口
                serverSocketChannel.bind(new InetSocketAddress(9998));
                System.out.println("服务端启动了");
    
                //将serverSocketChannel设置为非阻塞  configureBlocking设置阻塞非阻塞 false:非阻塞  true:阻塞
                serverSocketChannel.configureBlocking(false);
    
                //创建selector选择器
                Selector selector = Selector.open();
    
                //将通道serverSocketChannel注册到选择器selector,关注可接受事件
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
                //等待监听结果,调用选择器的select阻塞等待,直到有事件发生才返回
                while (selector.select() > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        //是否是可接受事件
                        if (selectionKey.isAcceptable()) {
                            System.out.println("可接受事件");
                            //有新用户连接
                            ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();
    
                            //接受客户端的连接,通过accept(不在阻塞)接受一个SocketChannel通道
                            SocketChannel socketChannel = serverSocketChannel1.accept();
    
                            //设置socketChannel为非阻塞
                            socketChannel.configureBlocking(false);
    
                            //将socketChannel注册到选择器selector选择器,关注可读事件
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }
    
                        //是否是可读事件
                        if (selectionKey.isReadable()) {
                            System.out.println("可读事件");
    
                            //获取SocketChannel通道
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    
                            //创建Buffer
                            ByteBuffer buffer = allocate(100);
                            //进行读取操作
                            socketChannel.read(buffer);
                            //进行读写模式的切换
                            buffer.flip();
                            //将数据从Buffer中读取
                            byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
                            String msg = new String(bytes, 0, bytes.length);
                            //给客户端回复消息
                            buffer.clear();
                            buffer.put(("服务端回复"+msg).getBytes());
                            buffer.flip();
                            socketChannel.write(buffer);
                            //打印结果
                            System.out.println("客户端:"+socketChannel.getRemoteAddress()+msg);
                            if(msg.equals("") || "exit".equals(msg)){
                                selectionKey.cancel();
                                socketChannel.close();
                            }
    
                        }
                    }
                }
    
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                //关闭资源
                try {
                    if (serverSocketChannel != null) {
                        serverSocketChannel.close();
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }

    客户端:

    public class NIOClient {
        public static void main(String[] args) {
            SocketChannel socketChannel = null;
            try {
                //创建SocketChannel通道
                socketChannel = SocketChannel.open();
    
                //设置socketChannel为非阻塞
                socketChannel.configureBlocking(false);
    
                //创建Selector选择器
                Selector selector = Selector.open();
    
                //主动的进行连接,connect操作不在会阻塞,会直接返回,如果连接成功返回true ,连接还未完成返回false
                if (!socketChannel.connect(new InetSocketAddress("127.0.0.1",9998))) {
                    //当前连接操作未完成
                    //将SocketChannel注册到选择器,并关注可连接事件
                    socketChannel.register(selector, SelectionKey.OP_CONNECT);
    
                    //等待连接完成
                    selector.select();
    
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
    
                        //是否是可连接事件
                        if (selectionKey.isConnectable()) {
                            //可连接事件完成
                            SocketChannel channel = (SocketChannel) selectionKey.channel();
                            //连接操作完成
                            channel.finishConnect();
                        }
                    }
                }
    
                //注册读事件
                socketChannel.register(selector,SelectionKey.OP_READ);
                Scanner scanner = new Scanner(System.in);
                //连接成功,给服务端发送消息
                ByteBuffer buffer = ByteBuffer.allocate(100);
                while (scanner.hasNext()){
                    String msg = scanner.nextLine();
                    //将发送的数据写到Buffer中
                    buffer.put((msg+"
    ").getBytes());
                    //读写模式的切换
                    buffer.flip();
                    socketChannel.write(buffer);
    
                    //关注服务端的读事件
                    selector.select();
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()){
                        SelectionKey selectionKey = iterator.next();
                        //判断是否是可读事件
                        if(selectionKey.isReadable()){
                            SocketChannel channel = (SocketChannel) selectionKey.channel();
                            //读操作
                            buffer.clear();
                            channel.read(buffer);
                            //读写模式切换
                            buffer.flip();
                            byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
                            String s = new String(bytes);
                            System.out.println(s);
                        }
                    }
                    if("".equals(msg) || "exit".equals(msg)) break;
                        buffer.clear();   //每次使用需要清空,循环使用
    
                }
    
    
    
                //关闭资源
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                if (socketChannel != null) {
                    try {
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    NIO+多线程

        NIO中一个selector可以关注多个用户的连接(即一个线程可以同时处理多个用户的通信),为了并发用户量能够处理更多,可以使用NIO+多线程的形式来处理;其主要处理思路是主线程接收来自客户端的连接(accept),子线程处理用户的IO操作,主线程接收到客户端连接Socketchannel通道,将SocketChannel交给子线程。并且应该在主线程和子线程中使用各自的Selector,假如主线程和子线程共用一个选择器,当处理来自客户端的请求是一个连接请求时,子线程恰好获取到,就会造成混乱。

    服务端:

    public class MTNIOServer {
        public static void main(String[] args) {
            ServerSocketChannel serverSocketChannel = null;
            try {
                //创建ServerSocketChannel通道实例
                serverSocketChannel = ServerSocketChannel.open();
    
                //绑定端口
                serverSocketChannel.bind(new InetSocketAddress(9998));
                System.out.println("服务端启动了");
    
                //将serverSocketChannel设置为非阻塞  configureBlocking设置阻塞非阻塞 false:非阻塞  true:阻塞
                serverSocketChannel.configureBlocking(false);
    
                //创建selector选择器
                Selector selector = Selector.open();
    
                //将通道serverSocketChannel注册到选择器selector,关注可接受事件
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
    
                //子线程以线程池的形式提供
                ExecutorService executorService = Executors.newFixedThreadPool(10);
    
                //等待监听结果,调用选择器的select阻塞等待,直到有事件发生才返回
                while (selector.select() > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
                        //是否是可接受事件
                        if (selectionKey.isAcceptable()) {
                            System.out.println("可接受事件");
                            //有新用户连接
                            ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectionKey.channel();
                            //获取新用户channel
                            SocketChannel socketChannel = serverSocketChannel1.accept();
                            System.out.println(Thread.currentThread().getName()+":客户端:"+socketChannel.getRemoteAddress()+" 连接上了");
    
                            //将通道交给子线程
                            executorService.submit(new NIOServerHandler(socketChannel));
    
                        }
                    }
                }
            } catch (Exception e) {
    
            }
        }
    }
    
    
    
    public class NIOServerHandler implements Runnable {
        //通过主线程将socketChannel获取到
        private SocketChannel socketChannel;
        //创建selector实例
        private Selector selector = null ;
        
        public NIOServerHandler(SocketChannel socketChannel) {
            this.socketChannel = socketChannel;
            try {
                //在一个子线程中只需要创建一个selector实例
                if (selector == null)
                    selector = Selector.open();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            try {
                //将socketChannel设置为非阻塞
                socketChannel.configureBlocking(false);
    
                //将socketChannel注册到选择器中,并且关注可读事件
                socketChannel.register(selector, SelectionKey.OP_READ);
    
                while (selector.select() > 0) {
                    Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
                    while (iterator.hasNext()) {
                        SelectionKey selectionKey = iterator.next();
                        iterator.remove();
    
                        //是否是可读事件
                        if (selectionKey.isReadable()) {
                            //获取SocketChannel通道
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
    
                            //创建Buffer
                            ByteBuffer buffer = ByteBuffer.allocate(100);
                            //进行读取操作
                            socketChannel.read(buffer);
                            //进行读写模式的切换
                            buffer.flip();
                            //将数据从Buffer中读取
                            byte[] bytes = new byte[buffer.remaining()];
                            buffer.get(bytes);
    
                            String msg = new String(bytes, 0, bytes.length);
                            //给客户端回复消息
                            buffer.clear();
                            buffer.put(("客户端回复:"+msg).getBytes());
                            //读写模式切换
                            buffer.flip();
                            //回复消息
                            socketChannel.write(buffer);
    
                            //打印结果
                            System.out.println(Thread.currentThread().getName()+"客户端:"+socketChannel.getRemoteAddress()+" 消息:"+msg);
                            if ("".equals(msg)|| "exit".equals(msg)){
                                System.out.println(Thread.currentThread().getName()+"客户端:"+socketChannel.getRemoteAddress()+" 下线");
                                //当前注册的感兴趣事件取消
                                selectionKey.cancel();
                                //关闭通道
                                socketChannel.close();
                            }
    
                        }
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    NIO相比于BIO,一个线程可以处理多个socket,NIO+多线程提高网络服务性能,降低线程的数量。在服务器端线程过多地影响:

    1. Java中创建线程,最终会映射到本地操作系统上的进程所对应的线程,Linux为例,fork是重量级的操作,系统开销大
    2. 多线程需要CPU调度,会有上下文的切换,线程过多时,上下文切换的时间会趋近于或大于线程本身业务执行的时间,CPU就存在浪费,降低了系统的性能
    3. 线程创建需要开辟线程私有的内空间,线程过多的话,为线程运行准备的内存就会占去很大一部分,真正用来分配还给业务的内存就大大减少,系统运行不可靠
    4. 线程过多的业务中,线程阻塞,等待网络事件的发生,如果一瞬间客户端的请求量比较大,系统会瞬间唤醒很多数量的线程,造成系统内存和CPU使用率居高不小,造成系统负载过高
  • 相关阅读:
    ASP.NET Core 集成 WebSocket
    如何在CentOS7上安装桌面环境?
    Visual Studio 2017 远程调试(Remote Debugger)应用
    cmd sc命令进行服务操作
    EntityFrameworkCode 操作MySql 相关问题
    Windows下安装PHP开发环境
    未在本地计算机上注册“microsoft.ACE.oledb.12.0”提供程序
    System.Runtime.InteropServices.COMException (0x800A03EC): 无法访问文件
    c# WinFo判断当前程序是否已经启动或存在的几种方式
    MVC自定义视图引擎地址
  • 原文地址:https://www.cnblogs.com/128-cdy/p/13202656.html
Copyright © 2011-2022 走看看