Netty官网:http://netty.io/
本例程使用最新的netty5.x版本编写
服务器端:
TimeServer 时间服务器 服务端接收客户端的连接请求和查询当前时间的指令,判断指令正确后响应返回当前服务器的校准时间。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
|
package c1; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; /** * server 有粘包问题 * @author xwalker */ public class TimeServer { public void bind( int port) throws Exception { // 服务器线程组 用于网络事件的处理 一个用于服务器接收客户端的连接 // 另一个线程组用于处理SocketChannel的网络读写 EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { // NIO服务器端的辅助启动类 降低服务器开发难度 ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel. class ) // 类似NIO中serverSocketChannel .option(ChannelOption.SO_BACKLOG, 1024 ) // 配置TCP参数 .childHandler( new ChildChannelHandler()); // 最后绑定I/O事件的处理类 // 处理网络IO事件 // 服务器启动后 绑定监听端口 同步等待成功 主要用于异步操作的通知回调 回调处理用的ChildChannelHandler ChannelFuture f = serverBootstrap.bind(port).sync(); System.out.println( "timeServer启动" ); // 等待服务端监听端口关闭 f.channel().closeFuture().sync(); } finally { // 优雅退出 释放线程池资源 bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); System.out.println( "服务器优雅的释放了线程资源..." ); } } /** * 网络事件处理器 */ private class ChildChannelHandler extends ChannelInitializer<SocketChannel> { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new TimeServerHandler()); } } public static void main(String[] args) throws Exception { int port = 8000 ; new TimeServer().bind(port); } } |
TimerServer接收到客户端的连接和读写请求后交给处理器handler进行事件的响应处理,服务器定义两组线程组,一组用来处理客户端连接,一组用来处理网络IO事件(SocketChannel)的响应,NioEventLoopGroup是Netty提供的NIO线程组,实际上就是Java NIO中的Reactor线程组。
ServerBootstrap是Netty提供的用于NIO服务端辅助启动类,降低了NIO服务端的开发复杂度。
ServerBootstrap需要绑定服务器网络IO事件的处理类ChildChannelHandler ,用于实际处理具体的IO事件,例如记录日志,对消息编解码等。
TimeServerHandler需要继承Netty提供的适配器ChannelhandlerAdapter重写channelRead等方法完成消息的读写。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
|
package c1; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.Date; /** * server端网络IO事件处理 * @author xwalker * */ public class TimeServerHandler extends ChannelHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println( "服务器读取到客户端请求..." ); ByteBuf buf=(ByteBuf) msg; byte [] req= new byte [buf.readableBytes()]; buf.readBytes(req); String body= new String(req, "UTF-8" ); System.out.println( "the time server receive order:" +body); String curentTime= "QUERY TIME ORDER" .equalsIgnoreCase(body)? new Date(System.currentTimeMillis()).toString(): "BAD ORDER" ; ByteBuf resp=Unpooled.copiedBuffer(curentTime.getBytes()); ctx.write(resp); System.out.println( "服务器做出了响应" ); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); System.out.println( "服务器readComplete 响应完成" ); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); System.out.println( "服务器异常退出" +cause.getMessage()); } } |
服务器通过handler接收和处理消息请求,channelRead中的msg就是客户端请求的消息,通过解码获取具体信息后根据消息格式和定义完成后续的响应。
ByteBuf是netty封装和扩展的java NIO中的ByteBuffer类,功能更完善。通过ByteBuf接收和解码msg 转成String类型 然后判断命令是都准确,根据结果做出响应。
客户端:
客户端的处理比较简单,启动客户端,链接服务器成功后发送时间查询的指令,等待服务器响应。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
|
package c1; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; /** * client 存在TCP粘包问题 * @author xwlaker * */ public class TimeClient { /** * 连接服务器 * @param port * @param host * @throws Exception */ public void connect( int port, String host) throws Exception { //配置客户端NIO线程组 EventLoopGroup group = new NioEventLoopGroup(); try { //客户端辅助启动类 对客户端配置 Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel. class ) .option(ChannelOption.TCP_NODELAY, true ) .handler( new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast( new TimeClientHandler()); } }); //异步链接服务器 同步等待链接成功 ChannelFuture f = b.connect(host, port).sync(); //等待链接关闭 f.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); System.out.println( "客户端优雅的释放了线程资源..." ); } } public static void main(String[] args) throws Exception { new TimeClient().connect( 8000 , "127.0.0.1" ); } } |
客户端定义一组线程组用于处理与服务器的网络IO事件。通过客户端辅助启动类 Bootstrap来配置线程组、TCP参数以及IO事件处理的Handler。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
|
package c1; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerAdapter; import io.netty.channel.ChannelHandlerContext; import java.util.logging.Logger; /** * Client 网络IO事件处理 * @author xwalker * */ public class TimeClientHandler extends ChannelHandlerAdapter { private static final Logger logger=Logger.getLogger(TimeClientHandler. class .getName()); private ByteBuf firstMessage; public TimeClientHandler(){ byte [] req = "QUERY TIME ORDER" .getBytes(); firstMessage=Unpooled.buffer(req.length); firstMessage.writeBytes(req); } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(firstMessage); System.out.println( "客户端active" ); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println( "客户端收到服务器响应数据" ); ByteBuf buf=(ByteBuf) msg; byte [] req= new byte [buf.readableBytes()]; buf.readBytes(req); String body= new String(req, "UTF-8" ); System.out.println( "Now is:" +body); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.flush(); System.out.println( "客户端收到服务器响应数据处理完成" ); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { logger.warning( "Unexpected exception from downstream:" +cause.getMessage()); ctx.close(); System.out.println( "客户端异常退出" ); } } |
TimeClienthandler继承Netty提供的Handler适配器,重写channelActive和channelRead方法 前者通道打开active状态时 发送查询指令,后者接收服务器响应的消息并解码输出。
运行结果:
客户端启动后首先处理器channelActive被调用发送查询指令,服务器端接收到查询指令后返回了当前时间,客户端接收到服务器响应后解码输出当前时间。