定义
Netty 提供异步的、基于事件驱动的网络应用程序框架,用于快速开发高性能的、高可靠的网络IO程序。
原生 NIO 存在的问题
- NIO 类库和 API 复杂,使用门槛高。需要熟练掌握 Selector、ServerSocketChannel、SocketChannel、ByteBuffer 等;
- 需要具备其他额外技能,对 Java 多线程和网络编程非常熟悉;
- 开发工作量和难度大,例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等;
- JDK NIO 存在 Bug: 臭名的 Epoll Bug,会导致 Selector 空轮询,最终导致 CPU 使用率 100%。 知道 JDK1.7 仍然存在该问题,没有被根本解决;
功能特性
Netty 功能特性如下:
- 传输服务,支持 BIO、NIO;
- 容器集成, 支持 OSGI、JBossMC、Spring、Guice 容器;
- 协议支持, HTTP、Protobuf、二进制、文本、WebSocket 等一系列常见协议都支持。还支持通过实行编码解码逻辑来实现自定义协议;
- Core 核心, 可扩展事件模型、通用通信 API、支持零拷贝的 ByteBuf 缓冲对象;
Reactor 线程模型
IO 编程中,常见的两种模型
- Reactor: Java NIO 就是 Reactor 模型,当事件触发时,服务器得到通知,进行相应的处理;
- Proactor: AIO 采用了 Proactor 模型,简化了程序编写,有效的请求才启动线程,特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数多且连接时间较长的应用。
Reactor 简介
- Reactor 模式(其他叫法:1.反应器模式;2.分发者模式 Dispatcher;通知者模式 notifier)。
- Reactor 模型是指通过一个或多个输入同时传递给服务处理器的服务请求的事件驱动处理模式。
Reactor 模型关键组成
- Reactor, Reactor 在一个单独的线程中运行,负责监听和分发事件,分发给适当的处理程序来对 IO 事件做出反应;
- Handlers, 处理程序执行 I/O 事件要完成的实际事件。
根据 Reactor 和 Handler 的数量不同,又将 Reactor 模型分为三类:
- 单 Reactor 单线程
- 单 Reactor 多线程
- 主从 Reactor 多线程
主从 Reactor 多线程模型
主从 Reactor 多线程模型有多个 Reactor:
- MainReactor 负责客户端的连接请求,并将请求转交给 SubReactor;
- SubReactor 负责相应通道的 IO 读写请求;
- 非 IO 请求(具体逻辑处理)的任务则会直接写入队列,等待 worker threads 进行处理。
Netty 线程模型
Netty 的线程模型基于主从 Reactor 多线程,借用了 MainReactor 和 SubReactor 的结构。
Netty中有 bossGroup 和 workerGroup 概念,这两个 group 都是线程池:
- bossGroup 在 bind 一个端口后,获取一个线程作为 MainReactor,专门处理端口的 Accept 事件,每个端口对应一个 boss 线程;
- workerGroup 线程池会被各个 SubReator 和 Worker 线程充分利用;
异步处理,Future-Listener机制
当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调用的部件在完成后,通过状态、通知和回调来通知调用者。
Netty 中的 I/O 操作是异步的,包括 Bind、Write、Connect 等操作会简单的返回一个 ChannelFuture。
调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果。
当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取操作执行的状态,注册监听函数来执行完成后的操作。
常见有如下操作:
- 通过 isDone 方法来判断当前操作是否完成。
- 通过 isSuccess 方法来判断已完成的当前操作是否成功。
- 通过 getCause 方法来获取已完成的当前操作失败的原因。
- 通过 isCancelled 方法来判断已完成的当前操作是否被取消。
- 通过 addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听器;如果 Future 对象已完成,则理解通知指定的监听器。
相比传统阻塞 I/O,执行 I/O 操作后线程会被阻塞住, 直到操作完成;异步处理的好处是不会造成线程阻塞,线程在 I/O 操作期间可以执行别的程序,在高并发情形下会更稳定和更高的吞吐量。
使用方法见下文完整 Demo。
运行架构
服务端 Netty 的工作架构图:
服务端包含一个 Boss NioEventLoopGroup 和一个个 Worker NioEventLoopGroup。
NioEventLoopGroup 相当于 1 个事件循环组,这个组里包含多个事件循环 NioEventLoop,每个 NioEventLoop 包含 1 个 Selector 和 1 个事件循环线程。
每个 Boss NioEventLoop 循环执行的任务包含 3 步:
- 轮询 Accept 事件;
- 处理 Accept I/O 事件, 与 Client 建立连接,生成 NioSocketChannel,并将 NioSocketChannel 注册到某个 Worker NioEventLoop 的 Selector 上;
- 处理任务队列中的任务,runAllTasks。 任务队列中的任务包括用户调用 eventloop.execute 或 schedule 执行的任务,或者其他线程提交到该 eventloop 的任务;
每个 Worker NioEventLoop 循环执行的任务包含 3 步:
- 轮询 Read、Write 事件;
- 处理 I/O 事件,即 Read、Write 事件,在 NioSocketChannel 可读、可写事件发生时进行处理;
- 处理任务队列中的任务,runAllTasks;
Netty Demo
初始化并启动 Netty 服务端过程如下:
public class NettyServer {
public static void main(String[] args) throws InterruptedException {
// 1. 创建两个线程组 bossGroup 和 workerGroup
// 2. bossGroup 只处理连接请求,真正业务处理,会将给workerGroup
// 3. 两个都是无限循环的
NioEventLoopGroup bossGroup = new NioEventLoopGroup(1);
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try {
// 创建服务端的启动对象,配置参数
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 使用NioServerSocketChannel作为服务器的Channel实现
.option(ChannelOption.SO_BACKLOG, 128) // 设置线程队列连接个数
.childOption(ChannelOption.SO_KEEPALIVE, true) // 设置保持活动连接状态
.childHandler(new ChannelInitializer<SocketChannel>() { // 创建一个通道测试对象
// 给pipline设置处理器
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyServerHandler());
}
});
System.out.println("server is ready...");
// 绑定一个端口并同步,生成一个channelFuture对象
// 启动服务器
ChannelFuture channelFuture = bootstrap.bind(6668).sync();
// 异步io Future-Listener机制
// 添加监听
channelFuture.addListener(future -> {
if (future.isSuccess()) {
System.out.println(new Date() + " port bind success!");
} else {
System.out.println(new Date() + " port bind failure!");
}
});
// 对关闭通道进行监听
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
基本过程如下:
- 初始化创建 2 个 NioEventLoopGroup,其中 boosGroup 用于 Accetpt 连接建立事件并分发请求,workerGroup 用于处理 I/O 读写事件和业务逻辑。
- 基于 ServerBootstrap(服务端启动引导类),配置 EventLoopGroup、Channel 类型,连接参数、配置入站、出站事件 handler。
- 绑定端口,开始工作。
自定义的处理器:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
// 读取客户端发送的数据/消息
// ChannelHandlerContext ctx,上下文对象,包含管道pipline,通道channel,地址
// Object msg, 客户端发送的数据
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("server ctx: " + ctx.toString());
// 将msg转成ByteBuf,注意这里是ByteBuf,是netty提供的,ByteBuffer是NIO提供的
ByteBuf byteBuf = (ByteBuf) msg;
System.out.println("client's msg: " + byteBuf.toString(CharsetUtil.UTF_8));
System.out.println("client's address: " + ctx.channel().remoteAddress());
}
// 数据处理完毕
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
// 将数据写入到缓冲,再刷新
// 需要对发送的数据进行编码
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, client, 您好~", CharsetUtil.UTF_8));
}
// 处理异常,将通道关闭
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
初始化并启动 Netty 客户端过程如下:
public class NettyClient {
public static void main(String[] args) throws InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
// 创建客户端启动对象
// 注意服务端是ServerBootstrap, 客户端是Bootstrap
Bootstrap bootstrap = new Bootstrap();
// 设置相关参数
bootstrap.group(group)
.channel(NioSocketChannel.class) // 设置客户端的实现类,反射
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new NettyClientHandler()); // 加入自己的处理器
}
});
System.out.println("client is ready...");
// 启动客户端连接服务器
// 返回channelFuture,涉及到netty异步模型,后面分析
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 6668).sync();
// 给关闭通道添加监听
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}
}
自定义处理器:
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
// 到通道就绪就会触发该方法
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("client ctx: " + ctx.toString());
ctx.writeAndFlush(Unpooled.copiedBuffer("hello, server, 你好呀~", CharsetUtil.UTF_8));
}
// 当通道有读取事件时,触发该方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println("server's reply: " + buf.toString(CharsetUtil.UTF_8));
System.out.println("server's address: " + ctx.channel().remoteAddress());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}
}
模块组件
refrence: