zoukankan      html  css  js  c++  java
  • Netty组件(一)

    一、EventLoop

      EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件。继承自 j.u.c.ScheduledExecutorService 因此包含了线程池中所有的方法,除此之外,继承自netty自己的OrderedEventExecutor并提供了boolean inEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop,EventLoopGroup parent()方法来看看自己属于哪个EventLoopGroup。

    二、EventLoopGroup

      EventLoopGroup是一组 EventLoop,Channel 一般会调用 EventLoopGroup的register方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)。继承了Iterable接口方便迭代所有的EventLoop,定义了next方法用于获取下一个EventLoop。看一段示例代码,

    public class TestEventLoop {
        public static void main(String[] args) throws InterruptedException {
            // 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程
            EventLoopGroup group = new NioEventLoopGroup(2);
            // 通过next方法可以获得下一个 EventLoop
            System.out.println(group.next());
            System.out.println(group.next());
    
            // 通过EventLoop执行普通任务
            group.next().execute(() -> {
                System.out.println(DateUtil.getTime() + " " + Thread.currentThread().getName() + " hello");
            });
    
            // 通过EventLoop执行定时任务
            group.next().scheduleAtFixedRate(() -> {
                System.out.println(DateUtil.getTime() + " " + Thread.currentThread().getName() + " hello2");
            }, 0, 2, TimeUnit.SECONDS);
    
            TimeUnit.SECONDS.sleep(4);
            // 优雅地关闭
            group.shutdownGracefully();
        }
    }

    输出结果:

    io.netty.channel.nio.NioEventLoop@5ae50ce6
    io.netty.channel.nio.NioEventLoop@6f96c77
    16:17:56 nioEventLoopGroup-2-2 hello2
    16:17:56 nioEventLoopGroup-2-1 hello
    16:17:58 nioEventLoopGroup-2-2 hello2
    16:18:00 nioEventLoopGroup-2-2 hello2

    关闭 EventLoopGroup

    优雅关闭 shutdownGracefully 方法。该方法会首先切换 EventLoopGroup 到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行,从而确保整体应用是在正常有序的状态下退出的。

    处理IO任务

    服务端

    public class MyServer {
        public static void main(String[] args) {
            new ServerBootstrap()
                    // 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件
                    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                }
                            });
                        }
                    })
                    .bind(8080);
        }
    }

    客户端

    public class MyClient {
        public static void main(String[] args) throws IOException, InterruptedException {
            Channel channel = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect(new InetSocketAddress("localhost", 8080))
                    .sync()
                    .channel();
            System.out.println(channel);
            // 此处打断点调试,调用 channel.writeAndFlush(...);
            System.in.read();
        }
    }

    一个EventLoop可以负责多个Channel,且EventLoop一旦与Channel绑定,则一直负责处理该Channel中的事件。

    增加自定义EventLoopGroup

    当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理。

    public class MyServer2 {
        public static void main(String[] args) {
            // 增加自定义的非NioEventLoopGroup
            EventLoopGroup group = new DefaultEventLoopGroup();
    
            new ServerBootstrap()
                    .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理
                            socketChannel.pipeline().addLast("nioHandler", new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                    // 调用下一个handler
                                    ctx.fireChannelRead(msg);
                                }
                                // 该handler绑定自定义的Group
                            }).addLast(group, "myHandler", new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    ByteBuf buf = (ByteBuf) msg;
                                    System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
                                }
                            });
                        }
                    }).bind(8080);
        }
    }

    三、Channel

    1、Channel 的常用方法

    close() 可以用来关闭Channel
    closeFuture() 用来处理 Channel 的关闭
    sync 方法作用是同步等待 Channel 关闭
    addListener 方法是异步等待 Channel 关闭
    pipeline() 方法用于添加处理器
    write() 方法将数据写入
    因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
    只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
    writeAndFlush() 方法将数据写入并立即发送(刷出)

    2、ChannelFuture

    public class MyClient2 {
        public static void main(String[] args) throws IOException, InterruptedException {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new StringEncoder());
                        }
                    })
                    // 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
                    // NIO线程:NioEventLoop 中的线程
                    .connect(new InetSocketAddress("localhost", 8080));
    
            // 该方法用于等待连接真正建立
            channelFuture.sync();
    
            // 获取客户端-服务器之间的Channel对象
            Channel channel = channelFuture.channel();
            channel.writeAndFlush("hello world");
            System.in.read();
        }
    }

    如果我们去掉channelFuture.sync()方法,服务器会无法收到hello world。

    这是因为建立连接(connect)的过程是异步非阻塞的,若不通过sync()方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端。

    所以需要通过channelFuture.sync()方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程。

    下面还有一种方法,用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO 线程(去执行connect操作的线程)

    addListener方法

    通过这种方法可以在NIO线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作

    public class MyClient3 {
        public static void main(String[] args) throws IOException {
            ChannelFuture channelFuture = new Bootstrap()
                    .group(new NioEventLoopGroup())
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new StringEncoder());
                        }
                    })
                    // 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
                    // NIO线程:NioEventLoop 中的线程
                    .connect(new InetSocketAddress("localhost", 8080));
    
            // 当connect方法执行完毕后,也就是连接真正建立后
            // 会在NIO线程中调用operationComplete方法
            channelFuture.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    Channel channel = channelFuture.channel();
                    channel.writeAndFlush("hello world");
                }
            });
            System.in.read();
        }
    }

    关闭channel

    public class MyClient4 {
        public static void main(String[] args) throws InterruptedException {
            // 创建EventLoopGroup,使用完毕后关闭
            NioEventLoopGroup group = new NioEventLoopGroup();
    
            ChannelFuture channelFuture = new Bootstrap()
                    .group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            socketChannel.pipeline().addLast(new StringEncoder());
                        }
                    })
                    .connect(new InetSocketAddress("localhost", 8080));
            channelFuture.sync();
    
            Channel channel = channelFuture.channel();
            Scanner scanner = new Scanner(System.in);
    
            // 创建一个线程用于输入并向服务器发送
            new Thread(() -> {
                while (true) {
                    String msg = scanner.next();
                    if ("q".equals(msg)) {
                        // 关闭操作是异步的,在NIO线程中执行
                        channel.close();
                        break;
                    }
                    channel.writeAndFlush(msg);
                }
            }, "inputThread").start();
    
            // 获得closeFuture对象
            ChannelFuture closeFuture = channel.closeFuture();
            System.out.println("waiting close...");
    
            // 同步等待NIO线程执行完close操作
            closeFuture.sync();
    
            // 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的
            System.out.println("关闭之后执行一些额外操作...");
    
            // 关闭EventLoopGroup
            group.shutdownGracefully();
        }
    }

    当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作

    如果我们想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现:

    1、通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作。

    // 获得closeFuture对象
    ChannelFuture closeFuture = channel.closeFuture();
    
    // 同步等待NIO线程执行完close操作
    closeFuture.sync();

    2、调用closeFuture.addListener方法,添加close的后续操作

    closeFuture.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            // 等待channel关闭后才执行的操作
            System.out.println("关闭之后执行一些额外操作...");
            // 关闭EventLoopGroup
            group.shutdownGracefully();
        }
    });

    四、Future与Promise

    netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展,jdk Future只能同步等待任务结束(或成功、或失败)才能得到结果,netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束。netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器。继承关系类图如下:

    主要功能方法如下:

    功能/名称jdk Futurenetty FuturePromise
    cancel 取消任务 - -
    isCanceled 任务是否取消 - -
    isDone 任务是否完成,不能区分成功失败 - -
    get 获取任务结果,阻塞等待 - -
    getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
    await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
    sync - 等待任务结束,如果任务失败,抛出异常 -
    isSuccess - 判断任务是否成功 -
    cause - 获取失败信息,非阻塞,如果没有失败,返回null -
    addLinstener - 添加回调,异步接收结果 -
    setSuccess - - 设置成功结果
    setFailure - - 设置失败结果

    JDK Future

    public class JdkFuture {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            ThreadFactory factory = new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "JdkFuture");
                }
            };
            // 创建线程池
            ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10,10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);
    
            // 获得Future对象
            Future<Integer> future = executor.submit(new Callable<Integer>() {
    
                @Override
                public Integer call() throws Exception {
                    TimeUnit.SECONDS.sleep(1);
                    return 50;
                }
            });
    
            // 通过阻塞的方式,获得运行结果
            System.out.println(future.get());
        }
    }

    Netty Future

    public class NettyFuture {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            NioEventLoopGroup group = new NioEventLoopGroup();
    
            // 获得 EventLoop 对象
            EventLoop eventLoop = group.next();
            Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
                @Override
                public Integer call() throws Exception {
                    return 50;
                }
            });
    
            // 主线程中获取结果
            System.out.println(Thread.currentThread().getName() + " 获取结果");
            System.out.println("getNow " + future.getNow());
            System.out.println("get " + future.get());
    
            // NIO线程中异步获取结果
            future.addListener(new GenericFutureListener<Future<? super Integer>>() {
                @Override
                public void operationComplete(Future<? super Integer> future) throws Exception {
                    System.out.println(Thread.currentThread().getName() + " 获取结果");
                    System.out.println("getNow " + future.getNow());
                }
            });
        }
    }

    运行结果:

    main 获取结果
    getNow null
    get 50
    nioEventLoopGroup-2-1 获取结果
    getNow 50

    Netty中的Future对象,可以通过EventLoop的sumbit()方法得到,可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的,也可以通过Future对象的get方法,阻塞地获取返回结果,还可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果。

    Netty Promise

    public class NettyPromise {
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            // 创建EventLoop
            NioEventLoopGroup group = new NioEventLoopGroup();
            EventLoop eventLoop = group.next();
    
            // 创建Promise对象,用于存放结果
            DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
    
            eventLoop.execute(()->{
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 自定义线程向Promise中存放结果
                promise.setSuccess(50);
            });
            // 主线程从Promise中获取结果
            System.out.println(Thread.currentThread().getName() + " " + promise.get());
        }
    }

    Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果。

    五、Handler与Pipeline

    Pipeline

    public class PipeLineServer {
        public static void main(String[] args) {
            new ServerBootstrap()
                    .group(new NioEventLoopGroup(1),new NioEventLoopGroup())
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            // 在socketChannel的pipeline中添加handler
                            // pipeline中handler是带有head与tail节点的双向链表,的实际结构为
                            // head <-> handler1 <-> ... <-> handler4 <->tail
                            // Inbound主要处理入站操作,一般为读操作,发生入站操作时会触发Inbound方法
                            // 入站时,handler是从head向后调用的
                            socketChannel.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    System.out.println(Thread.currentThread().getName() + " Inbound handler 1");
                                    // 父类该方法内部会调用fireChannelRead
                                    // 将数据传递给下一个handler
                                    super.channelRead(ctx, msg);
                                }
                            });
                            socketChannel.pipeline().addLast("handler2", new ChannelInboundHandlerAdapter() {
                                @Override
                                public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                                    System.out.println(Thread.currentThread().getName() + " Inbound handler 2");
                                    // 执行write操作,使得Outbound的方法能够得到调用
                                    socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));
                                    super.channelRead(ctx, msg);
                                }
                            });
                            // Outbound主要处理出站操作,一般为写操作,发生出站操作时会触发Outbound方法
                            // 出站时,handler的调用是从tail向前调用的
                            socketChannel.pipeline().addLast("handler3", new ChannelOutboundHandlerAdapter() {
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                    System.out.println(Thread.currentThread().getName() + " Outbound handler 1");
                                    super.write(ctx, msg, promise);
                                }
                            });
                            socketChannel.pipeline().addLast("handler4", new ChannelOutboundHandlerAdapter() {
                                @Override
                                public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
                                    System.out.println(Thread.currentThread().getName() + " Outbound handler 2");
                                    super.write(ctx, msg, promise);
                                }
                            });
                        }
                    }).bind(8080);
        }
    }

    运行结果:

    nioEventLoopGroup-3-1 Inbound handler 1
    nioEventLoopGroup-3-1 Inbound handler 2
    nioEventLoopGroup-3-1 Outbound handler 2
    nioEventLoopGroup-3-1 Outbound handler 1

    通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler。handler需要放入通道的pipeline中,才能根据放入顺序来使用handler。

    pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler。要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler。当有入栈(Inbound)操作时,会从head头节点开始向后调用入栈handler,直到最后一个Inbound handler为止。当有出站(Outbound)操作时,会从tail尾节点开始向前调用handler,直到第一个Outbound handler为止。

    数据结构

    调用过程

     

    总结:1、入栈handler中需要调用super.channelRead(ctx, msg)方法向下传递,不然不会调用下一个handler,出栈也一样需要调用super.write(ctx, msg, promise)向上传递。

    2、入栈handler需要调用写方法才能触发出栈的handler调用,例如:socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)))。

    参考链接:https://nyimac.gitee.io/2021/04/25/Netty%E5%9F%BA%E7%A1%80/

  • 相关阅读:
    mycat 1.6.6.1 distinct报错问题
    linux下Tomcat+OpenSSL配置单向&双向认证(自制证书)
    Too many open files错误与解决方法
    Tomcat类加载机制触发的Too many open files问题分析(转)
    spring boot 自签发https证书
    redis集群如何解决重启不了的问题
    centos7 docker 安装 zookeeper 3.4.13 集群
    centos7用docker安装kafka
    心怀感恩
    不使用if switch 各种大于 小于 判断2个数的大小
  • 原文地址:https://www.cnblogs.com/sglx/p/15398403.html
Copyright © 2011-2022 走看看