zoukankan      html  css  js  c++  java
  • Netty(二)工作原理模型

    工作原理示意图 1-简单版

    Netty 主要基于主从 Reactors 多线程模型(如图) 做了一定的改进, 其中主从 Reactor 多线程模型有多个 Reactor

     对上图说明

    1) BossGroup 线程维护 Selector , 只关注 Accecpt。
    2) 当接收到 Accept 事件, 获取到对应的 SocketChannel, 封装成 NIOScoketChannel 并注册到 Worker 线程(事件循环), 并进行维护。
    3) Worker 线程监听到 selector 中通道发生自己感兴趣的事件后, 就进行处理(就由 handler), 注意 handler 已经加入到通道 。


    工作原理示意图 2-进阶版


    工作原理示意图 3 详细版 

     对上图的说明小结 

    1) Netty 抽象出两组线程池 BossGroup 专门负责接收客户端的连接, WorkerGroup 专门负责网络的读写

    2) BossGroup WorkerGroup 类型都是 NioEventLoopGroup
    3) NioEventLoopGroup 相当于一个事件循环组, 这个组中含有多个事件循环 , 每一个事件循环是 NioEventLoop
    4) NioEventLoop 表示一个不断循环的执行处理任务的线程, 每个 NioEventLoop 都有一个 selector , 用于监听绑定在其上的 socket 的网络通讯
    5) NioEventLoopGroup 可以有多个线程, 即可以含有多个 NioEventLoop
    6) 每个 Boss NioEventLoop 循环执行的步骤有 3

    • 轮询 accept 事件
    • 处理 accept 事件 , client 建立连接 , 生成 NioScocketChannel , 并将其注册到某个 worker NIOEventLoop 上的 selector
    • 处理任务队列的任务 , 即 runAllTasks

    8) 每个Worker NIOEventLoop 处理业务时, 会使用pipeline(管道), pipeline 中包含了 channel (通道), 即通过pipeline可以获取到对应通道管道中维护了很多的 处理器 

    自己总结:pipeline(管道)含有多个ChanelHandler(看上面图)


    Netty 快速入门实例-TCP 服务

     NettyServer

    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyServer {
        public static void main(String[] args) throws Exception {
    
    
            //创建BossGroup 和 WorkerGroup
            //说明
            //1. 创建两个线程组 bossGroup 和 workerGroup
            //2. bossGroup 只是处理连接请求 , 真正的和客户端业务处理,会交给 workerGroup完成
            //3. 两个都是无限循环
            //4. bossGroup 和 workerGroup 含有的子线程(NioEventLoop)的个数
            //   默认实际 cpu核数 * 2
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(); //8
    
    
    
            try {
                //创建服务器端的启动对象,配置参数
                ServerBootstrap bootstrap = new ServerBootstrap();
    
                //使用链式编程来进行设置
                bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                        .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
                        .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                        .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
    //                    .handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
                        .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
                            //给pipeline 设置处理器
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
                                ch.pipeline().addLast(new NettyServerHandler());
                            }
                        }); // 给我们的workerGroup 的 EventLoop 对应的管道设置处理器
    
                System.out.println(".....服务器 is ready...");
    
                //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
                //启动服务器(并绑定端口)
                ChannelFuture cf = bootstrap.bind(6668).sync();
    
                //给cf 注册监听器,监控我们关心的事件
    
                cf.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (cf.isSuccess()) {
                            System.out.println("监听端口 6668 成功");
                        } else {
                            System.out.println("监听端口 6668 失败");
                        }
                    }
                });
    
    
                //对关闭通道进行监听
                cf.channel().closeFuture().sync();
            }finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
    
        }
    
    }
    View Code

     NettyServerHandler

    package com.atguigu.netty.simple;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelPipeline;
    import io.netty.util.CharsetUtil;
    
    import java.util.concurrent.TimeUnit;
    
    /*
    说明
    1. 我们自定义一个Handler 需要继续netty 规定好的某个HandlerAdapter(规范)
    2. 这时我们自定义一个Handler , 才能称为一个handler
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        //读取数据实际(这里我们可以读取客户端发送的消息)
        /*
        1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
        2. Object msg: 就是客户端发送的数据 默认Object
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
    /*
    
            //比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该channel 对应的
            //NIOEventLoop 的 taskQueue中,
    
            //解决方案1 用户程序自定义的普通任务
    
            ctx.channel().eventLoop().execute(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        Thread.sleep(5 * 1000);
                        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵2", CharsetUtil.UTF_8));
                        System.out.println("channel code=" + ctx.channel().hashCode());
                    } catch (Exception ex) {
                        System.out.println("发生异常" + ex.getMessage());
                    }
                }
            });
    
            ctx.channel().eventLoop().execute(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        Thread.sleep(5 * 1000);
                        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵3", CharsetUtil.UTF_8));
                        System.out.println("channel code=" + ctx.channel().hashCode());
                    } catch (Exception ex) {
                        System.out.println("发生异常" + ex.getMessage());
                    }
                }
            });
    
            //解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中
    
            ctx.channel().eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        Thread.sleep(5 * 1000);
                        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵4", CharsetUtil.UTF_8));
                        System.out.println("channel code=" + ctx.channel().hashCode());
                    } catch (Exception ex) {
                        System.out.println("发生异常" + ex.getMessage());
                    }
                }
            }, 5, TimeUnit.SECONDS);
    
    
    
            System.out.println("go on ...");*/
    
    
            System.out.println("服务器读取线程 " + Thread.currentThread().getName() + " channle =" + ctx.channel());
            System.out.println("server ctx =" + ctx);
            System.out.println("看看channel 和 pipeline的关系");
            Channel channel = ctx.channel();
            ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
    
    
            //将 msg 转成一个 ByteBuf
            //ByteBuf 是 Netty 提供的,不是 NIO 的 ByteBuffer.
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("客户端地址:" + channel.remoteAddress());
        }
    
        //数据读取完毕
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    
            //writeAndFlush 是 write + flush
            //将数据写入到缓存,并刷新
            //一般讲,我们对这个发送的数据进行编码
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω^<)喵1", CharsetUtil.UTF_8));
        }
    
        //处理异常, 一般是需要关闭通道
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
    View Code

     NettyClient

    package com.atguigu.netty.simple;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyClient {
        public static void main(String[] args) throws Exception {
    
            //客户端需要一个事件循环组
            EventLoopGroup group = new NioEventLoopGroup();
    
    
            try {
                //创建客户端启动对象
                //注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
                Bootstrap bootstrap = new Bootstrap();
    
                //设置相关参数
                bootstrap.group(group) //设置线程组
                        .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
                            }
                        });
    
                System.out.println("客户端 ok..");
    
                //启动客户端去连接服务器端
                //关于 ChannelFuture 要分析,涉及到netty的异步模型
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
                //给关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            }finally {
    
                group.shutdownGracefully();
    
            }
        }
    }
    View Code

     NettyClientHandler

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
    
        //当通道就绪就会触发该方法
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client " + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: (>^ω^<)喵", CharsetUtil.UTF_8));
        }
    
        //当通道有读取事件时,会触发
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("服务器回复的消息:" + buf.toString(CharsetUtil.UTF_8));
            System.out.println("服务器的地址: "+ ctx.channel().remoteAddress());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
    View Code

     任务队列中的 Task 3 种典型使用场景

    1) 用户程序自定义的普通任务 [举例说明]
    2) 用户自定义定时任务
    3) 非当前 Reactor 线程调用 Channel 的各种方法
    例如在推送系统的业务线程里面, 根据用户的标识, 找到对应的 Channel 引用, 然后调用 Write 类方法向该 用户推送消息, 就会进入到这种场景。 最终的 Write 会提交到任务队列中后被异步消费

    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.channel.ChannelPipeline;
    import io.netty.util.CharsetUtil;
    import java.util.concurrent.TimeUnit;
    /*
    说明
    1. 我们自定义一个 Handler 需要继续 netty 规定好的某个 HandlerAdapter(规范)
    2. 这时我们自定义一个 Handler , 才能称为一个 handler
    */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
    //读取数据实际(这里我们可以读取客户端发送的消息)
    /*
    1. ChannelHandlerContext ctx:上下文对象, 含有 管道 pipeline , 通道 channel, 地址
    2. Object msg: 就是客户端发送的数据 默认 Object
    *
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    //比如这里我们有一个非常耗时长的业务-> 异步执行 -> 提交该 channel 对应的
    //NIOEventLoop 的 taskQueue 中,
    //解决方案 1 用户程序自定义的普通任务
    ctx.channel().eventLoop().execute(new Runnable() {
    @Override
    public void run() {
    try {
    Thread.sleep(5 * 1000);
    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω ^<)喵 2", CharsetUtil.UTF_8));
    System.out.println("channel code=" + ctx.channel().hashCode());
    } catch (Exception ex) {
    System.out.println("发生异常" + ex.getMessage());
    }
    }
    });
    ctx.channel().eventLoop().execute(new Runnable() {
    @Override
    public void run() {
    try {
    Thread.sleep(5 * 1000);
    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω ^<)喵 3", CharsetUtil.UTF_8));
    System.out.println("channel code=" + ctx.channel().hashCode());
    } catch (Exception ex) {
    System.out.println("发生异常" + ex.getMessage());
    }
    }
    });
    //解决方案 2 : 用户自定义定时任务 -》 该任务是提交到 scheduledTaskQueue 中
    ctx.channel().eventLoop().schedule(new Runnable() {
    @Override
    public void run() {
    try {
    Thread.sleep(5 * 1000);
    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω ^<)喵 4", CharsetUtil.UTF_8));
    System.out.println("channel code=" + ctx.channel().hashCode());
    } catch (Exception ex) {
    System.out.println("发生异常" + ex.getMessage());
    }
    }
    }, 5, TimeUnit.SECONDS);
    System.out.println("go on ...");
    // System.out.println("服务器读取线程 " + Thread.currentThread().getName());
    // System.out.println("server ctx =" + ctx);
    // System.out.println("看看 channel 和 pipeline 的关系");
    // Channel channel = ctx.channel();
    // ChannelPipeline pipeline = ctx.pipeline(); //本质是一个双向链接, 出站入站
    //
    //
    // //将 msg 转成一个 ByteBuf
    // //ByteBuf 是 Netty 提供的, 不是 NIO 的 ByteBuffer.
    // ByteBuf buf = (ByteBuf) msg;
    // System.out.println("客户端发送消息是:" + buf.toString(CharsetUtil.UTF_8));
    // System.out.println("客户端地址:" + channel.remoteAddress());
    } //
    数据读取完毕
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
    //writeAndFlush 是 write + flush
    //将数据写入到缓存, 并刷新
    //一般讲, 我们对这个发送的数据进行编码
    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端~(>^ω ^<)喵 1", CharsetUtil.UTF_8));
    } //
    处理异常, 一般是需要关闭通道
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    ctx.close();
    }
    }
    View Code

     方案再说明:

    1) Netty 抽象出两组线程池, BossGroup 专门负责接收客户端连接, WorkerGroup 专门负责网络读写操作。
    2) NioEventLoop 表示一个不断循环执行处理任务的线程, 每个 NioEventLoop 都有一个 selector, 用于监听绑定在其上的 socket 网络通道。
    3) NioEventLoop 内部采用串行化设计, 从消息的读取->解码->处理->编码->发送, 始终由 IO 线程 NioEventLoop负责
     
     

  • 相关阅读:
    eclipse export runnable jar(导出可执行jar包) runnable jar可以执行的
    mave常用指令
    771. Jewels and Stones珠宝数组和石头数组中的字母对应
    624. Maximum Distance in Arrays二重数组中的最大差值距离
    724. Find Pivot Index 找到中轴下标
    605. Can Place Flowers零一间隔种花
    581. Shortest Unsorted Continuous Subarray连续数组中的递增异常情况
    747. Largest Number At Least Twice of Others比所有数字都大两倍的最大数
    643. Maximum Average Subarray I 最大子数组的平均值
    414. Third Maximum Number数组中第三大的数字
  • 原文地址:https://www.cnblogs.com/cb1186512739/p/12774212.html
Copyright © 2011-2022 走看看