zoukankan      html  css  js  c++  java
  • Netty(三)Netty模型

    1. Netty模型

    Netty主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor。

    简版图:

     

    说明:

    1. BossGroup线程维护Selector,只关注Accept
    2. 当接收到Accept事件,获取到对应的SocketChannel,封装成NIOSocketChannel并注册到Worker线程(事件循环),并进行维护
    3. 当Worker线程监听到selector中通道发生自己感兴趣的事件后,就进行处理(就由handler),注意,handler已经加入到channel

    Netty模型进阶版图:

     

    详细版:

     

    模型说明:

    1. Netty抽象出2组线程池:
      • BossGroup:专门接受客户端的连接
      • WorkerGroup:专门负责网络的读写
    2. BossGroup和WorkerGroup类型都是NioEventLoopGroup
    3. NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop
    4. NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯
    5. NioEventLoop可以有多个线程,即可以含有多个NioEventLoop
    6. 每个Boss NioEventLoop循环执行的步骤有3步:
      • 轮询accept事件
      • 处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个worker
      • 处理任务队列的任务,即runAllTasks
    7. 每个Worker NIOEventLoop循环执行的步骤:
      • 轮询read,write事件
      • 处理I/O事件,即read,write事件,在对应NioSocketChannel处理
      • 处理任务队列的任务,即runAllTasks
    8. 每个Worker NIOEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel,即通过pipeline可以获取到对应channel,管道中维护了很多的处理器

    2. Netty实现简单服务端与客户端交互

    NettyServer.java

    package com.tang.netty.simple;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class NettyServer {
    
        public static void main(String[] args) {
            // 创建 BossGroup 和 WorkerGroup
            // 1.创建 2 个线程组 boosGroup 和 workerGroup
            // 2.bossGroup只是处理连接请求,真正与客户端的业务处理,会交给workerGroup
            // 3.两个都是无限循环
            // 4.bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数,默认为实际 CPU 核数*2
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            // 创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
    
            // 使用链式编程来进行设置
            bootstrap.group(bossGroup, workerGroup) // 设置2个线程组
                    .channel(NioServerSocketChannel.class)  // 使用 NioServerSocketChannel 作为服务器通道的实现
                    .option(ChannelOption.SO_BACKLOG, 128)  // 设置线程队列得到连接数
                    .childOption(ChannelOption.SO_KEEPALIVE, true)  // 设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {  // 创建一个通道测试对象(匿名对象)
                        // 给pipeline 设置处理器
                        // 使用的自己实现的 NettyServerHandler
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });  // 给我们 workerGroup 的 EventLoop 对应的管道设置处理器
    
            System.out.println("....Server is ready...");
    
            try {
                // 绑定一个端口并且同步,生成了一个 ChannelFuture 对象
                // 启动服务器
                ChannelFuture cf = bootstrap.bind(6668).sync();
    
                // 对关闭通道进行监听
                cf.channel().closeFuture().sync();
    
            } catch (InterruptedException e){
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    }

    NettyServerHandler.java

    package com.tang.netty.simple;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelPipeline;
    import io.netty.util.CharsetUtil;
    
    import java.nio.ByteBuffer;
    
    /**
     *  我们自定义一个Handler 需要继承netty 规定好的某个 HandlerAdapter(规范)
     *  这个 ChannelInboundHandlerAdapter 是 Pipeline 里的 ChannelHandler 的一个实现
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        // 读取数据(这里我们可以读取客户端发送的消息)
    
        /**
         1.ChannelHandlerContext ctx:上下文对象,含有管道 pipeline,通道 channel,地址
         2.Object msg:就是客户端发送的数据,默认Object
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
            System.out.println("服务器读取线程 " + Thread.currentThread().getName());
            System.out.println("server ctx = " + ctx);
            System.out.println("查看 channel 与 pipeline 的关系");
            Channel channel = ctx.channel();
            ChannelPipeline pipeline = ctx.pipeline();  // 本质是一个双向链表,出战入站
    
            // 将 msg 转为 ByteBuf
            // ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址:" + channel.remoteAddress());
    
        }
    
        // 数据读取完毕
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
    
            // writeAndFlush 是 write + flush
            // 将数据写入到缓冲,并刷新
            // 一般讲,我们对这个发送的数据进行编码
            ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~ 收到", CharsetUtil.UTF_8));
        }
    
        // 处理异常,一般是需要关闭通道
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    NettyClient.java

    package com.tang.netty.simple;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyClient {
    
        public static void main(String[] args) {
            // 客户端需要一个事件循环组
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
    
            try{
                // 创建客户端启动对象
                // 注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
                Bootstrap bootstrap = new Bootstrap();
    
                // 设置相关参数
                bootstrap.group(eventExecutors)  // 设置线程组
                        .channel(NioSocketChannel.class) // 设置客户端Channel的实现类
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyClientHandler());  // 加入自己的处理器
                            }
                        });
    
                System.out.println("客户端 ok...");
    
                // 启动客户端去连接服务器端
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
    
                // 给关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
    
            } catch (InterruptedException e){
                e.printStackTrace();
            } finally {
                eventExecutors.shutdownGracefully();
            }
    
        }
    }

    NettyClientHandler.java

    package com.tang.netty.simple;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        // 当 Channel 就绪时,就会触发该方法
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client " + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server, this is client", CharsetUtil.UTF_8));
    
        }
    
        // 当 Channel 有读取事件时,会触发
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    ------------------------------------

    此 Netty 笔记为学习尚硅谷韩老师讲的 Netty 整理完成,原视频讲解十分详细,建议对 Netty 框架感兴趣的同学们可以看一遍原视频:

    https://www.bilibili.com/video/BV1DJ411m7NR

  • 相关阅读:
    gThumb 3.1.2 发布,支持 WebP 图像
    航空例行天气预报解析 metaf2xml
    Baruwa 1.1.2 发布,邮件监控系统
    Bisect 1.3 发布,Caml 代码覆盖测试
    MoonScript 0.2.2 发布,基于 Lua 的脚本语言
    Varnish 入门
    快速增量备份程序 DeltaCopy
    恢复模糊的图像 SmartDeblur
    Cairo 1.12.8 发布,向量图形会图库
    iText 5.3.4 发布,Java 的 PDF 开发包
  • 原文地址:https://www.cnblogs.com/zackstang/p/14584321.html
Copyright © 2011-2022 走看看