zoukankan      html  css  js  c++  java
  • Netty自定义任务&Future-Listener机制

    1. 自定义任务

    常见场景:

    (1)  用户程序自定义的普通任务

    (2)  用户自定义定时任务

    (3) 非当前Reactor线程调用Channel的各种方法(这种解决办法就是连接成功之后用一个集合将Channel维护起来,后面拿到做操作)

      例如在推送系统的业务线程里面,根据用户的标识,找到对应的channel,然后向该channel推送数据。

    1.  自定义普通任务

    假设我们在服务端读取到消息之后需要一个比较耗时的处理,比如:

    NettyServer如下:

    package netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    import java.util.logging.Level;
    import java.util.logging.Logger;
    
    public class NettyServer {
    
        public static void main(String[] args) throws InterruptedException {
            // 1. 创建bossGrou好eworkerGroup
            // bossGroup只负责连接请求,其他交给workerGroup, 两个都是无线循环
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            // 创建服务器端启动对象用于设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 使用链式编程设置参数
            bootstrap.group(bossGroup, workerGroup)// 设置两个组
                    .channel(NioServerSocketChannel.class) // 设置服务器的通道
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 设置通道测试对象(匿名对象)
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 给pipeline添加一个handler
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });
    
            System.out.println("服务端is ok。。。");
    
            // 启动服务器并绑定端口。绑定一个端口并且同步,生成一个ChannelFuture对象
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            // 对关闭通道进行监控
            channelFuture.channel().closeFuture().sync();
        }
    }

    ServerHandler:

    package netty;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
     */
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
    
        /**
         * 读取事件
         *
         * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息
         * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer)
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("当前线程: " + Thread.currentThread().getName());
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            log.info("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            log.info("客户端地址:" + ctx.channel().remoteAddress());
    
            // 比如这里我们将一个特别耗时的任务转为异步执行(也就是任务提交到NioEventLoop的taskQueue中)
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!0!", CharsetUtil.UTF_8));
            Thread.sleep(10 * 1000);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!1!", CharsetUtil.UTF_8));
        }
    
        /**
         * 数据读取完毕事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 将数据写到客户端(write + flush)
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!", CharsetUtil.UTF_8));
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    读取数据里面有个Sleep(10s),模拟处理耗时10s。

    Client:

    package netty;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class NettyClient {
    
        public static void main(String[] args) throws InterruptedException {
            // 创建一个事件循环组
            EventLoopGroup eventExecutors = new NioEventLoopGroup();
            try {
                // 创建一个启动Bootstrap(注意是Netty包下的)
                Bootstrap bootstrap = new Bootstrap();
                // 链式设置参数
                bootstrap.group(eventExecutors) // 设置线程组
                        .channel(NioSocketChannel.class) // 设置通道class
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel socketChannel) throws Exception {
                                socketChannel.pipeline().addLast(new ClientHandler());
                            }
                        });
                System.out.println("客户端is ok...");
    
                // 启动客户端连接服务器(ChannelFuture 是netty的异步模型)
                ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6666).sync();
                // 监听关闭通道
                channelFuture.channel().closeFuture().sync();
            } finally {
                // 关闭
                eventExecutors.shutdownGracefully();
            }
        }
    }

    客户端处理类如下:

    package netty;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
     */
    public class ClientHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger log = LoggerFactory.getLogger(ClientHandler.class);
    
        /**
         * 通道就绪事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            log.info("ClientHandler ctx: " + ctx);
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器!", CharsetUtil.UTF_8));
        }
    
        /**
         * 读取事件
         *
         * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息
         * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer)
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            log.info("服务器会送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            log.info("服务器地址:" + ctx.channel().remoteAddress());
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    连接成功之后发送一条消息,然后接收服务器端回传的消息

    测试结果如下:

    客户端is ok...
    2021-03-03 22:31:06.510 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:27 - ClientHandler ctx: ChannelHandlerContext(ClientHandler#0, [id: 0x197a62b1, L:/127.0.0.1:64388 - R:/127.0.0.1:6666])
    2021-03-03 22:31:06.857 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:42 - 服务器会送的消息是:hello, 客户端!0!
    2021-03-03 22:31:06.858 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:43 - 服务器地址:/127.0.0.1:6666
    2021-03-03 22:31:16.855 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:42 - 服务器会送的消息是:hello, 客户端!1!
    2021-03-03 22:31:16.856 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:43 - 服务器地址:/127.0.0.1:6666
    2021-03-03 22:31:16.858 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:42 - 服务器会送的消息是:hello, 客户端!
    2021-03-03 22:31:16.859 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:43 - 服务器地址:/127.0.0.1:6666

    可以看到服务器端的逻辑:

    (1) channelRead 处理,中间有个10s休眠

    (2) channelReadComplete处理

    解决:

    1. 将channelRead 耗时任务交给NioEventLoop的taskQueue

    修改ServerHandler:

    package netty;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
     */
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
    
        /**
         * 读取事件
         *
         * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息
         * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer)
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("当前线程: " + Thread.currentThread().getName());
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            log.info("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            log.info("客户端地址:" + ctx.channel().remoteAddress());
    
            ctx.channel().eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 比如这里我们将一个特别耗时的任务转为异步执行(也就是任务提交到NioEventLoop的taskQueue中)
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!0!", CharsetUtil.UTF_8));
                    try {
                        Thread.sleep(10 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!1!", CharsetUtil.UTF_8));
                }
            });
        }
    
        /**
         * 数据读取完毕事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 将数据写到客户端(write + flush)
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!", CharsetUtil.UTF_8));
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    测试查看客户端日志:(可以看到channelReadComplete 没有被阻塞)

    客户端is ok...
    2021-03-03 22:39:25.664 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:27 - ClientHandler ctx: ChannelHandlerContext(ClientHandler#0, [id: 0x6958d27f, L:/127.0.0.1:64860 - R:/127.0.0.1:6666])
    2021-03-03 22:39:25.858 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:42 - 服务器会送的消息是:hello, 客户端!hello, 客户端!0!
    2021-03-03 22:39:25.859 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:43 - 服务器地址:/127.0.0.1:6666
    2021-03-03 22:39:35.859 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:42 - 服务器会送的消息是:hello, 客户端!1!
    2021-03-03 22:39:35.860 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:43 - 服务器地址:/127.0.0.1:6666

     2. 我们将两个任务提交到taskQueue中

    package netty;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
     */
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
    
        /**
         * 读取事件
         *
         * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息
         * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer)
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("当前线程: " + Thread.currentThread().getName());
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            log.info("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            log.info("客户端地址:" + ctx.channel().remoteAddress());
    
            ctx.channel().eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 比如这里我们将一个特别耗时的任务转为异步执行(也就是任务提交到NioEventLoop的taskQueue中)
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!0!", CharsetUtil.UTF_8));
                    try {
                        Thread.sleep(10 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!1!", CharsetUtil.UTF_8));
                }
            });
    
            ctx.channel().eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    // 比如这里我们将一个特别耗时的任务转为异步执行(也就是任务提交到NioEventLoop的taskQueue中)
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!2!", CharsetUtil.UTF_8));
                    try {
                        Thread.sleep(20 * 1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!3!", CharsetUtil.UTF_8));
                }
            });
        }
    
        /**
         * 数据读取完毕事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 将数据写到客户端(write + flush)
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!", CharsetUtil.UTF_8));
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    启动查看客户端日志: 可以看到第二个任务是在第一个任务完了的基础上才开始。 也就是后面跑任务的只有一个线程,taskQueue可以存放多个任务。

    客户端is ok...
    2021-03-03 22:47:04.249 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:27 - ClientHandler ctx: ChannelHandlerContext(ClientHandler#0, [id: 0xb9c4bc2d, L:/127.0.0.1:65343 - R:/127.0.0.1:6666])
    2021-03-03 22:47:04.430 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:42 - 服务器会送的消息是:hello, 客户端!hello, 客户端!0!
    2021-03-03 22:47:04.431 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:43 - 服务器地址:/127.0.0.1:6666
    2021-03-03 22:47:14.433 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:42 - 服务器会送的消息是:hello, 客户端!1!
    2021-03-03 22:47:14.434 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:43 - 服务器地址:/127.0.0.1:6666
    2021-03-03 22:47:14.436 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:42 - 服务器会送的消息是:hello, 客户端!2!
    2021-03-03 22:47:14.437 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:43 - 服务器地址:/127.0.0.1:6666
    2021-03-03 22:47:34.435 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:42 - 服务器会送的消息是:hello, 客户端!3!
    2021-03-03 22:47:34.436 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:43 - 服务器地址:/127.0.0.1:6666

    debug 查看taskQueue有两个任务

     2. 自定义定时任务

    1. 定义一次性任务

    package netty;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
     */
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
    
        /**
         * 读取事件
         *
         * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息
         * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer)
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("当前线程: " + Thread.currentThread().getName());
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            log.info("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            log.info("客户端地址:" + ctx.channel().remoteAddress());
    
            // 比如这里我们将一个特别耗时的任务转为异步执行(也就是任务提交到NioEventLoop的scheduleTaskQueue中)
            ctx.channel().eventLoop().schedule(new Runnable() {
                @Override
                public void run() {
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!0!", CharsetUtil.UTF_8));
                }
            }, 5, TimeUnit.SECONDS);
        }
    
        /**
         * 数据读取完毕事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 将数据写到客户端(write + flush)
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!", CharsetUtil.UTF_8));
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    客户端结果:

    客户端is ok...
    2021-03-03 22:54:41.197 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:27 - ClientHandler ctx: ChannelHandlerContext(ClientHandler#0, [id: 0xb887b7d2, L:/127.0.0.1:49453 - R:/127.0.0.1:6666])
    2021-03-03 22:54:44.302 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:42 - 服务器会送的消息是:hello, 客户端!
    2021-03-03 22:54:44.303 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:43 - 服务器地址:/127.0.0.1:6666
    2021-03-03 22:54:46.513 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:42 - 服务器会送的消息是:hello, 客户端!0!
    2021-03-03 22:54:46.514 | APP_NAME_IS_UNDEFINED - INFO | nioEventLoopGroup-2-1 | netty.ClientHandler | line:43 - 服务器地址:/127.0.0.1:6666

    2. 定义周期任务

    package netty;
    
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInboundHandlerAdapter;
    import io.netty.util.CharsetUtil;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * 自定义服务器端处理handler,需要继承netty定义的ChannelInboundHandlerAdapter 类
     */
    public class ServerHandler extends ChannelInboundHandlerAdapter {
    
        private static final Logger log = LoggerFactory.getLogger(ServerHandler.class);
    
        /**
         * 读取事件
         *
         * @param ctx 上下文对象,含有pipeline管道;通道channel;地址address等信息
         * @param msg 客户端发送的数据(实际类型是ByteBuf - netty 封装的ByteBuffer)
         * @throws Exception
         */
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            log.info("当前线程: " + Thread.currentThread().getName());
            // 强转为netty的ByteBuffer(实际就是包装的ByteBuffer)
            ByteBuf byteBuf = (ByteBuf) msg;
            log.info("客户端发送的消息是:" + byteBuf.toString(CharsetUtil.UTF_8));
            log.info("客户端地址:" + ctx.channel().remoteAddress());
    
            // 比如这里我们将一个特别耗时的任务转为异步执行(也就是任务提交到NioEventLoop的scheduleTaskQueue中)
            ctx.channel().eventLoop().scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!0!", CharsetUtil.UTF_8));
                }
            }, 5, 5, TimeUnit.SECONDS);
        }
    
        /**
         * 数据读取完毕事件
         *
         * @param ctx
         * @throws Exception
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            // 将数据写到客户端(write + flush)
            ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端!", CharsetUtil.UTF_8));
        }
    
        /**
         * 发生异常事件
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }

    2. Futture 异步模型

      Netty中的I/O是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture。Netty的异步模型是建立在future和fallback基础上的。callback是回调;future的核心思想是:假设一个方法fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个future,后续通过future去监控方法的处理过程(即: future-listener机制)

    package io.netty.channel;
    
    import io.netty.util.concurrent.Future;
    import io.netty.util.concurrent.GenericFutureListener;
    
    public interface ChannelFuture extends Future<Void> {
        Channel channel();
    
        ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> var1);
    
        ChannelFuture addListeners(GenericFutureListener... var1);
    
        ChannelFuture removeListener(GenericFutureListener<? extends Future<? super Void>> var1);
    
        ChannelFuture removeListeners(GenericFutureListener... var1);
    
        ChannelFuture sync() throws InterruptedException;
    
        ChannelFuture syncUninterruptibly();
    
        ChannelFuture await() throws InterruptedException;
    
        ChannelFuture awaitUninterruptibly();
    
        boolean isVoid();
    }

    Future接口如下:

    package io.netty.util.concurrent;
    
    import java.util.concurrent.TimeUnit;
    
    public interface Future<V> extends java.util.concurrent.Future<V> {
        boolean isSuccess();
    
        boolean isCancellable();
    
        Throwable cause();
    
        Future<V> addListener(GenericFutureListener<? extends Future<? super V>> var1);
    
        Future<V> addListeners(GenericFutureListener... var1);
    
        Future<V> removeListener(GenericFutureListener<? extends Future<? super V>> var1);
    
        Future<V> removeListeners(GenericFutureListener... var1);
    
        Future<V> sync() throws InterruptedException;
    
        Future<V> syncUninterruptibly();
    
        Future<V> await() throws InterruptedException;
    
        Future<V> awaitUninterruptibly();
    
        boolean await(long var1, TimeUnit var3) throws InterruptedException;
    
        boolean await(long var1) throws InterruptedException;
    
        boolean awaitUninterruptibly(long var1, TimeUnit var3);
    
        boolean awaitUninterruptibly(long var1);
    
        V getNow();
    
        boolean cancel(boolean var1);
    }

    工作原理图如下:

      在使用netty进行编程时,拦截操作和转换出入栈数据只需要提供callback或者使用future。这使得链式操作简单、高效,有利用编写可重用的、通用的代码。

    Future-Listener 机制如下:

    1. 当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作。

    2. 主要方法如下:

    isDone 判断当前操作是否完成

    isSuccess 判断当前操作是否成功执行

    getCause 获取当前操作失败的原因

    isCanclled 判断当前已完成的操作是否被取消

    例如:

    package netty;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelOption;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class NettyServer {
    
        public static void main(String[] args) throws InterruptedException {
            // 1. 创建bossGrou好eworkerGroup
            // bossGroup只负责连接请求,其他交给workerGroup, 两个都是无线循环
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
    
            // 创建服务器端启动对象用于设置参数
            ServerBootstrap bootstrap = new ServerBootstrap();
            // 使用链式编程设置参数
            bootstrap.group(bossGroup, workerGroup)// 设置两个组
                    .channel(NioServerSocketChannel.class) // 设置服务器的通道
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
                    .childHandler(new ChannelInitializer<SocketChannel>() { // 设置通道测试对象(匿名对象)
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 给pipeline添加一个handler
                            socketChannel.pipeline().addLast(new ServerHandler());
                        }
                    });
    
            System.out.println("服务端is ok。。。 异步绑定端口");
    
            // 启动服务器并绑定端口。绑定一个端口并且同步,生成一个ChannelFuture对象
            ChannelFuture channelFuture = bootstrap.bind(6666).sync();
            // 绑定端口是异步操作,添加监听器进行处理
            channelFuture.addListener(future -> {
                // isDone 方法来判断当前操作是否完成
                System.out.println(future.isDone());
                // isSuccess 方法来判断已完成的当前操作是否成功;
                System.out.println(future.isSuccess());
                // isCancelled 方法来判断已完成的当前操作是否被取消;
                System.out.println(future.isCancelled());
                if (future.isSuccess()) {
                    System.out.println("绑定成功");
                } else {
                    System.err.println("6666 端口绑定失败");
                }
            });
    
            // 对关闭通道进行监控
            channelFuture.channel().closeFuture().sync();
        }
    }

    结果:

    服务端is ok。。。 异步绑定端口
    true
    true
    false
    绑定成功
    【当你用心写完每一篇博客之后,你会发现它比你用代码实现功能更有成就感!】
  • 相关阅读:
    跨域解决方法
    css之line-height
    untiy项目中使用MD5加密
    unity给子物体添加Shader
    unity中UI坐标转3d世界坐标
    unity项目字符串转为Vector3和Quaternion
    unity中使用Highlighting System v4.0插件给物体添加高亮
    加载AssetBundle方法
    Lua面向对象----类、继承、多继承、单例的实现
    Lua学习笔记(一)-----C#和lua的交互
  • 原文地址:https://www.cnblogs.com/qlqwjy/p/14477603.html
Copyright © 2011-2022 走看看