本文参考
本篇文章是对《Netty In Action》一书第二章"你的第一款 Netty 应用程序"的学习摘记,主要内容为编写 Echo 服务器和客户端
第一款应用程序的功能
Echo 客户端和服务器之间的交互十分简单:在客户端建立一个连接之后,它会向服务器发送一个或多个消息,反过来,服务器又会将每个消息回送给客户端。虽然我们的主要关注点可能是编写基于 Web 的用于被浏览器访问的应用程序,但它也充分地体现了客户端/服务器系统中典型的请求-响应交互模式。
虽然这只是一个简单的应用程序,但是它可以伸缩到支持数千个并发连接——每秒可以比普通的基于套接字的 Java 应用程序处理多得多的消息
编写Echo服务器
ChannelHandler是一个接口族的父接口,它的许多实现可以负责接收并响应事件通知。 在 Netty 应用程序中,所有的数据处理逻辑都包含在这些核心抽象的实现中
因为我们的Echo 服务器会响应传入的消息,所以需要实现ChannelInboundHandler接口,用来定义响应入站事件的方法。这个简单应用程序只需要用到少量的这些方法,所以继承 ChannelInboundHandlerAdapter类也足够了,它提供了ChannelInboundHandler的默认实现
在这里我们用到了
channelRead() — Invoked when the current Channel has read a message from the peer
channelReadComplete() —— Invoked when the last message read by the current read operation has been consumed by channelRead(ChannelHandlerContext, Object).
exceptionCaught() —— Gets called if a Throwable class was thrown.
//标示一个ChannelHandler可以被多个 Channel 安全地共享
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
//将消息记录到控制台
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
//将接收到的消息写给发送者,而不冲刷出站消息
ctx.write(in);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
//将未冲刷的消息冲刷到远程节点,并且关闭该 Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER)
.addListener(ChannelFutureListener.CLOSE);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause) {
//打印异常栈跟踪
cause.printStackTrace();
//关闭该Channel
ctx.close();
}
}
入站异常捕获机制:
每个 Channel 都拥有一个与之相关联的 ChannelPipeline,其持有一个 ChannelHandler 的实例链。在默认的情况下,ChannelHandler 会把对它的方法的调用转发给链中的下一个 ChannelHandler。因此,对于入站事件,异常将会按照入站方向流动。如果exceptionCaught()方法没有被该链中的某处实现,那么所接收的异常将会被传递到 ChannelPipeline 的尾端并被记录,所以实现异常处理的ChannelInboundHandler通常位于ChannelPipeline的最后。应用程序应该提供至少有一个实现了 exceptionCaught()方法的ChannelHandler
若没有实现任何异常处理,Netty将会通过 Warning级别的日志记录该异常到达了ChannelPipeline的尾端,未被处理,并尝试释放该异常
因为还没有讲清ChannelPipeline等概念,对异常处理机制更深的理解会在之后的文章中涉及
@Sharable注解:
该注解也有涉及ChannelPipeline的概念,当前的ChannelHandler被注解为Sharable时,可以在不存在竞争的情况下添加到多个ChannelPipeline中,否则因成员变量无法共享,需要每次都新建一个ChannelHandler实例添加到ChannelPipeline
Indicates that the same instance of the annotated ChannelHandler can be added to one or more ChannelPipeline multiple times without a race condition.
If this annotation is not specified, you have to create a new handler instance every time you add it to a pipeline because it has unshared state such as member variables.
引导服务器
上面已经实现了核心的业务逻辑,接下来需要绑定到指定端口,启动服务
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err.println("Usage: " + EchoServer.class.getSimpleName() + " <port>");
return;
}
//设置端口值(如果端口参数的格式不正确,则抛出一个NumberFormatException)
int port = Integer.parseInt(args[0]);
//调用服务器的 start()方法
new EchoServer(port).start();
}
public void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
//(1) 因为使用NIO传输,所以新建NioEventLoopGroup进行事件的处理,如接受新连接以及读/ 写数据
EventLoopGroup group = new NioEventLoopGroup();
try {
//(2) 创建ServerBootstrap
ServerBootstrap b = new ServerBootstrap();
b.group(group)
//(3) 指定所使用的 NIO 传输 Channel
.channel(NioServerSocketChannel.class)
//(4) 使用指定的本地端口设置套接字地址
.localAddress(new InetSocketAddress(port))
//(5) 添加一个EchoServerHandler到该Channel的 ChannelPipeline
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
//这里对于所有的客户端连接来说,都会使用同一个 EchoServerHandler
//用于初始化每一个新的Channel,因为其被标注为@Sharable
ch.pipeline().addLast(serverHandler);
}
});
//(6) 异步地绑定服务器,创建一个新的Channel;调用 sync()方法阻塞等待直到绑定完成
ChannelFuture f = b.bind().sync();
System.out.println(EchoServer.class.getName() + " started and listening for connections on " + f.channel().localAddress());
//(7) 获取 Channel 的CloseFuture,并且阻塞当前线程直到它完成,即Channel关闭
f.channel().closeFuture().sync();
} finally {
//(8) 关闭 EventLoopGroup,释放所有的资源
group.shutdownGracefully().sync();
}
}
}
编写Echo客户端
在客户端,我们使用SimpleChannelInboundHandler 类处理所有必须的任务,它是对ChannelInboundHandlerAdapter的继承
在这里我们用到了
channelActive() —— The Channel of the ChannelHandlerContext is now active
channelRead0() —— Is called for each message of type,this method will be renamed to messageReceived() in 5.0.
exceptionCaught() —— Gets called if a Throwable class was thrown.
@Sharable
//标记该类的实例可以被多个 Channel 共享
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
public void channelActive(ChannelHandlerContext ctx) {
//当被通知 Channel是活跃的时候,发送一条消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Netty rocks!", CharsetUtil.UTF_8));
}
// 由服务器发送的消息可能会被分块接收,channelRead0()方法可能会被调用多次
// 但是作为一个面向流的协议,TCP 保证了字节数组将会按照服务器发送它们的顺序被接收
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
//记录已接收消息的转储
System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
}
@Override
//在发生异常时,记录错误并关闭Channel,即终止连接
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
SimpleChannelInboundHandler 与 ChannelInboundHandler的区别:
在客户端,当channelRead0()方法完成时,你已经有了传入消息,并且已经处理完它了。当该方法返回时,SimpleChannelInboundHandler负责释放指向保存该消息的ByteBuf的内存引用,下面是SimpleChannelInboundHandler的channelRead()实现,可见我们不需要再去重载channelRead()方法,只需重载channelRead0()方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
boolean release = true;
try {
if (acceptInboundMessage(msg)) {
@SuppressWarnings("unchecked")
I imsg = (I) msg;
channelRead0(ctx, imsg); //调用channelRead0()
} else {
release = false;
ctx.fireChannelRead(msg);
}
} finally {
if (autoRelease && release) {
ReferenceCountUtil.release(msg); //释放消息
}
}
}
在EchoServerHandler中,你仍然需要将传入消息回送给发送者,而write()操作是异步的,channelRead()方法返回后可能仍然没有完成。直到消息在 EchoServerHandler 的 channelReadComplete()方法中,当 writeAndFlush()方法被调用时被释放,下面是ChannelInboundHandler的类注释
Be aware that messages are not released after the channelRead(ChannelHandlerContext, Object) method returns automatically. If you are looking for a ChannelInboundHandler implementation that releases the received messages automatically, please see SimpleChannelInboundHandler.
引导客户端
引导客户端类似于引导服务器,不同的是,客户端是使用主机和端口参数来连接远程地址,也就是这里的 Echo 服务器的地址,而不是绑定到一个一直被监听的端口。
我们可以在客户端和服务器上分别使用不同的传输。 例如,在服务器端使用 NIO 传输,而在客户端使用 OIO 传输
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
//(1) 创建EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
try {
//(2) 创建 Bootstrap
Bootstrap b = new Bootstrap();
//(3) 指定 EventLoopGroup 以处理客户端事件,其中事件处理包括创建新的连接以及处理入站和出站数据
b.group(group)
//(4) 适用于 NIO 传输的Channel 类型
.channel(NioSocketChannel.class)
//(5) 设置服务器的InetSocketAddress
.remoteAddress(new InetSocketAddress(host, port))
//(6) 在创建Channel时,向 ChannelPipeline中添加一个 EchoClientHandler实例
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new EchoClientHandler());
}
});
//(7) 连接到远程节点,阻塞等待直到连接完成
ChannelFuture f = b.connect().sync();
//(8) 阻塞,直到Channel 关闭
f.channel().closeFuture().sync();
} finally {
//(9)关闭线程池并且释放所有的资源
group.shutdownGracefully().sync();
}
}
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: " + EchoClient.class.getSimpleName() + " <host> <port>" );
return;
}
final String host = args[0];
final int port = Integer.parseInt(args[1]);
new EchoClient(host, port).start();
}
}
Netty将会通过 Warning级别的日志记录该异常到达了ChannelPipeline的尾端,但没有被处理, 并尝试释放该异常