zoukankan      html  css  js  c++  java
  • Netty异步任务调度与异步线程池

    一、异步任务调度

    (一)为什么要使用异步任务调度

      先写一个简单的Netty程序:

      1、NettyServer

    public class NettyServer {
        public static void main(String[] args) {
            //1. 创建两个线程组 bossGroup 和 workerGroup
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup(); //默认16try {
                //创建服务器端的启动对象,配置参数
                ServerBootstrap bootstrap = new ServerBootstrap();
                //使用链式编程来进行设置
                bootstrap.group(bossGroup, workerGroup) //设置两个线程组
                        .channel(NioServerSocketChannel.class) //使用NioSocketChannel 作为服务器的通道实现
                        .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                        .childOption(ChannelOption.SO_KEEPALIVE, true) //设置保持活动连接状态
                        //.handler(null) // 该 handler对应 bossGroup , childHandler 对应 workerGroup
                        .childHandler(new ChannelInitializer<SocketChannel>() {//创建一个通道初始化对象(匿名对象)
                            //给pipeline 设置处理器
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                System.out.println("客户socketchannel hashcode=" + ch.hashCode()); //可以使用一个集合管理 SocketChannel, 再推送消息时,可以将业务加入到各个channel 对应的 NIOEventLoop 的 taskQueue 或者 scheduleTaskQueue
                                ChannelPipeline pipeline = ch.pipeline();
                                pipeline.addLast(new NettyServerHandler());
                            }
                        }); // 给workerGroup 的 EventLoop 对应的管道设置处理器
                System.out.println("server is ready");
                //绑定一个端口并且同步, 生成了一个 ChannelFuture 对象
                //启动服务器(并绑定端口)
                ChannelFuture cf = bootstrap.bind(6668).sync();
                //给cf 注册监听器,监控我们关心的事件
                cf.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (cf.isSuccess()) {
                            System.out.println("监听端口 6668 成功");
                        } else {
                            System.out.println("监听端口 6668 失败");
                        }
                    }
                });
                cf.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    }

      2、ServerHandler

    public class NettyServerHandler extends ChannelInboundHandlerAdapter {
        private static final EventExecutorGroup group = new DefaultEventExecutorGroup(8);
    
        /**
         * 1. ChannelHandlerContext ctx:上下文对象, 含有 管道pipeline , 通道channel, 地址
         * 2. Object msg: 就是客户端发送的数据 默认Object
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channelReadThread:"+Thread.currentThread().getName());
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, Client No 2 ", CharsetUtil.UTF_8));
            System.out.println("go on ...");
        }
    
        //数据读取完毕
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            //writeAndFlush 是 write + flush
            //将数据写入到缓存,并刷新
            //一般讲,我们对这个发送的数据进行编码
          System.out.println("channelReadCompleteThread:"+Thread.currentThread().getName());
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, Client No 1 ", CharsetUtil.UTF_8));
        }
    
        //处理异常, 一般是需要关闭通道
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

      

      3、NettyClient

    public class NettyClient {
        public static void main(String[] args) {
            //客户端需要一个事件循环组
            EventLoopGroup group = new NioEventLoopGroup();
            //创建客户端启动对象 注意客户端使用的不是 ServerBootstrap 而是 Bootstrap
            Bootstrap bootstrap = new Bootstrap();
            try{
                //设置相关参数
                bootstrap.group(group) //设置线程组
                        .channel(NioSocketChannel.class) // 设置客户端通道的实现类(反射)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                               ch.pipeline().addLast(new NettyClientHandler()); //加入自己的处理器
                            }
                        });
                System.out.println("client is ok..");
                ChannelFuture   channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
                channelFuture.addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (channelFuture.isSuccess()) {
                            System.out.println("连接 6668 成功");
                        } else {
                            System.out.println("连接 6668 失败");
                        }
                    }
                });
                //给关闭通道进行监听
                channelFuture.channel().closeFuture().sync();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    }

      4、clientHandler

    public class NettyClientHandler extends ChannelInboundHandlerAdapter {
        //当通道就绪就会触发该方法
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("client " + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server: miaomiao", CharsetUtil.UTF_8));
        }
    
        //当通道有读取事件时,会触发
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("response:" + buf.toString(CharsetUtil.UTF_8));
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }

      上面的例子非常简单,就是启动NettyClient后向NettyServer写入信息,NettyServer在接收到信息后,在向NettyClient写入消息,分别在channelRead和channelReadComplete写入了两条信息,NettyClient执行的结果为:

    response:hello, Client No 2 hello, Client No 1 

      

      但是这样就会存在一个问题,如果ServerHandler中channelRead中处理的是一个比较耗时的操作,例如与数据库交互等内容,那么就会阻塞相应,下面的代码就是模拟这种情况,代码也非常简单,就是向客户端写入数据前先休眠10秒钟。

      @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            Thread.sleep(10 * 1000);
            ctx.writeAndFlush(Unpooled.copiedBuffer(System.currentTimeMillis() + "hello, Client No 2 ", CharsetUtil.UTF_8));
            System.out.println(System.currentTimeMillis() + "go on ...");
        }

      上面代码的最终执行结果就是,需要等待十秒,服务端才会输出go on,同样的,客户端也需要等待十秒,才能拿到响应结果(channelRead的结果和channelComplete的结果)

      那么如何解决这个问题呢,这里有两种结局方案:使用自定义任务提交到TaskQueue中,或者使用自定义的定时任务提交到scheduleTaskQueue中。

    (二)TaskQueue

      1、使用方法如下所示:

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channelReadThread:"+Thread.currentThread().getName());// 解决方案1 用户程序自定义的普通任务
            ctx.channel().eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(10 * 1000);
                        System.out.println("eventLoop().execute Thread1:"+Thread.currentThread().getName());
                        //请求讲当前的msg 通过ChannelPipeline 写入数据到目标Channel 中。值得注意的是:write 操作只是将消息存入到消息发送环形数组中,并没有真正被发送,只有调用flush 操作才会被写入到Channel 中,发送给对方。
                        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, Client No 2 ", CharsetUtil.UTF_8));
                        System.out.println("Client No 2 success,"+Thread.currentThread().getName());
                    } catch (Exception ex) {
                        System.out.println("发生异常" + ex.getMessage());
                    }
                }
            });
    
            ctx.channel().eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(5 * 1000);
                        System.out.println("eventLoop().execute Thread2:"+Thread.currentThread().getName());
                        //请求讲当前的msg 通过ChannelPipeline 写入数据到目标Channel 中。值得注意的是:write 操作只是将消息存入到消息发送环形数组中,并没有真正被发送,只有调用flush 操作才会被写入到Channel 中,发送给对方。
                        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, Client No 3 ", CharsetUtil.UTF_8));
                        System.out.println("Client No 3 success,"+Thread.currentThread().getName());
                    } catch (Exception ex) {
                        System.out.println("发生异常" + ex.getMessage());
                    }
                }
            });
            System.out.println(System.currentTimeMillis() + "go on ...");
    
        }

      上述代码中,手动向taskQueue中添加了两个任务,这样的话,go on就会先输出,Server不会阻塞。

      上面的代码添加了两个任务,一个休眠了10秒,一个休眠了5秒,视为了验证在TaskQueue中的任务是串行执行的。在客户端的输出加上日期后,可以看到输出结果如下所示,也就验证了在TaskQueue中的任务是串行执行的。

    response:2021-09-13T06:08:35.899hello, Client No 2 
    response:2021-09-13T06:08:40.850hello, Client No 3 
    response:2021-09-13T06:08:40.850hello, Client No 1 

      从代码的debug中也可以看到,这两个任务是提交到了ctx.pipeline.channel.eventloop.taskQueue中。 

           

      服务端的输出结果为:

    channelReadThread:nioEventLoopGroup-3-1
    1631499116759go on ...
    channelReadCompleteThread:nioEventLoopGroup-3-1
    eventLoop().execute Thread1:nioEventLoopGroup-3-1
    Client No 2 success,nioEventLoopGroup-3-1
    eventLoop().execute Thread2:nioEventLoopGroup-3-1
    Client No 3 success,nioEventLoopGroup-3-1

      可以看到,无论是channelRead中单起的线程还是channelRead、channelReadComplete中的IO线程,使用的都是同一个线程。 

    (三)scheduleTaskQueue

      第二种解决方案就是使用自定义的定时任务提交到scheduleTaskQueue中,实现如下所示:

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channelReadThread:"+Thread.currentThread().getName());
            //解决方案2 : 用户自定义定时任务 -》 该任务是提交到 scheduleTaskQueue中
            ctx.channel().eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
    
                    try {
                        Thread.sleep(5 * 1000);
                        System.out.println("channelReadThread3:"+Thread.currentThread().getName());
                        ctx.writeAndFlush(Unpooled.copiedBuffer("hello, Client No 4", CharsetUtil.UTF_8));
                        System.out.println("channel code=" + ctx.channel().hashCode());
                    } catch (Exception ex) {
                        System.out.println("发生异常" + ex.getMessage());
                    }
                }
            }, 5, TimeUnit.SECONDS);
    
            System.out.println(System.currentTimeMillis() + "go on ...");
    
        }

      最终的输出结果如下所示

    response:2021-09-13T06:34:34.791hello, Client No 1 
    response:2021-09-13T06:34:44.724hello, Client No 4

      可以看到,其是延迟了5秒执行的,然后又阻塞了5秒,最终才输出,但是从输出的结果看,channelReadComplete不再依赖这个的结果,直接进行了返回。

      同时从debug的结果看,任务是被提交到了ctx.pipeline.channel.eventloop.scheduleTaskQueue中

          

       服务端的输出结果如下所示:

    channelReadThread:nioEventLoopGroup-3-1
    1631499905539go on ...
    channelReadCompleteThread:nioEventLoopGroup-3-1
    channelReadThread3:nioEventLoopGroup-3-1
    channel code=1068232496

      可以看到,和taskQueue一样,定时任务的线程和IO线程使用的是同一个线程。

    (四)向其它线程调度任务

      在Netty的使用中,我们不仅可以对当前的客户端写入数据,同时也可以对其他的客户端写入数据,就以群聊为例(完整代码可以参照:Netty编码示例

      使用方法就是使用一个ChannelGroup,在客户端链接时添加,在客户端离线时移除,在需要对所有客户端写入数据时,循环ChannelGroup中的ChannelHandlerContext即可。

    public class GroupChatServerHandler extends SimpleChannelInboundHandler<String> {//handlerAdded 表示连接建立,一旦连接,第一个被执行
        //将当前channel 加入到  channelGroup
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 加入聊天" + sdf.format(new java.util.Date()) + " 
    ");
            channelGroup.add(channel);
        }
    
        //断开连接, 将xx客户离开信息推送给当前在线的客户
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
            Channel channel = ctx.channel();
            channelGroup.writeAndFlush("[客户端]" + channel.remoteAddress() + " 离开了
    ");
            System.out.println("channelGroup size=" + channelGroup.size());
    
        }//读取数据
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
            //获取到当前channel
            Channel channel = ctx.channel();
            //这时我们遍历channelGroup, 根据不同的情况,回送不同的消息
            channelGroup.forEach(ch -> {
                if (channel != ch) { //不是当前的channel,转发消息
                    ch.writeAndFlush("[客户]" + channel.remoteAddress() + " 发送了消息" + msg + "
    ");
                } else {//回显自己发送的消息给自己
                    ch.writeAndFlush("[自己]发送了消息" + msg + "
    ");
                }
            });
        }
    }

    (五)异步任务调度总结

      1、TaskQueue如何使用

          (1)⾃定义任务 : ⾃⼰开发的任务 , 然后将该任务提交到任务队列中 ;

          (2)⾃定义定时任务 : ⾃⼰开发的任务 , 然后将该任务提交到任务队列中 , 同时可以指定任务的执⾏时间 ;

          (3)其它线程调度任务

      2、Handler同步异步

        ⽤户⾃定义的 Handler 处理器 , 该处理器继承了 ChannelInboundHandlerAdapter 类 , 在重写的public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception ⽅法中 , 执⾏的业务逻辑要注意以下两点 :

          同步操作 : 如果在该业务逻辑中只执⾏⼀个短时间的操作 , 那么可以直接执⾏ ;

          异步操作 : 如果在该业务逻辑中执⾏访问数据库 , 访问⽹络 , 读写本地⽂件 , 执⾏⼀系列复杂计算等耗时操作 , 肯定不能在该⽅法中处理 , 这样会阻塞整个线程 ; 正确的做法是将耗时的操作放⼊任务队列 TaskQueue , 异步执⾏ ;

      3、⾃定义任务的执行顺序

        如果⽤户连续向任务队列中放⼊了多个任务 , NioEventLoop 会按照顺序先后执⾏这些任务 , 注意任务队列中的任务 是先后执⾏ , 不是同时执⾏ ;

        顺序执⾏任务 ( 不是并发 ) : 任务队列任务执⾏机制是顺序执⾏的 ; 先执⾏第⼀个 , 执⾏完毕后 , 从任务队列中获取第⼆个任务 , 执⾏完毕之后 , 依次从任务队列中取出任务执⾏ , 前⼀个任务执⾏完毕后 ,才从任务队列中取出下⼀个任务执⾏ ;

      4、⾃定义任何和自定义定时任务

        ⽤户⾃定义定时任务 与 ⽤户⾃定义任务流程基本类似:

        (1)调度⽅法 :

            定时异步任务使⽤ schedule ⽅法进⾏调度

            普通异步任务使⽤ execute ⽅法进⾏调度

        (2)任务队列 :

            定时异步任务提交到 ScheduleTaskQueue 任务队列中 ;

            普通异步任务提交到 TaskQueue 任务队列中 ;

      5、向其它线程调度任务

        在服务器中使⽤ Map 集合管理该 Channel 通道 , 需要时根据⽤户标识信息 , 获取该通道 , 向该客户端通道对应的 NioEventLoop 线程中调度任务 ;

    二、异步线程池

      上面提到了使用异步任务调度所使用的的线程仍然是和IO操作是同一个线程,因此如果做的是比较耗时的工作或不可预料的操作,⽐如数据库,⽹络请求,会严重影响 Netty 对 Socket 的处理速度。⽽解决⽅法就是将耗时任务添加到异步线程池中。

      但就添加线程池这步操作来讲,可以有2种⽅式:

        handler 中加⼊线程池

        Context 中添加线程池

    (一)handler 中加⼊线程池

      1、如何使用

      使用方法比较简单,就是创建了一个EventExecutorGroup,并向其中提交任务。

        private static final EventExecutorGroup group = new DefaultEventExecutorGroup(8);
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("channelReadThread:"+Thread.currentThread().getName());
            ctx.writeAndFlush(Unpooled.copiedBuffer(System.currentTimeMillis() + "hello, Client No 2 ", CharsetUtil.UTF_8));
            group.submit(new Runnable() {
                @Override
                public void run() {
                    System.out.println("groupThread:"+Thread.currentThread().getName());
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, Client No 2 ", CharsetUtil.UTF_8));
                }
            });
            System.out.println("go on ...");
        }

       启动两个Client,Server输出结果:

    channelReadThread:nioEventLoopGroup-3-1
    go on ...
    channelReadCompleteThread:nioEventLoopGroup-3-1
    groupThread:defaultEventExecutorGroup-5-1
    客户socketchannel hashcode=1589534751
    channelReadThread:nioEventLoopGroup-3-2
    go on ...
    channelReadCompleteThread:nioEventLoopGroup-3-2
    groupThread:defaultEventExecutorGroup-5-2

      可以看到,这样就是用了与IO操作不同的线程来处理业务逻辑。并且每个客户端的请求使用的都是不同的线程。

      2、源码

      源码部分主要就是在write方法中,可以看到,其判断了提交的异步执行任务是否是一个EventLoop事件,如果是,做后续的判断处理,如果不是,则创建一个新的线程执行。

      这里走的是后一种逻辑,直接创建了一个新的线程池。

    abstract class AbstractChannelHandlerContext implements ChannelHandlerContext,
    ResourceLeakHint {
        ....
            private void write(Object msg, boolean flush, ChannelPromise promise) {
            ObjectUtil.checkNotNull(msg, "msg");
    
            try {
                if (this.isNotValidPromise(promise, true)) {
                    ReferenceCountUtil.release(msg);
                    return;
                }
            } catch (RuntimeException var8) {
                ReferenceCountUtil.release(msg);
                throw var8;
            }
    
            AbstractChannelHandlerContext next = this.findContextOutbound(flush ? 98304 : '耀');
            Object m = this.pipeline.touch(msg, next);
            EventExecutor executor = next.executor();
            if (executor.inEventLoop()) {
                if (flush) {
                    next.invokeWriteAndFlush(m, promise);
                } else {
                    next.invokeWrite(m, promise);
                }
            } else {
                AbstractChannelHandlerContext.WriteTask task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise, flush);
                if (!safeExecute(executor, task, promise, m, !flush)) {
                    task.cancel();
                }
            }
    
        }      
    }

    (二)Context中添加线程池

     在调⽤addLast⽅法添加线程池后,handler将优先使⽤这个线程池,如果不添加,将使⽤IO线程。

    private static final EventExecutorGroup group =new DefaultEventExecutorGroup(2);
    pipeline.addLast(group,new NettyServerHandler());

      输出结果

    channelReadThread:defaultEventExecutorGroup-4-1
    go on ...
    channelReadCompleteThread:defaultEventExecutorGroup-4-1
    客户socketchannel hashcode=-678377054
    channelReadThread:defaultEventExecutorGroup-4-2
    go on ...
    channelReadCompleteThread:defaultEventExecutorGroup-4-2

    (三)两者对比

      在handler中添加异步,可能更加的⾃由,⽐如如果需要访问数据库,那我就异步,如果不需要就不异步,异步会拖⻓接⼝响应时间。因为需要将任务放进task中,如果IO时间很短,task很多,可能⼀个循环下来,都没时间执⾏整个task,导致响应时间不达标。

      Context中添加线程池⽅式是Netty标准⽅式即加⼊到队列,但是这么做会将整个handler都交给业务线程池,不论耗时不耗时都加⼊队列,不够灵活。

    ------------------------------------------------------------------
    -----------------------------------------------------------
    ---------------------------------------------
    朦胧的夜 留笔~~
  • 相关阅读:
    C#的内存管理原理解析+标准Dispose模式的实现
    深入理解C#:编程技巧总结(二)
    深入理解C#:编程技巧总结(一)
    深刻理解:C#中的委托、事件
    你知道JavaScript中的结果值是什么吗?
    switch语句的妙用
    相等比较、关系比较总结
    用ServiceStack操作使用redis的问题
    springmvc 处理put,delete请求
    easyui 验证动态添加和删除问题
  • 原文地址:https://www.cnblogs.com/liconglong/p/15221451.html
Copyright © 2011-2022 走看看