zoukankan      html  css  js  c++  java
  • Netty(三)Netty模型

    1. Netty模型

    Netty主要基于主从Reactor多线程模型做了一定的改进,其中主从Reactor多线程模型有多个Reactor。

    简版图:

     

    说明:

    1. BossGroup线程维护Selector,只关注Accept
    2. 当接收到Accept事件,获取到对应的SocketChannel,封装成NIOSocketChannel并注册到Worker线程(事件循环),并进行维护
    3. 当Worker线程监听到selector中通道发生自己感兴趣的事件后,就进行处理(就由handler),注意,handler已经加入到channel

    Netty模型进阶版图:

     

    详细版:

     

    模型说明:

    1. Netty抽象出2组线程池:
      • BossGroup:专门接受客户端的连接
      • WorkerGroup:专门负责网络的读写
    2. BossGroup和WorkerGroup类型都是NioEventLoopGroup
    3. NioEventLoopGroup相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是NioEventLoop
    4. NioEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket的网络通讯
    5. NioEventLoop可以有多个线程,即可以含有多个NioEventLoop
    6. 每个Boss NioEventLoop循环执行的步骤有3步:
      • 轮询accept事件
      • 处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个worker
      • 处理任务队列的任务,即runAllTasks
    7. 每个Worker NIOEventLoop循环执行的步骤:
      • 轮询read,write事件
      • 处理I/O事件,即read,write事件,在对应NioSocketChannel处理
      • 处理任务队列的任务,即runAllTasks
    8. 每个Worker NIOEventLoop处理业务时,会使用pipeline(管道),pipeline中包含了channel,即通过pipeline可以获取到对应channel,管道中维护了很多的处理器

    2. Netty实现简单服务端与客户端交互

    NettyServer.java

    package com.tang.netty.simple;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class NettyServer {
    
        public static void main(String[] args) {
            // 创建 BossGroup 和 WorkerGroup
            // 1.创建 2 个线程组 boosGroup 和 workerGroup
            // 2.bossGroup只是处理连接请求,真正与客户端的业务处理,会交给workerGroup
            // 3.两个都是无限循环
            // 4.bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数,默认为实际 CPU 核数*2
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            // 创建服务器端的启动对象,配置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
    
            // 使用链式编程来进行设置
            bootstrap.group(bossGroup, workerGroup) // 设置2个线程组
                    .channel(NioServerSocketChannel.class)  // 使用 NioServerSocketChannel 作为服务器通道的实现
                    .option(ChannelOption.SO_BACKLOG, 128)  // 设置线程队列得到连接数
                    .childOption(ChannelOption.SO_KEEPALIVE, true)  // 设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() {  // 创建一个通道测试对象(匿名对象)
                        // 给pipeline 设置处理器
                        // 使用的自己实现的 NettyServerHandler
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ch.pipeline().addLast(new NettyServerHandler());
                        }
                    });  // 给我们 workerGroup 的 EventLoop 对应的管道设置处理器
    
            System.out.println("....Server is ready...");
    
            try {
                // 绑定一个端口并且同步,生成了一个 ChannelFuture 对象
                // 启动服务器
                ChannelFuture cf = bootstrap.bind(6668).sync();
    
                // 对关闭通道进行监听
                cf.channel().closeFuture().sync();
    
            } catch (InterruptedException e){
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    }

    NettyServerHandler.java

    package com.tang.netty.simple;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelPipeline;
    import io.netty.util.CharsetUtil;
    
    import java.nio.ByteBuffer;
    
    /**
     *  我们自定义一个Handler 需要继承netty 规定好的某个 HandlerAdapter(规范)
     *  这个 ChannelInboundHandlerAdapter 是 Pipeline 里的 ChannelHandler 的一个实现
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        // 读取数据(这里我们可以读取客户端发送的消息)
    
        /**
         1.ChannelHandlerContext ctx:上下文对象,含有管道 pipeline,通道 channel,地址
         2.Object msg:就是客户端发送的数据,默认Object
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception{
            System.out.println("服务器读取线程 " + Thread.currentThread().getName());
            System.out.println("server ctx = " + ctx);
            System.out.println("查看 channel 与 pipeline 的关系");
            Channel channel = ctx.channel();
            ChannelPipeline pipeline = ctx.pipeline();  // 本质是一个双向链表,出战入站
    
            // 将 msg 转为 ByteBuf
            // ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("客户端发送的消息是:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址:" + channel.remoteAddress());
    
        }
    
        // 数据读取完毕
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception{
    
            // writeAndFlush 是 write + flush
            // 将数据写入到缓冲,并刷新
            // 一般讲,我们对这个发送的数据进行编码
            ctx.writeAndFlush(Unpooled.copiedBuffer("Hello,客户端~ 收到", CharsetUtil.UTF_8));
        }
    
        // 处理异常,一般是需要关闭通道
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    NettyClient.java

    package com.tang.netty.simple;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyClient {
    
        public static void main(String[] args) {
            // 客户端需要一个事件循环组
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
    
            try{
                // 创建客户端启动对象
                // 注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
                Bootstrap bootstrap = new Bootstrap();
    
                // 设置相关参数
                bootstrap.group(eventExecutors)  // 设置线程组
                        .channel(NioSocketChannel.class) // 设置客户端Channel的实现类
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyClientHandler());  // 加入自己的处理器
                            }
                        });
    
                System.out.println("客户端 ok...");
    
                // 启动客户端去连接服务器端
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
    
                // 给关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
    
            } catch (InterruptedException e){
                e.printStackTrace();
            } finally {
                eventExecutors.shutdownGracefully();
            }
    
        }
    }

    NettyClientHandler.java

    package com.tang.netty.simple;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        // 当 Channel 就绪时,就会触发该方法
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client " + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server, this is client", CharsetUtil.UTF_8));
    
        }
    
        // 当 Channel 有读取事件时,会触发
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("服务器的地址:" + ctx.channel().remoteAddress());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    ------------------------------------

    此 Netty 笔记为学习尚硅谷韩老师讲的 Netty 整理完成,原视频讲解十分详细,建议对 Netty 框架感兴趣的同学们可以看一遍原视频:

    https://www.bilibili.com/video/BV1DJ411m7NR

  • 相关阅读:
    R语言数据集合
    转:EXCEL中如何获取从某一字符开始到最右边字符串
    转:EXCEL打乱顺序
    转:excel中怎样做柱状图
    转:linux复制/剪切文件到另一个文件夹
    转:Linux常用命令
    转:怎么在一张PPT里设置很多步骤出现的内容呀
    禅道分析
    转:BUG的严重级别分类 BUG状态标准
    转:Bug的严重等级和优先级
  • 原文地址:https://www.cnblogs.com/zackstang/p/14584321.html
Copyright © 2011-2022 走看看