zoukankan      html  css  js  c++  java
  • Netty 框架学习 —— 传输


    概述

    流经网络的数据总是具有相同的类型:字节,这些字节如何传输主要取决于我们所说的网络传输。用户并不关心传输的细节,只在乎字节是否被可靠地发送和接收

    如果使用 Java 网络编程,你会发现,某些时候当你需要支持高并发连接,随后你尝试将阻塞传输切换为非阻塞传输,那么你会因为这两种 API 的截然不同而遇到问题。Netty 提供了一个通用的 API,这使得转换更加简单。


    传统的传输方式

    这里介绍仅使用 JDK API 来实现应用程序的阻塞(OIO)和非阻塞版本(NIO)

    阻塞网络编程如下:

    public class PlainOioServer {
    
        public void server(int port) throws IOException {
            // 将服务器绑定到指定端口
            final ServerSocket socket = new ServerSocket(port);
            try {
                while (true) {
                    // 接收连接
                    final Socket clientSocket = socket.accept();
                    System.out.println("Accepted connection from " + clientSocket);
                    // 创建一个新的线程来处理连接
                    new Thread(() -> {
                        OutputStream out;
                        try {
                            out = clientSocket.getOutputStream();
                            // 将消息写给已连接的客户端
                            out.write("Hi
    ".getBytes(StandardCharsets.UTF_8));
                            out.flush();
                            // 关闭连接x
                            clientSocket.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        } finally {
                            try {
                                clientSocket.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }).start();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    这段代码可以处理中等数量的并发客户端,但随着并发连接的增多,你决定改用异步网络编程,但异步的 API 是完全不同的

    非阻塞版本如下:

    public class PlainNioServer {
    
        public void server(int port) throws IOException {
            ServerSocketChannel serverChannel = ServerSocketChannel.open();
            serverChannel.configureBlocking(false);
            ServerSocket ssocket = serverChannel.socket();
            InetSocketAddress address = new InetSocketAddress(port);
            // 将服务器绑定到选定的端口
            ssocket.bind(address);
            // 打开 Selector 来处理 Channel
            Selector selector = Selector.open();
            // 将 ServerSocket 注册到 Selector 以接受连接
            serverChannel.register(selector, SelectionKey.OP_ACCEPT);
            final ByteBuffer msg = ByteBuffer.wrap("Hi
    ".getBytes());
            while (true) {
                try {
                    // 等待需要处理的新事件,阻塞将一直持续到下一个传入事件
                    selector.select();
                } catch (IOException e) {
                    e.printStackTrace();
                    break;
                }
                Set<SelectionKey> readKeys = selector.selectedKeys();
                Iterator<SelectionKey> iterator = readKeys.iterator();
                while (iterator.hasNext()) {
                    SelectionKey key = iterator.next();
                    iterator.remove();
                    try {
                        // 检查事件是否是一个新的已经就绪可以被接受的连接
                        if (key.isAcceptable()) {
                            ServerSocketChannel server = (ServerSocketChannel) key.channel();
                            SocketChannel client = server.accept();
                            client.configureBlocking(false);
                            // 接受客户端,并将它注册到选择器
                            client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ, msg.duplicate());
                            System.out.println("Accepted connection from " + client);
                        }
                        // 检查套接字是否已经准备好写数据
                        if (key.isWritable()) {
                            SocketChannel client = (SocketChannel) key.channel();
                            ByteBuffer buffer = (ByteBuffer) key.attachment();
                            while (buffer.hasRemaining()) {
                                // 将数据写到已连接的客户端
                                if (client.write(buffer) == 0) {
                                    break;
                                }
                            }
                            client.close();
                        }
                    } catch (IOException exception) {
                        key.cancel();
                        try {
                            key.channel().close();
                        } catch (IOException cex) {
                            cex.printStackTrace();
                        }
                    }
                }
            }
        }
    }
    

    可以看到,阻塞和非阻塞的代码是截然不同的。如果为了实现非阻塞而完全重写程序,无疑十分困难


    基于 Netty 的传输

    使用 Netty 的阻塞网络处理如下:

    public class NettyOioServer {
    
        public void server(int port) throws Exception {
            final ByteBuf buf = Unpooled.unreleasableBuffer(
                    Unpooled.copiedBuffer("Hi
    
    ", StandardCharsets.UTF_8));
            EventLoopGroup group = new OioEventLoopGroup();
            try {
                ServerBootstrap b = new ServerBootstrap();
                b.group(group)
                        // 使用阻塞模式
                        .channel(OioServerSocketChannel.class)
                        .localAddress(new InetSocketAddress(port))
                        .childHandler(new ChannelInitializer<SocketChannel>() {
    
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(
                                        new SimpleChannelInboundHandler<>() {
                                            @Override
                                            protected void messageReceived(ChannelHandlerContext ctx, Object msg) throws Exception {
                                                ctx.writeAndFlush(buf.duplicate())
                                                        .addListener(ChannelFutureListener.CLOSE);
                                            }
                                        });
                            }
                        });
                ChannelFuture f = b.bind().sync();
                f.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully().sync();
            }
        }
    }
    

    而非阻塞版本和阻塞版本几乎一模一样,只需要改动两处地方

    EventLoopGroup group = new NioEventLoopGroup();
    b.group(group).channel(NioServerSocketChannel.class);
    

    传输 API

    传输 API 的核心是 interface Channel,它被用于所有的 IO 操作。每个 Channel 都将被分配一个 ChannelPipeline 和 ChannelConfig,ChannelConfig 包含了该 Channel 的所有配置设置,ChannelPipeline 持有所有将应用于入站和出站数据以及事件的 ChannelHandler 实例

    除了访问所分配的 ChannelPipeline 和 ChannelConfig 之外,也可以利用 Channel 的其他方法

    方法名 描述
    eventLoop 返回分配给 Channel 的 EventLoop
    pipeline 返回分配给 Channel 的 ChannelPipeline
    isActive 如果 Channel 活动的,返回 true
    localAddress 返回本地的 SocketAddress
    remoteAddress 返回远程的 SocketAddress
    write 将数据写到远程节点
    flush 将之前已写的数据冲刷到底层传输
    writeAndFlush 等同于调用 write() 并接着调用 flush()

    内置的传输

    Netty 内置了一些可开箱即用的传输,但它们所支持的协议不尽相同,因此你必须选择一个和你的应用程序所使用协议相容的传输

    名称 描述
    NIO io.netty.channel.socket.nio 使用 java.nio.channels 包作为基础
    Epoll io.netty.channel.epoll 由 JNI 驱动的 epoll() 和非阻塞 IO,可支持只有在 Linux 上可用的多种特性,比 NIO 传输更快,且完全非阻塞
    OIO io.netty.channel.socket.oio 使用 java.net 包作为基础
    Local io.netty.channel.local 可以在 VM 内部通过管道进行通信的本地传输
    Embedded io.netty.channel.embedded Embedded 传输,允许使用 ChannelHandler 而不需要一个真正的基于网络的传输,主要用于测试

  • 相关阅读:
    20189215 2018-2019-2 《密码与安全新技术专题》第5周作业
    2018-2019-2 20189215 《网络攻防技术》第五周作业
    Ubuntu18.04安装Openssl-1.1.1
    2018-2019-2 20189215 《网络攻防技术》第四周作业
    2018-2019-2 20189215 《网络攻防技术》第三周作业
    Python—构造单向链表数据类型
    Python—使用列表构造栈数据结构
    Python—快速排序算法
    Python—使用Json序列化Datetime类型
    Linux基本命令
  • 原文地址:https://www.cnblogs.com/Yee-Q/p/14878616.html
Copyright © 2011-2022 走看看