zoukankan      html  css  js  c++  java
  • JavaIO演进之路

    先提两大问题:

    • 为什么要学Netty?

    Spring5 底层用Netty

    Spring Boot 不需要netty,它内部实现了web容器

    Zookeeper 也是用的Netty

    Dubbo 分布式服务框架 多协议支持(RPC) Netty

    有可能成为Java架构师的你的筑基

    • Netty能帮我们解决什么问题?

    框架:简化开发一系列解决方案的集合,封装IO操作的框架

    复杂业务场景中,没有说用一个单独IO API

    IO + 多线程来解决问题 Netty是用来封装IO操作

    本篇学习目标:

    • 掌握Java中BIO、NIO、AIO之间的区别及应用场景。
    • 透彻理解阻塞(Block)与非阻塞(Non-Block)区别。
    • 透彻理解同步(Synchronization)和异步(Asynchronous)的区别。

    本篇内容定位:

    •  适合具有网络通信开发经验的人群。
    • 适合具有1-3年Java Web开发经验的人群。

    input和output:

    input和output是相对于内存而言的,向内存中写入input,从内存中读出output

     阻塞要等待机器内存数据完全准备好,才能读向程序内存,非阻塞有动态轮询机制。

     阻塞和非阻塞是一种读取数据的策略,非阻塞实现了多路复用

     数据先进入缓冲区,然后轮询完成

    同步和异步:

    在处理数据的时候,在同一时间点,能做多个处理:异步,在同一时间只能做一个处理:同步。

    阻塞(Block)和非阻塞(Non-Block)

    阻塞:往往需要等待缓冲区中的数据准备好过后才处理其他事情,否则一直等待在那里。

    非阻塞:当我们的进程访问我们的数据缓冲区的时候,如果数据没有准备好则直接返回,不会等待。如果数据已经准备好,也直接返回。

     阻塞和非阻塞是进程在访问数据的时候,数据是否准备就绪的一种处理方式,当数据没有准备的时候。

    IO(BIO)、NIO与AIO对比:

     IO(BIO) Block IO  同步阻塞IO

    NIO  Non-Block IO 同步非阻塞IO (可以用线程池实现异步)

    AIO(NIO2) Async IO 异步非阻塞IO (事件驱动,回调)

     改进目的:提升IO操作的性能、IO框架出现

    代码案例

    BIO服务端:

    /**
     * @ClassName BIOServer
     * @Author 周聪
     * @Date 2021/2/1 21:50
     * @Version 1.0
     * @Description BIO服务端
     */
    public class BIOServer {
    
        /**
         * 服务的网络IO模型的封装对象
         */
        ServerSocket serverSocket;
    
        /**
         * 服务器
         *
         * @param port
         */
        public BIOServer(int port) {
            try {
    //            Tomcat 默认端口8080
    //            只要是Java写的底层都是ServerSocket
                serverSocket = new ServerSocket(port);
                System.out.println("BIO服务已启动,监听端口是:" + port);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) throws IOException {
            new BIOServer(8080).listen();
        }
    
        /**
         * 开始监听,并处理逻辑
         *
         * @throws IOException
         */
        public void listen() throws IOException {
    //        循环监听
            while (true) {
    //            等待客户端连接,阻塞方法,只有客户端把数据发过来的时候才会动,否则一直等,程序不会进行下去
    //            Socket数据发送者在服务端的引用
                Socket client = serverSocket.accept();
                System.out.println(client.getPort());
    //            对方发数据给我了,读Input
                InputStream is = client.getInputStream();
    //            JVM内存
    //            网络客户端把数据发送到网卡,机器所得到的数据读到了JVM内存中
                byte[] buff = new byte[1024];
                int len = is.read(buff);
                if (len > 0) {
                    String msg = new String(buff, 0, len);
                    System.out.println("收到" + msg);
                }
    
            }
        }
    }

    客户端(NIO和BIO都可用)

    /**
     * @ClassName BIOClient
     * @Author 周聪
     * @Date 2021/2/1 21:43
     * @Version 1.0
     * @Description BIO和NIO的客户端
     */
    public class BIOClient {
    //    FileOutputStream、FileInputStream 这里不拿磁盘操作案例,大家都很熟悉
    
        public static void main(String[] args) throws IOException {
    //        要和谁进行通信,服务器IP、服务器的端口
    //      一台机器的端口号是有限的
            Socket client = new Socket("localhost", 8080);
    //        输出 不管是客户端还是服务端,都有可能write和read
            OutputStream os = client.getOutputStream();
    //        生成一个随机的ID
            String name = UUID.randomUUID().toString();
            System.out.println("客户端发送数据:" + name);
            os.write(name.getBytes());
            os.close();
            client.close();
        }
    }

    NIO服务端

    /**
     * @ClassName NIOServerDemo
     * @Author 周聪
     * @Date 2021/2/1 22:27
     * @Version 1.0
     * @Description NIO服务端 NIO的操作过于繁琐,于是才有了Netty
     * Netty就是对这一系列非常繁琐的操作进行了封装.
     */
    public class NIOServerDemo {
    
        private int port = 8080;
        //    准备两个东西
        /**
         * 轮询器 Selector 大堂经理
         */
        private Selector selector;
    
        /**
         * 缓冲区 Buffer  等候区
         */
        private ByteBuffer buffer = ByteBuffer.allocate(1024);
    
        public NIOServerDemo(int port) {
    //        初始化大堂经理,开门营业
            try {
                this.port = port;
                ServerSocketChannel server = ServerSocketChannel.open();
    //            我得告诉地址,接客 IP/Port
                server.bind(new InetSocketAddress(this.port));
    //            BIO 升级版本 NIO ,为了兼容BIO ,NIO模型默认是采用阻塞式
                server.configureBlocking(false);
    //            大堂经理准备就绪,接客
                selector = Selector.open();
    //            在门口端牌子,正在营业
                server.register(selector, SelectionKey.OP_ACCEPT);
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            new NIOServerDemo(8080).listen();
        }
    
        public void listen() {
            System.out.println("listen on " + this.port + ".");
    //        轮询主线程
            try {
                while (true) {
                    System.out.println("listen on " + this.port + ".");
    //                大堂经理再叫号
                    selector.select();
    //                每次都拿到所以的号子
                    Set<SelectionKey> selectionKeys = selector.selectedKeys();
    //                不断地迭代,就叫轮询
                    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    //                同步体现在这里,因为每次只能拿一个key,每次只能处理一种状态
                    while (iterator.hasNext()) {
                        SelectionKey key = iterator.next();
                        iterator.remove();
    //                    每一个key代表一种状态
    //                    每一个号对应一个业务,这里体现为 数据就绪,数据可读,数据可写等待...
                        process(key);
                    }
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 具体办业务的方法,坐班柜员
         * 每一次轮询就是调用一次process方法,而每次调用只能干一件事
         * 在同一时间点,只能干一件事
         *
         * @param key
         */
        private void process(SelectionKey key) {
    
            try {
    //        针对每一种状态给一个反应
                if (key.isAcceptable()) {
                    ServerSocketChannel server = (ServerSocketChannel) key.channel();
    //                这个方法体现非阻塞,不管你数据有没有准备好
    //                你给我一个状态和反馈
                    SocketChannel channel = server.accept();
                    channel.configureBlocking(false);
    //                当数据准备就绪的时候,将状态改为可读
                    key = channel.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {
    //                key.channel 从多路复用器中拿客户端的引用
                    SocketChannel channel = (SocketChannel) key.channel();
                    int len = channel.read(buffer);
                    if (len > 0) {
                        buffer.flip();
                        String content = new String(buffer.array(), 0, len);
                        channel.register(selector, SelectionKey.OP_WRITE);
    //                    在key上携带一个附件,一会再写出去
                        key.attach(content);
                        System.out.println("读取内容:" + content);
                    }
                } else if (key.isWritable()) {
                    SocketChannel channel = (SocketChannel) key.channel();
                    String content = (String) key.attachment();
                    channel.write(ByteBuffer.wrap(("输出:" + content).getBytes()));
                    channel.close();
                }
    
    
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    AIO服务端

    /**
     * @ClassName AIOServer
     * @Author 周聪
     * @Date 2021/2/1 23:21
     * @Version 1.0
     * @Description AIO服务端
     */
    public class AIOServer {
    
        private final int port;
    
        public AIOServer(int port) {
            this.port = port;
            listen();
        }
    
        public static void main(String[] args) {
            int port = 8000;
            new AIOServer(port);
        }
    
        private void listen() {
            try {
                ExecutorService executorService = Executors.newCachedThreadPool();
                AsynchronousChannelGroup threadPool = AsynchronousChannelGroup.withCachedThreadPool(executorService, 1);
    //            开门营业
    //            工作线程,用来侦听回调,事件响应的时候需要回调
                final AsynchronousServerSocketChannel server = AsynchronousServerSocketChannel.open(threadPool);
                server.bind(new InetSocketAddress(port));
                System.out.println("服务已启动,监听端口: " + port);
    
    //            准备接受数据
                server.accept(null, new CompletionHandler<AsynchronousSocketChannel, Object>() {
    
                    final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
    
                    /**
                     * 回调有两个状态:成功
                     * 实现completed方法来回调 由操作系统来触发
                     * @param result
                     * @param attachment
                     */
                    @Override
                    public void completed(AsynchronousSocketChannel result, Object attachment) {
                        System.out.println("IO操作成功,开始获取数据");
                        try {
                            byteBuffer.clear();
                            result.read(byteBuffer).get();
                            byteBuffer.flip();
                            result.write(byteBuffer);
                            byteBuffer.flip();
                        } catch (Exception e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                result.close();
                                server.accept(null, this);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                        System.out.println("操作完成");
                    }
    
                    /**
                     * 回调有两个状态:失败
                     * @param exc
                     * @param attachment
                     */
                    @Override
                    public void failed(Throwable exc, Object attachment) {
                        System.out.println("IO操作失败: " + exc.getStackTrace());
                    }
    
                });
    
                try {
                    Thread.sleep(Integer.MAX_VALUE);
                } catch (Exception e) {
                    e.printStackTrace();
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    AIO客户端

    /**
     * @ClassName AIOClient
     * @Author 周聪
     * @Date 2021/2/1 23:39
     * @Version 1.0
     * @Description AIO客户端
     */
    public class AIOClient {
    
        private final AsynchronousSocketChannel clientChannel;
    
        public AIOClient() throws Exception {
            clientChannel = AsynchronousSocketChannel.open();
        }
    
        public static void main(String[] args) throws Exception {
            new AIOClient().connect("localhost", 8000);
        }
    
        public void connect(String host, int port) throws Exception {
            clientChannel.connect(new InetSocketAddress(host, port), null, new CompletionHandler<Void, Object>() {
                /**
                 * 回调成功状态的方法
                 * @param result
                 * @param attachment
                 */
                @Override
                public void completed(Void result, Object attachment) {
                    try {
                        clientChannel.write(ByteBuffer.wrap("这是一条测试数据".getBytes())).get();
                        System.out.println("已发送至服务器");
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
    
                /**
                 * 回调失败状态的方法
                 * @param exc
                 * @param attachment
                 */
                @Override
                public void failed(Throwable exc, Object attachment) {
                    exc.printStackTrace();
                }
            });
    
            final ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
            clientChannel.read(byteBuffer, null, new CompletionHandler<Integer, Object>() {
                /**
                 * 回调成功状态的方法
                 * @param result
                 * @param attachment
                 */
                @Override
                public void completed(Integer result, Object attachment) {
                    System.out.println("IO操作完成:" + result);
                    System.out.println("获取反馈结果:" + new String(byteBuffer.array()));
                }
    
                /**
                 * 回调失败状态的方法
                 * @param exc
                 * @param attachment
                 */
                @Override
                public void failed(Throwable exc, Object attachment) {
                    exc.printStackTrace();
                }
            });
    
            try {
                Thread.sleep(Integer.MAX_VALUE);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    总结(应用场景)

    JDK1.4以前都是BIO,之后NIO出现,性能得到大幅提升,Netty默认用的NIO

    JDK1.7   NIO  === > 出现NIO2(AIO) 操作系统的性能,决定的IO的性能(存在兼容问题),目前不是主流

    所有的IO实现异步都很容易,加入线程就可以,线程过多会有问题(CPU会爆),Netty引入了反应堆(线程池+调度)的概念。而AIO提供异步不需要。

    BIO/NIO/AIO  他们的底层都是TCP/IP协议

    欢迎批评指正。 附:源码地址

  • 相关阅读:
    数组
    Spring创建对象的三种方式以及创建时间
    Struts文件上传下载
    自定义拦截器
    Struts过滤器
    mybatis整合ehcache
    mybatis主键返回
    shell脚本 列出所有网卡的ip地址
    Servlet执行过程
    centos时区
  • 原文地址:https://www.cnblogs.com/itzhoucong/p/14359887.html
Copyright © 2011-2022 走看看