zoukankan      html  css  js  c++  java
  • netty 详解(一)架构设计、异步模型、任务队列、入门案例

    录:

    1、netty 是什么
    2、netty 架构设计
        2.1、线程模型
        2.2、传统阻塞 I/O 服务模型
        2.3、Reactor 模式
        2.4、单 Reactor 单线程
        2.5、单 Reactor 多线程
        2.6、主从 Reactor 多线程
        2.7、Netty工作原理架构图
    3、Netty 编程之 helloworld
    4、自定义 ChannelInboundHandlerAdapter  收发消息
    5、任务队列 taskQueue 和 scheduledTaskQueue
    6、Netty 异步模型
    7、Netty 入门案例--HTTP 服务

    1、netty 是什么    <--返回目录

    Netty 是一个异步事件驱动的网络应用框架,用于快速开发可维护的高性能服务器和客户端。

    下面是我总结的使用 Netty 不使用 JDK 原生 NIO 的原因

    • 使用 JDK 自带的 NIO 需要了解太多的概念,编程复杂,一不小心 bug 横飞
    • Netty 底层 IO 模型随意切换,而这一切只需要做微小的改动,改改参数,Netty 可以直接从 NIO 模型变身为 IO 模型
    • Netty 自带的拆包解包,异常检测等机制让你从 NIO 的繁重细节中脱离出来,让你只需要关心业务逻辑
    • Netty 解决了 JDK 的很多包括空轮询在内的 bug
    • Netty 底层对线程,selector 做了很多细小的优化,精心设计的 reactor 线程模型做到非常高效的并发处理
    • 自带各种协议栈让你处理任何一种通用协议都几乎不用亲自动手
    • Netty 社区活跃,遇到问题随时邮件列表或者 issue
    • Netty 已经历各大 rpc 框架,消息中间件,分布式通信中间件线上的广泛验证,健壮性无比强大

    2、netty 架构设计    <--返回目录

      不同的线程模式,对程序的性能有很大影响,为了搞清 netty 线程模式,我们来系统分析下各个线程模式,最后看看 netty 线程模型有什么优越性。

    2.1、线程模型    <--返回目录

      目前存在的线程模型有:

    • 传统阻塞 I/O 服务模型
    • Reactor 模式(反应器模式、分发者模式 Dispatcher、通知者模式 Notifier)

      根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现:

    • 单 Reactor 单线程
    • 单 Reactor 多线程
    • 主从 Reactor 多线程

      netty 线程模式:netty 主要基于主从 Reactor 多线程模型做了一定的改进,其中主从 Reactor 多线程模式有多个 Reactor

    2.2、传统阻塞 I/O 服务模型    <--返回目录

      模型特点:

    • 采用阻塞 IO 模式 获取输入的数据
    • 每个连接都需要独立的线程完成数据的输入,业务处理,数据返回

    问题分析:

    • 当并发数很大,就会创建大量的线程,占用很大系统资源
    • 连接创建后,如果当前线程暂时没有数据可读,该线程回阻塞在 read 操作,造成线程资源的浪费

    2.3、Reactor 模式    <--返回目录

      针对传统阻塞 IO 服务模型的 2 个缺点,解决方案:

    • 基于 IO 复用模型:多个连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,操作系统通知应用程序,线程从阻塞状态返回,开始进行业务处理。
    • 基于线程池复用线程资源:不必为每个连接创建线程,将连接完成后的业务处理任务分配给线程进行处理,一个线程可以处理多个连接的业务。

      IO 复用结合线程池,就是 Reactor 模式基本设计思想。

    2.4、单 Reactor 单线程    <--返回目录

      方案说明:

    • select 是前面 IO 复用模型介绍的标准网络编程 API,可以实现应用程序通过一个阻塞对象监听多路连接请求
    • Reactor 对象通过 Select 监控客户端请求事件,收到事件后通过 Dispatch 进行分发
    • 如果是建立连接请求事件,则由 Acceptor 通过 accept 处理连接请求,然后创建一个 handler 对象处理连接完成后的后续业务处理
    • 如果不是建立连接事件,则 Reactor 会分发调用连接对应的 handler 来响应
    • handler 会完成 read -> 业务处理 -> send 的完整业务流程

      服务器端用一个线程通过多路复用搞定所有的 IO 操作(包括连接、读写等),编码简单,清晰明了,但是如果客户端连接数量较多,将无法支撑。

     

    2.5、单 Reactor 多线程    <--返回目录

      方案说明:

    • Reactor 对象通过 select 监控客户端请求事件,收到事件后,通过 dispatch 进行分发
    • 如果建立连接请求,则由 Acceptor 通过 accept 处理连接请求,然后创建一个 handler 对象处理完成连接后的各种事件
    • 如果不是连接请求,则由 reactor 分发调用连接对象对应的 handler 来处理
    • handler 之负责响应事件,不做具体的业务处理,通过 read 读取数据后,会分发给后面的 worker 线程池的某个线程处理业务
    • worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler
    • handler 收到响应后,通过 send 将结果返回给 cliet

      缺点:reactor 处理所有的事件的监听和响应,在单线程运行,在高并发场景容易出现性能瓶颈。

    2.6、主从 Reactor 多线程    <--返回目录

      主 Reactor 负责连接事件;子 Reactor 负责监听读写事件

    2.7、Netty工作原理架构图    <--返回目录

      1)Netty 抽象出两组线程池,BossGroup 专门负责接受客户端的连接,WorkerGroup 专门负责网络的读写

      2)BossGroup 和 WorkGroup 类型都是 NioEventLoopGroup

      3)NioEventLoopGroup 相当于一个事件循环组,这个组含有多个事件循环,每个事件循环时 NioEventLoop

      4)NioEventLoop 表示一个不断循环的执行处理任务的线程,每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket 的网络通讯

      5)NioEventLoopGroup 可以有多个线程,即可以含有多个 NioEventLoop

      6)每个 Boss NioEventLoop 循环执行的步骤

    • 轮询 accept 事件
    • 处理 accept 事件,与 client 建立连接,生成 NiobiumSocketChannel,并将其注册到 worker NioEveltLoop 上的 Selector
    • 处理任务队列的任务,即 runAllTasks

      7) 每个 Worker NioEventLoop 循环执行的步骤

    • 轮询 read/write 事件
    • 处理 read/write 事件,在对应 NioSocketChannel 处理
    • 处理任务队列的任务,即 runAllTasks

    3、Netty 编程之 helloworld    <--返回目录

      pom 引入依赖

    <dependency>
        <groupId>io.netty</groupId>
        <artifactId>netty-all</artifactId>
        <version>4.1.22.Final</version>
    </dependency>
    View Code

      NettyServer

    package com.oy;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    
    public class NettyServer {
    
        public static void main(String[] args) throws Exception {
    
            // 1.创建 BossGroup 和 workerGroup
            NioEventLoopGroup boss = new NioEventLoopGroup();
            NioEventLoopGroup worker = new NioEventLoopGroup();
    
            // 2.创建服务器端的启动对象
            ServerBootstrap serverBootstrap = new ServerBootstrap();
    
            // 3.链式编程,配置参数
            serverBootstrap
                    .group(boss, worker)
                    .channel(NioServerSocketChannel.class)
                    .option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列得到连接个数
                    .childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持获得连接状态
                    .childHandler(new ChannelInitializer<NioSocketChannel>() { // 给 WorkerGroup 的 EventLoop 对应的管道设置处理器
                        protected void initChannel(NioSocketChannel ch) {
                            ch.pipeline().addLast(new StringDecoder());
                            ch.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, String msg) {
                                    System.out.println(msg);
                                }
                            });
                        }
                    });
    
            // 4.绑定端口,运行服务器
            ChannelFuture future = serverBootstrap.bind(8000).sync();
            System.out.println("server started and listen " + 8000);
    
            // 5.对关闭通道进行监听
            future.channel().closeFuture().sync();
        }
    
    }

      NettyClient

    package com.oy;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringEncoder;
    
    import java.util.Date;
    
    public class NettyClient {
        public static void main(String[] args) throws InterruptedException {
            Bootstrap bootstrap = new Bootstrap();
            NioEventLoopGroup group = new NioEventLoopGroup();
    
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<Channel>() {
                        @Override
                        protected void initChannel(Channel ch) {
                            ch.pipeline().addLast(new StringEncoder());
                        }
                    });
    
            Channel channel = bootstrap.connect("127.0.0.1", 8000).channel();
    
            while (true) {
                channel.writeAndFlush(new Date() + ": hello world11111111!");
                Thread.sleep(2000);
            }
        }
    }

    4、自定义 ChannelInboundHandlerAdapter  收发消息    <--返回目录

      NettyServer

    package com.oy;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    
    import java.net.InetSocketAddress;
    
    public class NettyServer {
    
        private int port;
    
        public static void main(String[] args) {
            new NettyServer(8080).start();
        }
    
        public NettyServer(int port) {
            this.port = port;
        }
    
        public void start() {
            /**
             * 创建两个EventLoopGroup,即两个线程池,boss线程池用于接收客户端的连接,一个线程监听一个端口,一般只会监听一个端口所以只需一个线程
             * work池用于处理网络连接数据读写或者后续的业务处理(可指定另外的线程处理业务,work完成数据读写)
             */
            EventLoopGroup boss = new NioEventLoopGroup(1);
            EventLoopGroup work = new NioEventLoopGroup();
    
            try {
                /**
                 * 实例化一个服务端启动类,
                 * group()指定线程组
                 * channel()指定用于接收客户端连接的类,对应java.nio.ServerSocketChannel
                 * childHandler()设置编码解码及处理连接的类
                 */
                ServerBootstrap server = new ServerBootstrap()
                        .group(boss, work)
                        .channel(NioServerSocketChannel.class)
                        .localAddress(new InetSocketAddress(port))
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline()
                                        //.addLast("decoder", new StringDecoder())
                                        //.addLast("encoder", new StringEncoder())
                                        .addLast(new NettyServerHandler());
                            }
                        });
    
                // 绑定端口
                ChannelFuture future = server.bind().sync();
                System.out.println("server started and listen " + port);
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                work.shutdownGracefully();
            }
        }
    
        public static class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("HelloWorldServerHandler active");
            }
    
            /**
             * 读取客户端发送的数据
             * ChannelHandlerContext ctx: 上下文对象,含有管道 pipeline,通道 channel,连接地址
             * Object msg: 客户端发送的数据
             */
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                System.out.println("server channelRead...");
    
                // 读取客户端发送的数据
                ByteBuf buf = (ByteBuf) msg;
                System.out.println("from " + ctx.channel().remoteAddress() + ", " + buf.toString(CharsetUtil.UTF_8));
    
                //System.out.println(ctx.channel().remoteAddress() + "->Server :" + msg.toString());
                // 返回消息
                //ctx.write("server write, 收到消息" + msg);
                //ctx.flush();
            }
    
            /**
             * 数据读取完毕
             */
            @Override
            public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端", CharsetUtil.UTF_8));
            }
    
            /**
             * 处理异常,关闭通道
             */
            @Override
            public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                ctx.channel().close();
            }
        }
    }

      NettyClient

    package com.oy;
    
    import io.netty.bootstrap.Bootstrap;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.string.StringDecoder;
    import io.netty.handler.codec.string.StringEncoder;
    import io.netty.util.CharsetUtil;
    
    public class NettyClient {
        private static final String HOST = "127.0.0.1";
        private static final int PORT= 8080;
    
        public static void main(String[] args){
            new NettyClient().start(HOST, PORT);
        }
    
        public void start(String host, int port) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap client = new Bootstrap()
                        .group(group)
                        .channel(NioSocketChannel.class)
                        .option(ChannelOption.TCP_NODELAY, true)
                        .handler(new ChannelInitializer<SocketChannel>() {
                            @Override
                            protected void initChannel(SocketChannel ch) throws Exception {
                                ch.pipeline()
                                        //.addLast("decoder", new StringDecoder())
                                        //.addLast("encoder", new StringEncoder())
                                        .addLast(new NettyClientHandler());
                            }
                        });
    
                ChannelFuture future = client.connect(host, port).sync();
                //future.channel().writeAndFlush("Hello Netty Server ,I am a netty client");
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }
    
        public static class NettyClientHandler extends ChannelInboundHandlerAdapter {
            /**
             * 通道就绪触发该方法
             */
            @Override
            public void channelActive(ChannelHandlerContext ctx) throws Exception {
                System.out.println("HelloWorldClientHandler Active");
                ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 服务器", CharsetUtil.UTF_8));
            }
    
            /**
             * 当通道有读取事件时触发该方法
             */
            @Override
            public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
                ByteBuf buf = (ByteBuf) msg;
                System.out.println("收到服务器响应: " + buf.toString(CharsetUtil.UTF_8));
            }
        }
    }

    5、任务队列 taskQueue 和 scheduledTaskQueue    <--返回目录

      任务队列中的 Task 有三种典型使用场景

    • 用户程序自定义的普通任务
    • 用户自定义定时任务
    • 非当前 Reactor 线程调用 Channel 的各种方法。例如在推送系统的业务线程里,根据用户的标识,找到对应的 Channel 引用,然后调用 Write 类方法向该用户推送消息,就会进入到这种场景。最终的 Write 会提交到任务队列中被异步消费
    public static class NettyServerHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("HelloWorldServerHandler active");
        }
    
        @Override
        public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
            System.out.println("server channelRead...");
    
            // 读取客户端发送的数据
            ByteBuf buf = (ByteBuf) msg;
            System.out.println("from " + ctx.channel().remoteAddress() + ", " + buf.toString(CharsetUtil.UTF_8));
    
            // 模拟业务处理耗时
            //Thread.sleep(5 * 1000);
            //ctx.writeAndFlush(Unpooled.copiedBuffer("hello, 客户端1
    ", CharsetUtil.UTF_8));
    
            // 用户自定义的任务,任务添加到 taskQueue 中
            ctx.channel().eventLoop().execute(new Runnable() {
    
                public void run() {
                    try {
                        Thread.sleep(5 * 1000);
                        ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客户端1
    ", CharsetUtil.UTF_8));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
    
            });
    
            // 用户自定义定义任务, 任务添加到 scheduledTaskQueue 中
            ctx.channel().eventLoop().schedule(new Runnable() {
                public void run() {
                    try {
                        Thread.sleep(5 * 1000);
                        ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客户端 shedule
    ", CharsetUtil.UTF_8));
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            },5 , TimeUnit.SECONDS);
        }
    
        /**
         * 数据读取完毕
         */
        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer(new Date().toLocaleString() + "hello, 客户端2", CharsetUtil.UTF_8));
        }
    
        /**
         * 处理异常,关闭通道
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.channel().close();
        }
    }

    6、Netty 异步模型    <--返回目录

      异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者。

      Netty 中的 IO 操作时异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。

      调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。

      Netty 的异步模型是建立在 Future 和 callback 之上的,callback 就是回调。重点说 Future,它的核心思想是:假设一个方法 fun,计算过程可能非常耗时,等待 fun 返回显然不合适,那么可以在调用 fun 的时候,立马返回一个 Future,后续可以通过 Future 取监控方法 fun 的处理过程(即 Future-Listener 机制)。

        // 绑定端口
        final ChannelFuture future = server.bind(8080).sync();
    
        future.addListener(new ChannelFutureListener() {
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (future.isDone()) {
                    System.out.println("监听端口 8080 成功");
                } else {
                    System.out.println("监听端口 8080 失败");
                }
            }
        });

    7、Netty 入门案例--HTTP 服务    <--返回目录

     

       HttpServer

    package com.oy.http;
    
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.*;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    
    public class HttpServer {
        public static void main(String[] args) {
            EventLoopGroup boss = new NioEventLoopGroup(1);
            EventLoopGroup work = new NioEventLoopGroup();
    
            try {
                ServerBootstrap server = new ServerBootstrap()
                        .group(boss, work)
                        .channel(NioServerSocketChannel.class)
                        //.localAddress(new InetSocketAddress(port))
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true)
                        .childHandler(new MyChannelInitializer());
    
                // 绑定端口
                ChannelFuture future = server.bind(8080).sync();
                System.out.println("server started and listen " + 8080);
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                boss.shutdownGracefully();
                work.shutdownGracefully();
            }
        }
    }
    View Code

      MyChannelInitializer

    package com.oy.http;
    
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.handler.codec.http.HttpServerCodec;
    
    public class MyChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        protected void initChannel(SocketChannel socketChannel) throws Exception {
    
            /* 向管道加入处理器 */
            ChannelPipeline pipeline = socketChannel.pipeline();
            // HttpServerCodec: netty 提供的处理 http 的编-解码器
            pipeline.addLast("MyHttpServerCodec", new HttpServerCodec());
            // 添加自定义的处理器
            pipeline.addLast("MyHttpServerHandler", new MyHttpServerHandler());
    
        }
    
    }
    View Code

      MyHttpServerHandler

    package com.oy.http;
    
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.handler.codec.http.*;
    import io.netty.util.CharsetUtil;
    
    import java.net.URI;
    
    public class MyHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> {
    
        protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
            System.out.println("===================================");
            System.out.println("msg 类型: " + msg.getClass().getName());
            System.out.println("客户端地址:" + ctx.channel().remoteAddress());
    
            // 判断 msg 是否是 http request 请求
            if (msg instanceof HttpRequest) {
                HttpRequest request = (HttpRequest) msg;
                URI uri = new URI(request.uri());
                System.out.println("请求 uri: " + uri.getPath());
                if ("/favicon.ico".equals(uri.getPath())) {
                    System.out.println("请求 favicon.icon,不做响应");
                    return;
                }
    
                // 回复信息给浏览器
                ByteBuf content = Unpooled.copiedBuffer("hello, 我是服务器", CharsetUtil.UTF_8);
                // 构造一个 http 的响应,即 http response
                DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1,
                        HttpResponseStatus.OK, content);
                response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain;charset=utf-8");
                response.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes());
    
                ctx.writeAndFlush(response);
            }
        }
    
    }
    View Code

    参考:

      1)netty 官网:https://netty.io/

      2)《跟闪电侠学Netty》开篇:Netty是什么?

      3)掘金小册:Netty 入门与实战:仿写微信 IM 即时通讯系统

      4)Netty整体架构

      5)Netty工作原理架构图

  • 相关阅读:
    Java Collection知识总结
    Java异常总结
    关于触发器
    关于事务
    git分支的创建、删除、切换、合并
    github项目上传管理
    如何在github上下载单个文件夹?
    常见的javascript跨站
    各类常用端口漏洞缺陷
    SEO优化实践操作
  • 原文地址:https://www.cnblogs.com/xy-ouyang/p/12820107.html
Copyright © 2011-2022 走看看