当服务器超过 3 秒 没有读时,提示“读空闲”;当服务器超过 5 秒没有写操作时,提示“写空闲”;当服务器超过 7 秒没有读或者写操作时,提示“读写空闲”。
HeartBeatServer
package com.oy.heartbeat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; public class HeartBeatServer { private int port; public HeartBeatServer(int port) { this.port = port; } public static void main(String[] args) throws Exception { new HeartBeatServer(8001).run(); } public void run() throws Exception { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup work = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap .channel(NioServerSocketChannel.class) .group(boss, work) .handler(new LoggingHandler(LogLevel.DEBUG)) // 在 bossGroup 添加一个日志处理器 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { ChannelPipeline pipeline = socketChannel.pipeline(); /* * IdleStateHandler: netty 提供的处理空闲状态的处理器 * long readerIdleTime: 多长时间没有读,就会发送一个心跳检查是否连接 * long writerIdleTime: 多长时间没有写,就会发送一个心跳检查是否连接 * long allIdleTime: 多长时间没有读写,就会发送一个心跳检查是否连接 */ pipeline.addLast(new IdleStateHandler(3,5,7, TimeUnit.SECONDS)); // 当 IdleStateEvent 触发后,就会传递给管道的下一个 handler 进行处理 // 通过调用(触发)下一个 handler 的 userEventTriggered() // 添加对空闲检测进一步处理的自定义 handler pipeline.addLast(new HeartBeatServerHandler()); } }); // 绑定端口,启动服务 ChannelFuture future = bootstrap.bind(port).sync(); System.out.println("server started and listen " + port); // 监听关闭 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { boss.shutdownGracefully(); work.shutdownGracefully(); } } }
HeartBeatServerHandler
package com.oy.heartbeat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; public class HeartBeatServerHandler extends ChannelInboundHandlerAdapter { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; String eventType = null; switch (event.state()) { case READER_IDLE: eventType = "读空闲"; break; case WRITER_IDLE: eventType = "写空闲"; break; case ALL_IDLE: eventType = "读写空闲"; break; } System.out.println(ctx.channel().remoteAddress() + "--超时事件: " + eventType); // 如果发生空闲,关闭通道 //ctx.channel().close(); } } }
HeartBeatClient
package com.oy.heartbeat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import java.util.Scanner; public class HeartBeatClient { private static final String HOST = "127.0.0.1"; private static final int PORT = 8001; public static void main(String[] args) { new HeartBeatClient().run(HOST, PORT); } public void run(String host, int port) { EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap client = new Bootstrap() .group(group) .channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("decoder", new StringDecoder()); // 解码器 pipeline.addLast("encoder", new StringEncoder()); // 编码器 ch.pipeline().addLast(new HeartBeatClientHandler()); } }); ChannelFuture future = client.connect(host, port).sync(); System.out.println("--------------" + future.channel().localAddress() + "--------------"); Channel channel = future.channel(); // 客户端输入信息 Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.nextLine(); channel.writeAndFlush(msg + " "); } future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } } }
HeartBeatClientHandler
package com.oy.heartbeat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; public class HeartBeatClientHandler extends SimpleChannelInboundHandler<String> { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, String msg) throws Exception { System.out.println(msg); } }
---