这边博客分两个部分,先介绍下IO线程模型,然后介绍下Netty的模型加上一个简单的demo
1. IO线程模型的介绍
IO线程模型分为三大类
1.传统阻塞式模型 2.Reactor模型(反应堆模型) 3.Proactor模型(前摄器)
1.1 传统阻塞模型
一个连接占用一个线程,当大量并发的时候会造成资源的浪费,而且连接建立后,容易阻塞的读或者写的状态
1.2 Reactor模型
Reactor模式是基于事件驱动开发的,核心组成部分包括Reactor和线程池,其中Reactor负责监听和分配事件,线程池负责处理事件,而根据Reactor的数量和线程池的数量,又将Reactor分为三种模型:
单线程模型 (单Reactor单线程)
多线程模型 (单Reactor多线程)
主从多线程模型 (多Reactor多线程)
单线程模型
好比一个接待员,一个服务员
1. select是I/O复用模型介绍的标准网络编程 API,可以实现应用程序通过一个阻塞对象监听多路连接请求 2. Reactor对象通过select监控客户端请求事件,收到事件后通过dispatch进行分发 3. 如果是建立连接请求事件,则由Acceptor通过accept处理连接请求,然后创建一个Handler对象处理连接完成后的后续业务处理 4. 如果不是建立连接事件,则Reactor会分发调用连接对应的Handler来响应 5. Handler会完成 read -> 业务处理 -> send 的完整业务流程
结合实例:服务器端用一个线程通过多路复用搞定所有的IO操作(包括连接,读、写等),编码简单清晰明了,但是如果客户端连接数量较多,将无法支撑。
优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成
缺点:性能问题,只有一个线程,无法完全发挥多核 CPU 的性能。Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈。可靠性问题,线程意外终止,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
场景:客户端有限,业务处理快,比如redis
多线程模型
好比就是一个接待员,多个服务员
1. Reactor对象通过select监控客户端请求事件, 收到事件后,通过dispatch进行分发 2. 如果建立连接请求, 则由Acceptor通过accept处理连接请求, 然后创建一个Handler对象处理完成连接后的各种事件 3. 如果不是连接请求,则由Reactor分发调用连接对应的Handler来处理4. Handler只负责响应事件,不做具体的业务处理, 通过read读取数据后,会分发给后面的worker线程池的某个线程处理业务 5. worker线程池会分配独立线程完成真正的业务,并将结果返回给handler 6. handler收到响应后,通过send将结果返回给client
单Reactor承当所有事件的监听和响应,而当我们的服务端遇到大量的客户端同时进行连接,或者在请求连接时执行一些耗时操作,比如身份认证,权限检查等,这种瞬时的高并发就容易成为性能瓶颈
下图是对应的业务逻辑处理部分,将任务交给线程池处理, 资料来自:http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf
优点:可以充分的利用多核 cpu 的处理能力
缺点:多线程数据共享和访问比较复杂, Reactor处理所有的事件的监听和响应,在单线程运行, 在高并发场景容易出现性能瓶颈
主从多线程模型(主流的方式)
好比多个接待员,多个服务员
1. Reactor主线程MainReactor对象通过select监听连接事件, 收到事件后,通过Acceptor处理连接事件 2. 当Acceptor处理连接事件后,MainReactor将连接分配给SubReactor 3. Subreactor将连接加入到连接队列进行监听,并创建Handler进行各种事件处理 4. 当有新事件发生时, Subreactor就会调用对应的Handler处理 5. Handler通过read读取数据,分发给后面的Worker线程处理 6. worker线程池分配独立的worker线程进行业务处理,并返回结果7. Handler收到响应的结果后,再通过send将结果返回给client 8. Reactor主线程可以对应多个Reactor子线程, 即MainRecator可以关联多个SubReactor
优点:父线程与子线程的数据交互简单职责明确,父线程只需要接收新连接,子线程完成后续的业务处理。父线程与子线程的数据交互简单,Reactor 主线程只需要把新连接传给子线程,子线程无需返回数据。
缺点:编程复杂度较高
Nginx / Netty / Memcached都是使用的这个模式
1.3 Proactor模型
Reactor先建立连接,等待事件发生,然后让实现准备好的handler去处理,后者来实际读写,它是同步非阻塞的线程模型.如果IO改为异步交给操作系统来完成,则可以进一步提高效率,这就是异步网络模型 Proactor
Reactor读写在Handler里面完成, 而Proactor读写在内核中完成
编程复杂性,由于异步操作流程的事件的初始化和事件完成在时间和空间上都是相互分离的,因此开发异步应用程序更加复杂。应用程序还可能因为反向的流控而变得更加难以Debug;
内存使用,缓冲区在读或写操作的时间段内必须保持住,可能造成持续的不确定性,并且每个并发操作都要求有独立的缓存,相比Reactor模式,在Socket已经准备好读或写前,是不要求开辟缓存的;
2. Netty模型介绍
1. BossGroup线程维护Selector, 只关注Accecpt 2. 当接收到Accept事件,获取到对应的SocketChannel, 封装成NIOScoketChannel并注册到Worker线程(事件循环), 并进行维护 3. 当Worker线程监听到selector中通道发生自己感兴趣的事件后,就进行处理(就由handler), 注意handler已经加入到通道
1. Netty抽象出两组线程池BossGroup专门负责接收客户端的连接,WorkerGroup专门负责网络的读写 2. BossGroup和WorkerGroup类型都是NioEventLoopGroup 3. NioEventLoopGroup相当于一个事件循环组, 这个组中含有多个事件循环 ,每一个事件循环是NioEventLoop 4. NioEventLoop表示一个不断循环的执行处理任务的线程, 每个NioEventLoop都有一个selector , 用于监听绑定在其上的socket的网络通讯 5. NioEventLoopGroup可以有多个线程, 即可以含有多个NioEventLoop 6. 每个Boss NioEventLoop循环执行的步骤有3步 - 轮询accept事件 - 处理accept事件 , 与client建立连接 , 生成NioScocketChannel , 并将其注册到某个worker NioEventLoop上的selector
- 处理任务队列的任务 , 即runAllTasks 7. 每个Worker NioEventLoop循环执行的步骤 - 轮询read / write事件 - 处理i/o事件, 即read / write事件,在对应NioScocketChannel处理 - 处理任务队列的任务 , 即runAllTasks 8) 每个Worker NioEventLoop处理业务时,会使用pipeline(管道), pipeline中包含了channel , 即通过pipeline可以获取到对应通道, 管道中维护了很多的处理器
3. 基于Netty的demo
3.1 TCP简单的demo
简单介绍下Netty的NioEventLoopGroup的结构:
1. Netty抽象出两组线程池,BossGroup专门负责接收客户端连接,WorkerGroup专门负责网络读写操作
2. NioEventLoop表示一个不断循环执行处理任务的线程,每个NioEventLoop都有一个selector,用于监听绑定在其上的socket网络通道
3. NioEventLoop 内部采用串行化设计,从消息的读取->解码->处理->编码->发送,始终由IO线程NioEventLoop负责
- NioEventLoopGroup下包含多个NioEventLoop
- 每个NioEventLoop中包含有一个Selector,一个taskQueue
- 每个NioEventLoop的Selector上可以注册监听多个NioChannel
- 每个NioChannel只会绑定在唯一的NioEventLoop 上
- 每个NioChannel都绑定有一个自己的ChannelPipeline
BrianServer.java
package com.kawa.io.netty.simple; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; @Slf4j public class BrianServer { public static void main(String[] args) { // create the bossGroup and workerGroup // bossGroup only handle the connect request EventLoopGroup bossGroup = new NioEventLoopGroup(1); // workerGroup handle the business request, thread pool size = cup * 2 EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // ServerBootstrap application start class ServerBootstrap bootstrap = new ServerBootstrap(); // set the parent group and child group bootstrap.group(bossGroup, workerGroup) // NioServerSocketChannel use as server channel .channel(NioServerSocketChannel.class) // set .option(ChannelOption.SO_BACKLOG, 128) // set keep alive connect .childOption(ChannelOption.SO_KEEPALIVE, true) // create a SocketChannel // set Handler to pipeline of workerGroup's EventLoop .childHandler(new ChannelInitializer<SocketChannel>() { // set Handler for pipeline @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new BrianServerHandler()); } }); // start the server bind port and sync create ChannelFuture ChannelFuture cf = bootstrap.bind(9001).sync(); log.info("---------- BrianServer is ready ----------"); // listen the close channel cf.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
BrianServerHandler.java 服务端自定义Handler
package com.kawa.io.netty.simple; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @Slf4j public class BrianServerHandler extends ChannelInboundHandlerAdapter { private ConcurrentHashMap<String, String> storage = new ConcurrentHashMap<>(); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>>> channelActive current thread: {}", Thread.currentThread().getName()); ctx.channel().eventLoop().execute(()-> { // save to storage storage.put(ctx.channel().remoteAddress().toString(), "Y"); }); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Channel channel = ctx.channel(); // convert the msg to ByteBuf ByteBuf buf = (ByteBuf) msg; log.info(">>>>>>>>>> get msg from client:{}, msg:{}", channel.remoteAddress(), getMsg(buf)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { sendMsg(ctx," get the msg"); } // when hit the Exception can close the related channel @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error(">>>>>>>>>> BrianServerHandler error: {}", cause.getMessage()); ctx.channel().eventLoop().schedule(()-> { // update the storage storage.put(ctx.channel().remoteAddress().toString(), "N"); }, 20L, TimeUnit.MILLISECONDS); ctx.close(); } private void sendMsg(ChannelHandlerContext ctx, String message){ ctx.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8)); } private String getMsg(ByteBuf buf){ return buf.toString(CharsetUtil.UTF_8); } }
BrianClient.java
package com.kawa.io.netty.simple; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import lombok.extern.slf4j.Slf4j; @Slf4j public class BrianClient { public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { // add BrianClientHandler ch.pipeline().addLast(new BrianClientHandler()); } }); // start the client and listen the server port 9001 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9001).sync(); log.info("---------- BrianClient is ready ----------"); // listen the close channel channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
BrianClientHandler.java 客户端自定义Handler
package com.kawa.io.netty.simple; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; @Slf4j public class BrianClientHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info(">>>>>>>>>> BrianClientHandler channelActive "); sendMsg(ctx, "BrianClientHandler channelActive"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.info(">>>>>>>>>> server: {}{}", ctx.channel().remoteAddress(), getMsg(buf)); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error(">>>>>>>>>> BrianClientHandler error: {}", cause.getMessage()); ctx.close(); } private void sendMsg(ChannelHandlerContext ctx, String message){ ctx.writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8)); } private String getMsg(ByteBuf buf){ return buf.toString(CharsetUtil.UTF_8); } }
整个demo很简单,就是开启服务器端等带客户端连接和打印客户端的发送的消息,以及响应客户端,所以这里就不截图测试日志了
3.2 HTTP简单的demo
在写HTTP demo前先讲讲Netty的异步模型
3.2.1 Netty的异步模型
1. 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的组件在完成后,通过状态、通知和回调来通知调用者 2. Netty中的I/O操作是异步的,包括Bind、Write、Connect等操作会简单的返回一个ChannelFuture 3. 调用者并不能立刻获得结果,而是通过Future-Listener机制,用户可以方便的主动获取或者通过通知机制获得IO操作结果 4. Netty的异步模型是建立在future和callback的之上的。callback就是回调。重点说Future它的核心思想是:
假设一个方法fun,计算过程可能非常耗时,等待fun返回显然不合适。那么可以在调用fun的时候,立马返回一个Future,
后续可以通过Future去监控方法fun的处理过程(即:Future-Listener机制)
Future说明
1. 表示异步的执行结果, 可以通过它提供的方法来检测执行是否完成,比如检索计算等 2. ChannelFuture是一个接口 : public interface ChannelFuture extends Future<Void> 我们可以添加监听器,当监听的事件发生时,就会通知到监听器
在使用Netty进行编程时,拦截操作和转换出入站数据只需要您提供callback或利用future即可。这使得链式操作简单、高效, 并有利于编写可重用的、通用的代码。Netty框架的目标就是让你的业务逻辑从网络基础应用编码中分离出来、解脱出来
链式操作示意图:
Future-Listener机制
1.当Future对象刚刚创建时,处于非完成状态,调用者可以通过返回的ChannelFuture来获取操作执行的状态,注册监听函数来执行完成后的操作 2.常见有如下操作 通过isDone方法来判断当前操作是否完成 通过isSuccess方法来判断已完成的当前操作是否成功 通过getCause方法来获取已完成的当前操作失败的原因 通过isCancelled方法来判断已完成的当前操作是否被取消 通过addListener方法来注册监听器,当操作已完成(isDone方法返回完成),将会通知指定的监听器;如果Future对象已完成,则通知指定的监听器
相比传统阻塞IO,执行IO操作后线程会被阻塞住,直到操作完成;异步处理的好处是不会造成线程阻塞,线程在IO操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量
3.2.2 HTTP demo
BrianHttpServer.java
package com.kawa.io.netty.http; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioServerSocketChannel; import lombok.extern.slf4j.Slf4j; @Slf4j public class BrianHttpServer { public static void main(String[] args) { EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(new BrianServerInitializer()); ChannelFuture channelFuture = serverBootstrap.bind(9999).sync(); log.info("---------- BrianHttpServer is ready ----------"); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { e.printStackTrace(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
BrianServerInitializer.java 设置handler
package com.kawa.io.netty.http; import io.netty.channel.ChannelInitializer; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpServerCodec; import lombok.extern.slf4j.Slf4j; @Slf4j public class BrianServerInitializer extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { // add handler ch.pipeline() // add Netty Encoder-Decoder .addLast("httpServerCodec", new HttpServerCodec()) // add customized handler .addLast("brianHttpServerHandler", new BrianHttpServerHandler()); } }
BrianHttpServerHandler.java 地定义handler处理 /kawa 的http请求
package com.kawa.io.netty.http; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.*; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; import java.net.URI; @Slf4j public class BrianHttpServerHandler extends SimpleChannelInboundHandler<HttpObject> { // read the client send data @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { // check the msg is HttpRequest if (msg instanceof HttpRequest) { HttpRequest request = (HttpRequest) msg; URI uri = new URI(request.uri()); if ("/kawa".equals(uri.getPath())) { log.info(">>>>>>>>>> client: {}", ctx.channel().remoteAddress()); log.info(">>>>>>>>>> type: {}", msg.getClass().getName()); log.info(">>>>>>>>>> pipeline: {}", ctx.pipeline().getClass().getName() + ctx.pipeline().hashCode()); log.info(">>>>>>>>>> handler: {}", getClass().getName() + this.hashCode()); sendMsg(ctx, "save the request to storage"); } else if ("/favicon.ico".equals(uri.getPath())) { sendMsg(ctx, "load the favicon.ico"); } else { log.info(">>>>>>>>>> client: {} type: {}", ctx.channel().remoteAddress(), msg.getClass()); sendMsg(ctx, "404 not found"); } } } private void sendMsg(ChannelHandlerContext ctx, String msg) { // send the http protocol response String template = "{"message":"%s"}"; ByteBuf content = Unpooled.copiedBuffer(String.format(template, msg), CharsetUtil.UTF_8); // create a HttpResponse FullHttpResponse httpResponse = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, content); httpResponse.headers().set(HttpHeaderNames.CONTENT_TYPE, "application/json"); httpResponse.headers().set(HttpHeaderNames.CONTENT_LENGTH, content.readableBytes()); // send ctx.writeAndFlush(httpResponse); } }
启动服务测试http://localhost:9999/kawa
后台打印的日志
INFO [main] 2021-09-25 22:27:55.662 c.kawa.io.netty.http.BrianHttpServer - ---------- BrianHttpServer is ready ---------- DEBUG [nioEventLoopGroup-3-2] 2021-09-25 22:28:14.241 io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkAccessible: true DEBUG [nioEventLoopGroup-3-2] 2021-09-25 22:28:14.242 io.netty.buffer.AbstractByteBuf - -Dio.netty.buffer.checkBounds: true DEBUG [nioEventLoopGroup-3-2] 2021-09-25 22:28:14.243 i.n.util.ResourceLeakDetectorFactory - Loaded default ResourceLeakDetector: io.netty.util.ResourceLeakDetector@7db6b201 DEBUG [nioEventLoopGroup-3-1] 2021-09-25 22:28:14.260 io.netty.util.Recycler - -Dio.netty.recycler.maxCapacityPerThread: 4096 DEBUG [nioEventLoopGroup-3-1] 2021-09-25 22:28:14.261 io.netty.util.Recycler - -Dio.netty.recycler.maxSharedCapacityFactor: 2 DEBUG [nioEventLoopGroup-3-1] 2021-09-25 22:28:14.261 io.netty.util.Recycler - -Dio.netty.recycler.linkCapacity: 16 DEBUG [nioEventLoopGroup-3-1] 2021-09-25 22:28:14.261 io.netty.util.Recycler - -Dio.netty.recycler.ratio: 8 DEBUG [nioEventLoopGroup-3-1] 2021-09-25 22:28:14.261 io.netty.util.Recycler - -Dio.netty.recycler.delayedQueue.ratio: 8 INFO [nioEventLoopGroup-3-1] 2021-09-25 22:28:14.297 c.k.i.n.http.BrianHttpServerHandler - >>>>>>>>>> client: /0:0:0:0:0:0:0:1:36500 INFO [nioEventLoopGroup-3-1] 2021-09-25 22:28:14.298 c.k.i.n.http.BrianHttpServerHandler - >>>>>>>>>> type: io.netty.handler.codec.http.DefaultHttpRequest INFO [nioEventLoopGroup-3-1] 2021-09-25 22:28:14.307 c.k.i.n.http.BrianHttpServerHandler - >>>>>>>>>> pipeline: io.netty.channel.DefaultChannelPipeline855934580 INFO [nioEventLoopGroup-3-1] 2021-09-25 22:28:14.308 c.k.i.n.http.BrianHttpServerHandler - >>>>>>>>>> handler: com.kawa.io.netty.http.BrianHttpServerHandler325803268