Netty用户手册
根据netty官网进行翻译netty4.x,地址:https://netty.io/wiki/user-guide-for-4.x.html
问题
如今,我们使用通用应用程序或库来相互通信。例如,我们经常使用 HTTP 客户端库从 Web 服务器检索信息并通过 Web 服务调用远程过程调用。然而,通用协议或其实现有时不能很好地扩展。这就像我们不使用通用 HTTP 服务器来交换大文件、电子邮件和近实时消息(例如财务信息和多人游戏数据)一样。所需要的是专用于特殊目的的高度优化的协议实现。例如,您可能想要实现一个 HTTP 服务器,该服务器针对基于 AJAX 的聊天应用程序、媒体流或大文件传输进行了优化。您甚至可能想要设计和实施完全根据您的需求量身定制的全新协议。另一个不可避免的情况是,您必须处理遗留的专有协议以确保与旧系统的互操作性。在这种情况下,重要的是我们可以多快地实施该协议,同时又不牺牲最终应用程序的稳定性和性能。
处理方法
Netty 项目致力于提供异步事件驱动的网络应用程序框架和工具,用于快速开发可维护的高性能和高可扩展性协议服务器和客户端。
换句话说,Netty 是一个 NIO 客户端服务器框架,可以快速轻松地开发网络应用程序,例如协议服务器和客户端。它极大地简化和精简了网络编程,例如 TCP 和 UDP 套接字服务器开发。
“快速而简单”并不意味着生成的应用程序会遇到可维护性或性能问题。 Netty 是根据从许多协议(例如 FTP、SMTP、HTTP 以及各种二进制和基于文本的遗留协议)的实现中学到的经验精心设计的。结果,Netty 成功地找到了一种方法,可以在不妥协的情况下实现易于开发、性能、稳定性和灵活性。
一些用户可能已经发现其他声称具有相同优势的网络应用程序框架,您可能想知道是什么让 Netty 与它们如此不同。答案是它所建立的哲学。 Netty 旨在从一开始就在 API 和实现方面为您提供最舒适的体验。这不是有形的东西,但是当您阅读本指南并使用 Netty 时,您会意识到这种哲学将使您的生活变得更加轻松。
快速上手
本章通过简单的例子来介绍 Netty 的核心结构,让你快速上手。 读完本章后,您将能够立即在 Netty 之上编写客户端和服务器。
如果您更喜欢自上而下的学习方法,您可能希望从第 2 章架构概述开始,然后回到这里。
在开始之前
运行本章示例的最低要求只有两个; 最新版本的 Netty 和 JDK 1.6 或更高版本。 最新版本的 Netty 可在项目下载页面获得。 要下载正确版本的 JDK,请参考您首选的 JDK 供应商的网站。
在阅读时,您可能对本章介绍的类有更多的疑问。 每当您想了解有关它们的更多信息时,请参阅 API 参考。 为方便起见,本文档中的所有类名都链接到在线 API 参考。 另外,请不要犹豫联系 Netty 项目社区,如果有任何不正确的信息、语法错误或拼写错误,以及您是否有任何好主意来帮助改进文档,请告诉我们。
编写 Discord 服务
世界上最简单的协议不是“Hello, World!” 而是 discard(丢弃)。 它是一种丢弃任何接收到的数据而没有任何响应的协议。
要实现 DISCARD 协议,您唯一需要做的就是忽略所有接收到的数据。 让我们直接从处理程序实现开始,它处理由 Netty 生成的 I/O 事件。
package io.netty.example.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* Handles a server-side channel.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter { // (1)
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
((ByteBuf) msg).release(); // (3)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
DiscardServerHandler extends ChannelInboundHandlerAdapter
,它是ChannelInboundHandler
的一个实现。ChannelInboundHandler
提供了各种可以重写的事件处理程序方法。 目前,继承ChannelInboundHandlerAdapter
就足够了,而不是自己实现处理程序接口。- 我们在这里重写了
channelRead()
事件处理程序方法。 每当从客户端接收到新数据时,都会使用接收到的消息调用此方法。 本例中,接收消息的类型为ByteBuf
。 - 要实现 DISCARD 协议,处理程序必须忽略接收到的消息。
ByteBuf
是一个引用计数对象,它必须通过 release() 方法显式释放。 请记住,释放传递给处理程序的任何引用计数对象是处理程序的责任。 通常,channelRead()
处理程序方法的实现方式如下:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
// Do something with msg
} finally {
ReferenceCountUtil.release(msg);
}
}
- 当 Netty 由于 I/O 错误或由于处理事件时抛出的异常而由处理程序实现引发异常时,将使用
Throwable
调用exceptionCaught()
事件处理程序方法。 在大多数情况下,应该记录捕获的异常并在此处关闭其关联的通道,尽管此方法的实现可能会有所不同,具体取决于您要做什么来处理异常情况。 例如,您可能希望在关闭连接之前发送带有错误代码的响应消息。
到现在为止还挺好。 我们已经实现了 DISCARD 服务器的前半部分。 现在剩下的是编写 main()
方法,该方法使用 DiscardServerHandler
启动服务器。
package io.netty.example.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* Discards any incoming data.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
}
NioEventLoopGroup
是一个处理 I/O 操作的多线程事件循环。 Netty 为不同类型的传输提供了各种EventLoopGroup
实现。我们在这个例子中实现了一个服务器端应用程序,因此将使用两个NioEventLoopGroup
。第一个,通常称为“boss”,接受传入连接。第二个,通常称为“worker”,一旦 boss 接受连接并将接受的连接注册到 worker ,就会处理已接受连接的流量。使用多少线程以及它们如何映射到创建的通道取决于EventLoopGroup
实现,甚至可以通过构造函数进行配置。ServerBootstrap
是一个设置服务器的辅助类。您可以直接使用 Channel 设置服务器。但是,请注意,这是一个乏味的过程,在大多数情况下您不需要这样做。- 在这里,我们指定使用
NioServerSocketChannel
类,该类用于实例化一个新的 Channel 以接受传入的连接。 - 此处指定的处理程序将始终由新接受的
Channel
评估。ChannelInitializer
是一个特殊的处理程序,旨在帮助用户配置新的Channel
。很可能您希望通过添加一些处理程序(例如DiscardServerHandler
)来配置新Channel
的ChannelPipeline
来实现您的网络应用程序。随着应用程序变得复杂,您可能会向管道添加更多处理程序,并最终将此匿名类提取到顶级类中。 - 您还可以设置特定于
Channel
实现的参数。我们正在编写一个TCP/IP
服务器,因此我们可以设置套接字选项,例如tcpNoDelay
和keepAlive
。请参阅ChannelOption
的apidocs
和特定的ChannelConfig
实现以获取有关支持的ChannelOptions
的概述。 - 你注意到
option()
和childOption()
了吗?option()
用于接受传入连接的NioServerSocketChannel
。childOption()
用于父ServerChannel
接受的Channels
,在本例中为NioSocketChannel
。 - 我们现在准备好了。剩下的就是绑定到端口并启动服务器。在这里,我们绑定到机器中所有网卡(网卡)的8080端口。您现在可以根据需要多次调用
bind()
方法(使用不同的绑定地址。)
查看接受到的数据
现在我们已经编写了我们的第一个服务器,我们需要测试它是否真的有效。 测试它的最简单方法是使用 telnet
命令。 例如,您可以在命令行中输入 telnet localhost 8080
并键入一些内容。
但是,我们可以说服务器工作正常吗? 我们无法真正知道,因为它是丢弃服务器。 你根本不会得到任何回应。 为了证明它确实有效,让我们修改服务器以打印它收到的内容。 我们已经知道只要接收到数据就会调用 channelRead()
方法。 让我们将一些代码放入 DiscardServerHandler
的 channelRead()
方法中:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
try {
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg); // (2)
}
}
- 这个低效循环实际上可以简化为:
System.out.println(in.toString(io.netty.util.CharsetUtil.US_ASCII))
- 或者,您可以在此处执行
in.release()
。
编写一个echo服务
到目前为止,我们一直在消费数据而根本没有响应。 然而,服务器通常应该响应请求。 让我们学习如何通过实现 ECHO 协议向客户端写入响应消息,其中任何接收到的数据都被发回。
与我们在前几节中实现的丢弃服务器的唯一区别是它将接收到的数据发送回而不是将接收到的数据打印到控制台。 因此,再次修改 channelRead()
方法就足够了:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg); // (1)
ctx.flush(); // (2)
}
ChannelHandlerContext
对象提供各种操作,使您能够触发各种 I/O 事件和操作。 在这里,我们调用write(Object)
逐字写入接收到的消息。 请注意,我们没有像在 DISCARD 示例中那样释放收到的消息。 这是因为当它被写出到线路时,Netty 会为你释放它。ctx.write(Object)
不会将消息写出到线路。 它在内部进行缓冲,然后通过ctx.flush()
刷新到线路。 或者,为了简洁起见,您可以调用ctx.writeAndFlush(msg)
。
如果您再次运行 telnet 命令,您将看到服务器将您发送给它的任何内容发回。
echo
服务器的完整源代码位于发行版的 io.netty.example.echo
包中。
编写Time服务
本节要实现的协议是 TIME 协议。 与前面的例子不同的是,它发送一条消息,其中包含一个 32 位整数,不接收任何请求,一旦发送消息就关闭连接。 在此示例中,您将学习如何构造和发送消息,以及如何在完成时关闭连接。
因为我们将忽略任何接收到的数据,而是在连接建立后立即发送消息,所以这次我们不能使用 channelRead()
方法。 相反,我们应该覆盖 channelActive()
方法。 下面是实现:
package io.netty.example.time;
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
-
如前所述,当建立连接并准备好生成流量时,将调用
channelActive()
方法。让我们在这个方法中写一个 32 位整数来表示当前时间。 -
要发送一条新消息,我们需要分配一个新的缓冲区来包含该消息。我们将要写入一个 32 位整数,因此我们需要一个容量至少为 4 个字节的
ByteBuf
。通过ChannelHandlerContext.alloc()
获取当前的ByteBufAllocator
并分配一个新的缓冲区。 -
像往常一样,我们编写构造的消息。
但是等等,翻转在哪里?我们之前不是在
NIO
中发送消息之前调用java.nio.ByteBuffer.flip()
吗?ByteBuf
没有这样的方法,因为它有两个指针;一个用于读操作,另一个用于写操作。当您向ByteBuf
写入内容而读取器索引不会更改时,写入器索引会增加。读取器索引和写入器索引分别表示消息开始和结束的位置。相比之下,
NIO
缓冲区没有提供一种干净的方法来确定消息内容的开始和结束位置,而无需调用翻转方法。当您忘记翻转缓冲区时,您会遇到麻烦,因为不会发送任何数据或不正确的数据。这种错误在 Netty 中不会发生,因为我们针对不同的操作类型有不同的指针。当你习惯了它时,你会发现它会让你的生活变得更轻松——一种没有生气的生活!另一点要注意的是
ChannelHandlerContext.write()
(和writeAndFlush()
)方法返回一个ChannelFuture
。ChannelFuture
表示尚未发生的 I/O 操作。这意味着,任何请求的操作可能尚未执行,因为所有操作在 Netty 中都是异步的。例如,即使在发送消息之前,以下代码也可能关闭连接:Channel ch = ...; ch.writeAndFlush(message); ch.close();
因此,您需要在
ChannelFuture
完成后调用close()
方法,该方法由write()
方法返回,并在写入操作完成时通知其侦听器。 请注意,close()
也可能不会立即关闭连接,它会返回一个ChannelFuture
。 -
我们如何在写请求完成时得到通知? 这就像将
ChannelFutureListener
添加到返回的ChannelFuture
一样简单。 在这里,我们创建了一个新的匿名ChannelFutureListener
,它在操作完成时关闭Channel
。或者,您可以使用预定义的侦听器简化代码:
f.addListener(ChannelFutureListener.CLOSE);
要测试我们的时间服务器是否按预期工作,您可以使用 UNIX rdate 命令:
$ rdate -o <port> -p <host>
其中
编写Time客户端
与 DISCARD 和 ECHO 服务器不同,我们需要 TIME 协议的客户端,因为人类无法将 32 位二进制数据转换为日历上的日期。 在本节中,我们将讨论如何确保服务器正常工作并学习如何使用 Netty 编写客户端。
Netty 中服务器和客户端之间最大也是唯一的区别是使用了不同的 Bootstrap
和 Channel
实现。 请看下面的代码:
package io.netty.example.time;
public class TimeClient {
public static void main(String[] args) throws Exception {
String host = args[0];
int port = Integer.parseInt(args[1]);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}
Bootstrap
类似于ServerBootstrap
,不同之处在于它用于非服务器通道,例如客户端或无连接通道。- 如果仅指定一个
EventLoopGroup
,它将同时用作 boss 组和 worker 组。 但是,boss,worker不用于客户端。 - 代替
NioServerSocketChannel
,NioSocketChannel
用于创建客户端通道。 - 请注意,与使用
ServerBootstrap
不同,我们在这里不使用childOption()
,因为客户端SocketChannel
没有父级。 - 我们应该调用
connect()
方法而不是bind()
方法。
如您所见,它与服务器端代码并没有真正的不同。 那么关于ChannelHandler
的实现呢? 它应该从服务器接收一个 32 位整数,将其转换为人类可读的格式,打印转换的时间,然后关闭连接:
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
- 在
TCP/IP
中,Netty 将从对等方发送的数据读取到ByteBuf
中。
它看起来非常简单,看起来与服务器端示例没有任何不同。 但是,此处理程序有时会拒绝引发 IndexOutOfBoundsException
的工作。 我们将在下一节讨论为什么会发生这种情况。
处理基于流的传输
Socket Buffer 的一个小注意事项
在基于流的传输(如 TCP/IP
)中,接收到的数据存储在套接字接收缓冲区中。 不幸的是,基于流的传输的缓冲区不是数据包队列而是字节队列。 这意味着,即使您将两条消息作为两个独立的数据包发送,操作系统也不会将它们视为两条消息,而只是一堆字节。 因此,无法保证您阅读的内容与您的远程对等方所写的内容完全相同。 例如,让我们假设操作系统的 TCP/IP
堆栈收到了三个数据包:
由于基于流的协议的这一一般属性,在您的应用程序中很有可能以以下碎片形式读取它们:
因此,接收部分,无论是服务器端还是客户端,都应该将接收到的数据碎片整理成一个或多个有意义的帧,这些帧可以被应用程序逻辑轻松理解。 在上面的例子中,接收到的数据应该是这样的:
第一个解决方法
现在让我们回到 TIME 客户端示例。 我们这里也有同样的问题。 一个 32 位整数是一个非常小的数据量,它不太可能经常被碎片化。 但是问题是可以分片,而且随着流量的增加,分片的可能性也会增加。
简单的解决方案是创建一个内部累积缓冲区,并等待所有 4 个字节都接收到内部缓冲区中。 以下是修复该问题的修改后的 TimeClientHandler
实现:
package io.netty.example.time;
import java.util.Date;
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
ChannelHandler
有两个生命周期侦听器方法:handlerAdded()
和handlerRemoved()
。 您可以执行任意(去)初始化任务,只要它不会长时间阻塞即可。- 首先,所有接收到的数据都应该累积到
buf
中。 - 然后,处理程序必须检查
buf
是否有足够的数据,在此示例中为 4 个字节,并继续执行实际的业务逻辑。 否则,当更多数据到达时,Netty 将再次调用channelRead()
方法,最终将所有 4 个字节累加。
第二个解决方法
尽管第一个解决方案已经解决了 TIME 客户端的问题,但修改后的处理程序看起来并不那么干净。 想象一个更复杂的协议,它由多个字段组成,例如可变长度字段。 您的 ChannelInboundHandler
实现将很快变得不可维护。
您可能已经注意到,您可以将多个 ChannelHandler
添加到 ChannelPipeline
,因此,您可以将一个整体的 ChannelHandler
拆分为多个模块化的,以降低应用程序的复杂性。 例如,您可以将 TimeClientHandler
拆分为两个处理程序:
- 处理碎片问题的
TimeDecoder
,以及 TimeClientHandler
的初始简单版本。
幸运的是,Netty 提供了一个可扩展的类,它可以帮助您编写第一个开箱即用的类:
package io.netty.example.time;
public class TimeDecoder extends ByteToMessageDecoder { // (1)
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) { // (2)
if (in.readableBytes() < 4) {
return; // (3)
}
out.add(in.readBytes(4)); // (4)
}
}
ByteToMessageDecoder
是ChannelInboundHandler
的一个实现,它可以轻松处理碎片问题。- 每当接收到新数据时,
ByteToMessageDecoder
都会使用内部维护的累积缓冲区调用decode()
方法。 - 当累积缓冲区中没有足够的数据时,
decode()
可以决定不添加任何内容。 当接收到更多数据时,ByteToMessageDecoder
将再次调用decode()
。 - 如果
decode()
向out
添加一个对象,则表示解码器成功解码了一条消息。ByteToMessageDecoder
将丢弃累积缓冲区的读取部分。 请记住,您不需要解码多条消息。ByteToMessageDecoder
将继续调用decode()
方法,直到它没有添加任何内容。
既然我们有另一个处理程序要插入到 ChannelPipeline
中,我们应该修改 TimeClient
中的 ChannelInitializer
实现:
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeDecoder(), new TimeClientHandler());
}
});
如果您是一个喜欢冒险的人,您可能想尝试使用 ReplayingDecoder
来进一步简化解码器。 不过,您需要查阅 API
参考以获取更多信息。
public class TimeDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(
ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
out.add(in.readBytes(4));
}
}
此外,Netty 提供了开箱即用的解码器,使您能够非常轻松地实现大多数协议,并帮助您避免以不可维护的整体处理程序实现而告终。 请参阅以下软件包以获取更详细的示例:
io.netty.example.factorial
用于二进制协议,以及io.netty.example.telnet
用于基于文本行的协议。
使用 POJO 而不是 ByteBuf
到目前为止,我们回顾的所有示例都使用 ByteBuf
作为协议消息的主要数据结构。 在本节中,我们将改进 TIME 协议客户端和服务器示例,以使用 POJO
而不是 ByteBuf
。
在 ChannelHandler
中使用 POJO
的优势是显而易见的; 通过从处理程序中分离出从 ByteBuf
中提取信息的代码,您的处理程序变得更易于维护和重用。 在 TIME 客户端和服务器示例中,我们只读取一个 32 位整数,直接使用 ByteBuf
不是主要问题。 但是,您会发现在实现实际协议时有必要进行分离。
首先,让我们定义一个名为 UnixTime
的新类型。
package io.netty.example.time;
import java.util.Date;
public class UnixTime {
private final long value;
public UnixTime() {
this(System.currentTimeMillis() / 1000L + 2208988800L);
}
public UnixTime(long value) {
this.value = value;
}
public long value() {
return value;
}
@Override
public String toString() {
return new Date((value() - 2208988800L) * 1000L).toString();
}
}
我们现在可以修改 TimeDecoder
以生成 UnixTime
而不是 ByteBuf
。
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
if (in.readableBytes() < 4) {
return;
}
out.add(new UnixTime(in.readUnsignedInt()));
}
使用更新的解码器, TimeClientHandler
不再使用 ByteBuf
:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
UnixTime m = (UnixTime) msg;
System.out.println(m);
ctx.close();
}
更简单和优雅,对吧? 可以在服务器端应用相同的技术。 这次我们先更新一下TimeServerHandler
:
@Override
public void channelActive(ChannelHandlerContext ctx) {
ChannelFuture f = ctx.writeAndFlush(new UnixTime());
f.addListener(ChannelFutureListener.CLOSE);
}
现在,唯一缺少的部分是编码器,它是 ChannelOutboundHandler
的实现,它将 UnixTime
转换回 ByteBuf
。 它比编写解码器简单得多,因为在编码消息时无需处理数据包碎片和组装。
package io.netty.example.time;
public class TimeEncoder extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
UnixTime m = (UnixTime) msg;
ByteBuf encoded = ctx.alloc().buffer(4);
encoded.writeInt((int)m.value());
ctx.write(encoded, promise); // (1)
}
}
-
在这一行中有很多重要的事情。
首先,我们按原样传递原始
ChannelPromise
,以便 Netty 在编码数据实际写入线路时将其标记为成功或失败。其次,我们没有调用
ctx.flush()
。 有一个单独的处理程序方法void flush(ChannelHandlerContext ctx)
旨在覆盖 flush() 操作。
为了进一步简化,您可以使用 MessageToByteEncoder
:
public class TimeEncoder extends MessageToByteEncoder<UnixTime> {
@Override
protected void encode(ChannelHandlerContext ctx, UnixTime msg, ByteBuf out) {
out.writeInt((int)msg.value());
}
}
剩下的最后一个任务是在 TimeServerHandler
之前在服务器端的 ChannelPipeline
中插入一个 TimeEncoder
,它作为一个简单的练习。
关闭你的应用程序
关闭 Netty 应用程序通常就像关闭您通过 shutdownGracefully()
创建的所有 EventLoopGroup
一样简单。 当 EventLoopGroup
已完全终止并且属于该组的所有 Channels
已关闭时,它会返回一个 Future
通知您。
总结
在本章中,我们快速浏览了 Netty,并演示了如何在 Netty 之上编写一个完整工作的网络应用程序。
在接下来的章节中有更多关于 Netty 的详细信息。 我们还鼓励您查看 io.netty.example
包中的 Netty 示例。
另请注意,社区一直在等待您的问题和想法来帮助您并根据您的反馈不断改进 Netty 及其文档。
最后检索于 2021 年 7 月 16 日