一、概述
使用与客户端与服务端
public IdleStateHandler(long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)
// long readerIdleTime 表示多长时间没有读,就会发送一个心跳检测包检测是否连接
// long writerIdleTim 表示多长时间没有写,就会发送一个心跳检测包检测是否连接
// long allIdleTime 表示多长时间没有读写,就会发送一个心跳检测包检测是否连接
// 触发一个 IdleStateEvent事件
// 当IdleStateEvent事件触发后,就会传递给管道下一个handler去处理。
// 通过调用(触发)下一handler的userEventTiggered。在该方法中去处理(读、写等)
客户端主要是发送偏多,注重的是写,以及心跳配置是写,客户端需要自启动,以及断线重连
服务端主要是接收偏多,注重的是读,以及心跳配置是读,定期收到不,要剔除客户端
1.1、服务端
主方法
package com.github.bjlhx15.netty.demo.netty.heartbeat; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; public class ServerHeartBeat { public static void main(String[] args) throws InterruptedException { NioEventLoopGroup bossGroup = new NioEventLoopGroup(1); NioEventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.INFO))//在bossGroup增加一个日志处理器 .option(ChannelOption.SO_BACKLOG, 128) .childOption(ChannelOption.SO_KEEPALIVE, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //加入一个netty提供的IdleStateHandler,空闲状态处理器 // long readerIdleTime 表示多长时间没有读,就会发送一个心跳检测包检测是否连接 // long writerIdleTim 表示多长时间没有写,就会发送一个心跳检测包检测是否连接 // long allIdleTime 表示多长时间没有读写,就会发送一个心跳检测包检测是否连接 // 触发一个 IdleStateEvent事件 // 当IdleStateEvent事件触发后,就会传递给管道下一个handler去处理。 // 通过调用(触发)下一handler的userEventTiggered。在该方法中去处理(读、写等) pipeline.addLast(new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS)); // 加入对IdleStateEvent检测进一步处理的handler pipeline.addLast(new ServerHeartBeatIdleStateTriggerChannelHandler()); pipeline.addLast("decoder", new StringDecoder()); pipeline.addLast("encoder", new StringEncoder()); pipeline.addLast(new ServerMsgChannelHandler());//业务处理的Handler } }); ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); System.out.println("Server start listen at " + 7000); channelFuture.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
服务端心跳检测ChannelHandler
package com.github.bjlhx15.netty.demo.netty.heartbeat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; public class ServerHeartBeatIdleStateTriggerChannelHandler extends ChannelInboundHandlerAdapter { /** * @param ctx 上下文 * @param evt 事件 * @throws Exception */ @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleStateEvent event = (IdleStateEvent) evt; if(event.state()== IdleState.READER_IDLE){ System.out.println(ctx.channel().remoteAddress() + "--读超时时间--" + event.state()); System.out.println("服务器做对应处理即可"); } } } }
业务逻辑处理类
package com.github.bjlhx15.netty.demo.netty.heartbeat; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleStateEvent; public class ServerMsgChannelHandler extends ChannelInboundHandlerAdapter { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("server channelRead……"); System.out.println(ctx.channel().remoteAddress() + "-msg=" + msg.toString()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { ctx.close(); } }
1.2、客户端
1、构建一个ChannelHandler集合
package com.github.bjlhx15.netty.demo.netty.heartbeat; import io.netty.channel.ChannelHandler; /** * * 客户端的ChannelHandler集合,由子类实现,这样做的好处: * 继承这个接口的所有子类可以很方便地获取ChannelPipeline中的Handlers * 获取到handlers之后方便ChannelPipeline中的handler的初始化和在重连的时候也能很方便 * 地获取所有的handlers */ public interface ChannelHandlerHolder { ChannelHandler[] handlers(); }
2、客户端心跳检测Handler
package com.github.bjlhx15.netty.demo.netty.heartbeat; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.util.CharsetUtil; @ChannelHandler.Sharable public class ClientConnectorIdleStateTriggerChannelHandler extends ChannelInboundHandlerAdapter { private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("Heartbeat", CharsetUtil.UTF_8)); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent) { IdleState state = ((IdleStateEvent) evt).state(); if (state == IdleState.WRITER_IDLE) { // write heartbeat to server ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()); } } else { super.userEventTriggered(ctx, evt); } } }
3、客户端增加服务检测ChannelHandler类
package com.github.bjlhx15.netty.demo.netty.heartbeat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.util.Timeout; import io.netty.util.Timer; import io.netty.util.TimerTask; import java.util.concurrent.TimeUnit; @ChannelHandler.Sharable public abstract class ConnectionWatchdog extends ChannelInboundHandlerAdapter implements TimerTask, ChannelHandlerHolder { private final Bootstrap bootstrap; private final Timer timer; private final int port; private final String host; private volatile boolean reconnect = true; private int attempts; public ConnectionWatchdog(Bootstrap bootstrap, Timer timer, int port, String host, boolean reconnect) { this.bootstrap = bootstrap; this.timer = timer; this.port = port; this.host = host; this.reconnect = reconnect; } /** * channel链路每次active的时候,将其连接的次数重新☞ 0 */ @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("当前链路已经激活了,重连尝试次数重新置为0"); attempts = 0; ctx.fireChannelActive(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("链接关闭"); if (reconnect) { System.out.println("链接关闭,将进行重连"); if (attempts < 12) { attempts++; } //重连的间隔时间会越来越长 int timeout = 2 << attempts; timer.newTimeout(this, timeout, TimeUnit.MILLISECONDS); } ctx.fireChannelInactive(); } @Override public void run(Timeout timeout) throws Exception { ChannelFuture future; //bootstrap已经初始化好了,只需要将handler填入就可以了 synchronized (bootstrap) { bootstrap.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(handlers()); } }); future = bootstrap.connect(host, port); } //future对象 future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { boolean succeed = f.isSuccess(); //如果重连失败,则调用ChannelInactive方法,再次出发重连事件,一直尝试12次,如果失败则不再重连 if (!succeed) { System.out.println("重连失败"); f.channel().pipeline().fireChannelInactive(); } else { System.out.println("重连成功"); } } }); } }
4、客户端主类
package com.github.bjlhx15.netty.demo.netty.heartbeat; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.local.LocalChannel; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; import io.netty.handler.timeout.IdleStateHandler; import io.netty.util.HashedWheelTimer; import java.util.concurrent.TimeUnit; public class ClientHeartBeat { protected final HashedWheelTimer timer = new HashedWheelTimer(); private Bootstrap boot; private final ClientConnectorIdleStateTriggerChannelHandler idleStateTrigger = new ClientConnectorIdleStateTriggerChannelHandler(); public void connect(int port, String host) { EventLoopGroup group = new NioEventLoopGroup(); boot = new Bootstrap(); boot.group(group).channel(NioSocketChannel.class).handler(new LoggingHandler(LogLevel.INFO)); final ConnectionWatchdog watchdog = new ConnectionWatchdog(boot, timer, port, host, true) { @Override public ChannelHandler[] handlers() { return new ChannelHandler[]{ this, new IdleStateHandler(0, 4, 0, TimeUnit.SECONDS), idleStateTrigger, new StringDecoder(), new StringEncoder(), new ClientMsgChannelHandler() }; } }; ChannelFuture future = null; //进行连接 try { synchronized (boot) { boot.handler(new ChannelInitializer<Channel>() { //初始化channel @Override protected void initChannel(Channel ch) throws Exception { ch.pipeline().addLast(watchdog.handlers()); } }); future = boot.connect(host, port); }// 以下代码在synchronized同步块外面是安全的 future.sync(); if (!future.isSuccess()) { System.out.println("---- 连接服务器失败,2秒后重试 ---------port=" + port); this.scheduleStart(future.channel(), port, host); } } catch (Throwable t) { System.out.println("connects to fails." + t.getMessage()); System.out.println("---- 连接服务器失败,2秒后重试 ---------port=" + port); Channel channel = future != null && future.channel() != null ? future.channel() : new LocalChannel(); this.scheduleStart(channel, port, host); } } public void scheduleStart(Channel channel, int port, String host) { channel.eventLoop().schedule(new Runnable() { @Override public void run() { connect(port, host); } }, 2L, TimeUnit.SECONDS); } /** * @param args * @throws Exception */ public static void main(String[] args) throws Exception { new ClientHeartBeat().connect(7000, "127.0.0.1"); } }
5、客户端消息处理类
package com.github.bjlhx15.netty.demo.netty.heartbeat; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import io.netty.util.ReferenceCountUtil; import java.util.Date; @ChannelHandler.Sharable public class ClientMsgChannelHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("激活时间是:" + new Date()); System.out.println("HeartBeatClientHandler channelActive"); ctx.fireChannelActive(); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("停止时间是:" + new Date()); System.out.println("HeartBeatClientHandler channelInactive"); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { String message = (String) msg; System.out.println(message); if (message.equals("Heartbeat")) { ctx.write("has read message from server"); ctx.flush(); } ReferenceCountUtil.release(msg); } }
1.3、测试
1、启动服务端,在启动客户端,正常
服务端会定时收到心跳
server channelRead……
/127.0.0.1:52276-msg=Heartbeat
2、停止服务端
客户端如下
重连失败 链接关闭 链接关闭,将进行重连 停止时间是:Sat Aug 07 22:53:20 CST 2021 HeartBeatClientHandler channelInactive
启动后恢复正常
重连成功 当前链路已经激活了,重连尝试次数重新置为0 激活时间是:Sat Aug 07 22:53:57 CST 2021 HeartBeatClientHandler channelActive
3、如果先启动客户端
客户端如下
connects to fails.Connection refused: /127.0.0.1:7000 ---- 连接服务器失败,2秒后重试 ---------port=7000
然后启动服务端
客户端会
connects to fails.Connection refused: /127.0.0.1:7000 ---- 连接服务器失败,2秒后重试 ---------port=7000 当前链路已经激活了,重连尝试次数重新置为0 激活时间是:Sat Aug 07 22:55:25 CST 2021 HeartBeatClientHandler channelActive
电风扇