zoukankan      html  css  js  c++  java
  • 005-核心技术-netty线程模型、任务队列、异步模型

    一、netty从简到繁解说线程模型

    1.1、简单版

     

    说明:1)BossGroup线程维护Selector,只关注Accept

    2)当接收到Accept事件,获取到对应的SocketChannel,封装成NIOSocketChannel并注册到Worker线程(事件循环) 

    3)当worker线程监听到selector中通道发生自己感兴趣的事件后,就进行处理(由Handler)注意handler已加入到通道。

    1.2、进阶模型

    如上图

    1.3、详细模型

      

    1)netty抽象出两组线程池:BossGroup负责接口客户端的连接,WorkGroup负责网络读写

    2)BossGroup和WorkGroup都时NioEventLoopGroup

    3)NioEventLoopGroup相当于一个事件循环组,组中含有多个事件循环,每一个事件都包含NioEventLoop

    4)NIOEventLoop表示一个不断循环的执行处理任务的线程,每个NioEventLoop都有一个Selector,用于监听绑定在其上的socket的网络通讯

    5)每个Boss NIOEventLoop 循环执行步骤:

      a、轮询accept事件

      b、处理accept事件,与client建立连接,生成NioSocketChannel,并将其注册到某个Work NioEventLoop上的selector

      c、处理任务队列的任务,即runAllTasks

    6)每个Work NioEventLoop循环执行的步骤:

      a、轮询read、write事件

      b、处理IO事件,即read、write事件,在对应NioSocketChannel处理

      c、处理任务队列的任务,即runAllTasks

    7)每个Worker NioEventLoop处理业务时,会使用Pipeline,pipeline中包含了channel,管道中维护了很多处理器

    二、实例

    2.1、添加本地jar以及source

    idea找到project Structure,找的自己的Modules,找到dependences,左下角+,可以搜索下载,或者加载本地的,可以选择加载source、docs等

    2.2、简单实例 基础通讯

    服务端

    package com.github.bjlhx15.netty.demo.netty.simple;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class NettyServer {
        public static void main(String[] args) throws InterruptedException {
            //        创建 bossGroup 和 workerGroup
            //        bossGroup 处理accept,workerGroup处理业务
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                //创建服务端的启动对象,配置参数
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)//设置两个线程组
                        .channel(NioServerSocketChannel.class)//使用NioSocketChannel作为服务器的通道实现
                        .option(ChannelOption.SO_BACKLOG, 128)//设置线程队列得到的连接个数
                        .childOption(ChannelOption.SO_KEEPALIVE, true)//设置保持活动连接状态
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            //   创建一个通道初始化
                            //   给Pipeline设置处理器
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new NettyServerHandler());
                            }
                        });
                System.out.println("……服务器 is ready。");
                //绑定一个端口并且同步,生成一个ChannelFuture对象
                //启动服务器(并绑定端口)
                ChannelFuture cf = bootstrap.bind(6668).sync();
                //对关闭通道进行监听
                cf.channel().closeFuture().sync();
            } finally {
    //            优雅的关闭
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    } 

    服务端handler

    package com.github.bjlhx15.netty.demo.netty.simple;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    /**
     * 1、自定义一个handler,继承ChannelInboundHandlerAdapter
     */
    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        //读取实际数据
        //ChannelHandlerContext 上下文对象,含有管道pipeline、通道channel,地址
        //Object msg:客户端数据,默认Object
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("Server ctx=" + ctx);
            //        将msg 转ByteBuffer
            //        ByteBuf 时netty,性能优于nio bytebuffer
            ByteBuf buffer = (ByteBuf) msg;
            System.out.println("客户端发送消息是:" + buffer.toString(CharsetUtil.UTF_8));
            System.out.println("client address:" + ctx.channel().remoteAddress());
        }
    
        //数据读取完毕
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            //        writeAndFlush 是 write和 flush
            //        将数据写入到缓存,并刷新
            //        将写入数据进行编码
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello client", CharsetUtil.UTF_8));
        }
    
        //处理异常一般关闭通道
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    客户端

    package com.github.bjlhx15.netty.demo.netty.simple;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    
    public class NettyClient {
        public static void main(String[] args) throws InterruptedException {
    //        客户端需要一个事件循环组
            EventLoopGroup group = new NioEventLoopGroup();
            try {
    //        创建客户端启动对象
    //        不是ServerBootstrap 而是 Bootstrap
                Bootstrap bootstrap = new Bootstrap();
                bootstrap.group(group)//设置线程组
                        .channel(NioSocketChannel.class)//设置客户端通道的实现类(反射)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new NettyClientHandler());//加入自己的处理器
                            }
                        });
                System.out.println("客户端 ok");
    //        启动客户端去连接服务器
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
    //            关闭通道进行监听,
                channelFuture.channel().closeFuture().sync();
            } finally {
                group.shutdownGracefully();
            }
        }
    } 

    客户端Handler

    package com.github.bjlhx15.netty.demo.netty.simple;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    
    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
        //    当通道就绪就会触发该方法
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client ctx:" + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("Hello server.", CharsetUtil.UTF_8));
        }
    
        //当通道有读取事件时,会触发
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf byteBuf = (ByteBuf) msg;
            System.out.println("server response:" + byteBuf.toString(CharsetUtil.UTF_8));
            System.out.println("server address:" + ctx.channel().remoteAddress());
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

    2.3、代码分析

    服务端分析

    1)bossGroup和workerGroup包含的子线程(NioEventLoop)个数默认是 实际CPU核数*2

    2)bossGroup 一般设置1个即可,专门负责接受客户端连接

    3)workerGroup,假如有8个子线程,那么会依次循环复用给客户端使用,主要负责网络读写操作

    4)针对bossGroup和workerGroup内的子线程NioEventLoop(类型EventExecutor),都有自己独立的selector、taskQueue、executor(threadFactory)

    5)channel and pipeline 关系,相互包含

    Channel channel = ctx.channel();
    ChannelPipeline pipeline = channel.pipeline();//pipeline是一个双向链表

    6)ChannelHandlerContext 上下文,包含全部信息

    7)NioEventLoop内部采用串行化设计,从消息的读取→解码→处理→编码→发送,始终由IO线程NioEventLoop负责

      NioEventLoopGroup下包含多个NioEventLoop;每个NioEventLoop中包含有一个Selector,一个taskQueue;

      每个NioEventLoop的Selector上可以注册监听多个NioChannel;每个NioChannel只会绑定在唯一的NioEventLoop上;每个NioChannel都绑定有一个自己的ChannelPipeline;

    三、模型内关键变量介绍

    3.1、Netty模型任务队列taskQueue

    3种典型使用场景

    1)用户程序自定义的普通任务

    2)用户自定义定时任务

    3)非当前Reactor线程调用Channel的各种方法

      如:在推送系统的业务线程里面,根据用户的标识,找到对应的Channel引用,然后调用Write类方法向该用户推送消息,就会进入到这种场景。最终的Write会提交到任务队列中后被异步消费

    实例:上述服务端handler的channelRead有耗时任务

     @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Thread.sleep(10 * 1000);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello client 2", CharsetUtil.UTF_8));
            System.out.println("go on ……");
        } 

    在channelReadComplete逻辑不变

        //数据读取完毕
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            //        writeAndFlush 是 write和 flush
            //        将数据写入到缓存,并刷新
            //        将写入数据进行编码
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello client 1", CharsetUtil.UTF_8));
        } 

    此时客户端10s后输出:hello client 2;hello client 1;

    然后服务端10s后输出:go on ……

    3.1.1、用户程序自定义的普通任务,存储在taskQueue中异步执行

    比如channelRead中有个非常耗时的任务→异步执行→提交channel对应的NIOEventLoop的taskQueue中。

    服务端handler的channelRead修改

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                //解决方案1 用户程序自定义普通任务
            ctx.channel().eventLoop().execute(()->{
                //耗时任务
                try {
                    Thread.sleep(10 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello client 2", CharsetUtil.UTF_8));
                } catch (InterruptedException e) {
                    System.out.println("发生异常");
                }
            });
            System.out.println("go on ……");
        }

    此时服务端立即输出:go on ……

    客户端立即输出:hello client 1;

    10s后客户端输出:hello client 2;

    3.1.2、用户自定义定时任务,该任务提交到scheduleTaskQueue队列中

    服务端handler的channelRead改造

            ctx.channel().eventLoop().schedule(() -> {
                //耗时任务
                try {
                    Thread.sleep(5 * 1000);
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello client 4", CharsetUtil.UTF_8));
                } catch (InterruptedException e) {
                    System.out.println("发生异常");
                }
            },5, TimeUnit.SECONDS); 

    3.1.3、非当前Reactor线程调用Channel的各种方法

    在服务端的:.childHandler(new ChannelInitializer<SocketChannel>() {  的initChannel方法中使用一个集合管理SocketChannel,在推送消息时,可以将业务加入到各个channel 对应的NioEventLoop的taskQueue或者scheduleTaskQueue

    四、异步模型 

    4.1、基本介绍

    1)异步和同步相对,当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。

    2)Netty中的IO操作是异步的,包括bind、write、connect等操作会简单的返回一个ChannelFutura。

    3)调用者并不能立刻获得结果,而是通过Future-Listener机制。用户可以方便的主动获取或者通过通知机制获得IO操作结果

    4)Netty的异步模型是建立在future和callback上的。callback就是回调。Future核心思想是:假设一个方法fun,计算过程可能非常耗时,等待fun返回不合适,那么可以在调用fun的时候,立马返回一个Future,后续可以通过Future去监控方法fun的处理过程(即Future-Listener机制)

    4.2、Future

      表示异步的执行结果,可以通过它提供的方法来检测执行是否完成,比如检索计算

      ChannelFuture是一个接口,可以添加监听器,当监听的事件发生时,就会通知到监听器

      

      说明:1)在使用Netty进行编程时,拦截操作和转换出入站数据只需要您提供callback或利用Future即可。链式操作简单、高效、有利于编写可重用的、通用的代码

        2)Netty目标,使业务逻辑从网络基础应用编码中分离出来,解脱出来

    4.3、Future-Listener机制

      当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。

      常见操作:

    isDone:是否完成

    isSuccess:操作是否成功

    getCause:操作失败原因

    isCancelled:才做是否被取消

    addListener方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器,如果Future对象已完成,则通知指定的监听器 

    示例:

               ChannelFuture cf = bootstrap.bind(6668).sync();
    
                cf.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if(cf.isSuccess()){
                            System.out.println("监听端口 6668 成功");
                        }else{
                            System.out.println("监听端口 6668 失败");
                        }
                    }
                });

    绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑。

    4.4、HTTP服务示例

    自定义一个TestHttpServerHandler

    //SimpleChannelInboundHandler  是 ChannelInboundHandlerAdapter
    //HttpObject 客户端和服务器端相互通讯的数据被封装成HttpObject
    public class TestHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, HttpObject msg) throws Exception {
            if (msg instanceof HttpRequest) {
    
                System.out.println("msg 类型是:" + msg.getClass());
                System.out.println("客户端地址:" + channelHandlerContext.channel().remoteAddress());
    
                HttpRequest httpRequest = (HttpRequest) msg;
                URI uri = new URI(httpRequest.uri());
                if ("/favicon.ico".equals(uri.getPath())) {
                    System.out.println("请求了 /favicon.ico ,不处理");
                    return;
                }
    
                ByteBuf byteBuf = Unpooled.copiedBuffer("helleo 你好", CharsetUtil.UTF_8);
    
                DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf);
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain");
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes());
                channelHandlerContext.writeAndFlush(response);
            }
        }
    }

    定义一个TestServerInitializer

    public class TestServerInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
            //http解码器
            pipeline.addLast("MyHttpServerCodec",new HttpServerCodec());
            pipeline.addLast("MyTestHttpServerHandler",new TestHttpServerHandler());
        }
    }

    Server

    public class TestServer {
        public static void main(String[] args) throws InterruptedException {
            EventLoopGroup bossGroup = new NioEventLoopGroup();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap bootstrap = new ServerBootstrap();
                bootstrap.group(bossGroup, workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .childHandler(new TestServerInitializer());
                ChannelFuture channelFuture = bootstrap.bind(8081).sync();
                channelFuture.channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }
    转载请注明出处,感谢。
    作者:李宏旭
    阅罢此文,如果您觉得本文不错并有所收获,请【打赏】或【推荐】,也可【评论】留下您的问题或建议与我交流。
    你的支持是我不断创作和分享的不竭动力!
  • 相关阅读:
    Centos 7 运行出错:cannot find a valid basecrl for repo:base/7/x86_64
    nc 使用
    linux uniq去重,awk输出(可用于爆破字典优化)
    关于fixedsys字体 FSEX300.ttf FSEX300-L.ttf FSEX301-L2.ttf
    MyAtoi
    viplugin eclipse
    资源获取即初始化RAII
    阈值分割技术
    图像类型转换
    形态学
  • 原文地址:https://www.cnblogs.com/bjlhx/p/15083014.html
Copyright © 2011-2022 走看看