zoukankan      html  css  js  c++  java
  • 基于netty的心跳检测

    基于netty的心跳检测,有需要的朋友可以参考下。
    这两天由于要给android系统的设备写一个心跳功能,所以在这里写一个基于netty的心跳检测功能。
    实现的功能:
    1.客户端网络空闲5秒没有进行写操作是,进行发送一次ping心跳给服务端;
    2.客户端如果在下一个发送ping心跳周期来临时,还没有收到服务端pong的心跳应答,则失败心跳计数器加1;
    3.每当客户端收到服务端的pong心跳应答后,失败心跳计数器清零;
    4.如果连续超过3次没有收到服务端的心跳回复,则断开当前连接,在5秒后进行重连操作,直到重连成功,否则每隔5秒又会进行重连;
    5.服务端网络空闲状态到达6秒后,服务端心跳失败计数器加1;
    6.只要收到客户端的ping消息,服务端心跳失败计数器清零;
    7.服务端连续3次没有收到客户端的ping消息后,将关闭链路,释放资源,等待客户端重连;
    服务端代码:
    package com.kg.netty.server;
    import io.netty.bootstrap.ServerBootstrap;
    import io.netty.channel.ChannelFuture;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioServerSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.handler.timeout.IdleStateHandler;
    import java.util.concurrent.TimeUnit;
    import com.kg.netty.msg.KeepAliveMessage;
    import com.kg.utils.Constants;
    import com.kg.utils.Utils;
    public class KeepAliveServer {
    	// 端口
    	private int port ;
    	public KeepAliveServer(int port) {
    		this.port = port;
    	}
    	
    	ChannelFuture f ;
    	
    	ServerBootstrap b ;
    	
    	// 设置6秒检测chanel是否接受过心跳数据
    	private static final int READ_WAIT_SECONDS = 6;
    	
    	// 定义客户端没有收到服务端的pong消息的最大次数
    	private static final int MAX_UN_REC_PING_TIMES = 3;
    	public void startServer() {
    		EventLoopGroup bossGroup = new NioEventLoopGroup();
    		EventLoopGroup workerGroup = new NioEventLoopGroup();
    		try {
    			b = new ServerBootstrap();
    			b.group(bossGroup, workerGroup);
    			b.channel(NioServerSocketChannel.class);
    			b.childHandler(new KeepAliveServerInitializer());
    			// 服务器绑定端口监听
    			f = b.bind(port).sync();
    			// 监听服务器关闭监听,此方法会阻塞
    			f.channel().closeFuture().sync();
    			// 可以简写为
    			/* b.bind(portNumber).sync().channel().closeFuture().sync(); */
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			bossGroup.shutdownGracefully();
    			workerGroup.shutdownGracefully();
    		}
    	}
    	/**
    	 * 消息处理器
    	 * @author cullen edward
    	 */
    	private class KeepAliveServerInitializer extends ChannelInitializer<SocketChannel> {
    		@Override
    		protected void initChannel(SocketChannel ch) throws Exception {
    			ChannelPipeline pipeline = ch.pipeline();
    			
    			/*
    			 * 使用ObjectDecoder和ObjectEncoder
    			 * 因为双向都有写数据和读数据,所以这里需要两个都设置
    			 * 如果只读,那么只需要ObjectDecoder即可
    			 */
    			pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
    			pipeline.addLast("encoder", new ObjectEncoder());
    			
    			/*
    			 * 这里只监听读操作
    			 * 可以根据需求,监听写操作和总得操作
    			 */
    			pipeline.addLast("pong", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0,TimeUnit.SECONDS));
    			
    			pipeline.addLast("handler", new Heartbeat());
    		}
    	}
    	
    	private class Heartbeat extends SimpleChannelInboundHandler<KeepAliveMessage> { 
    		
    		// 失败计数器:未收到client端发送的ping请求
    		private int unRecPingTimes = 0 ;
    		
    		// 每个chanel对应一个线程,此处用来存储对应于每个线程的一些基础数据,此处不一定要为KeepAliveMessage对象
    		ThreadLocal<KeepAliveMessage> localMsgInfo = new ThreadLocal<KeepAliveMessage>(); 
    		
    		@Override
    		protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg) throws Exception {
    			System.out.println(ctx.channel().remoteAddress() + " Say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());
    	        // 收到ping消息后,回复
    			if(Utils.notEmpty(msg.getSn())&&msg.getReqCode()==1){
    				msg.setReqCode(Constants.RET_CODE);
    				ctx.channel().writeAndFlush(msg);
    				// 失败计数器清零
    				unRecPingTimes = 0;
    				if(localMsgInfo.get()==null){
    					KeepAliveMessage localMsg = new KeepAliveMessage();
    					localMsg.setSn(msg.getSn());
    					localMsgInfo.set(localMsg);
    					/*
    					 * 这里可以将设备号放入一个集合中进行统一管理
    					 */
    					// TODO
    				}
    			}else{
    				ctx.channel().close();
    			}
    		}
    		
    		public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    	        if (evt instanceof IdleStateEvent) {
    	            IdleStateEvent event = (IdleStateEvent) evt;
    	            if (event.state() == IdleState.READER_IDLE) {
    	                /*读超时*/
    	                System.out.println("===服务端===(READER_IDLE 读超时)");
    	                // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
    	                if(unRecPingTimes >= MAX_UN_REC_PING_TIMES){
    	                	System.out.println("===服务端===(读超时,关闭chanel)");
    	                	// 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
    	                	ctx.channel().close();
    	                }else{
    	                	// 失败计数器加1
    	                	unRecPingTimes++;
    	                }
    	            } else if (event.state() == IdleState.WRITER_IDLE) {
    	                /*写超时*/   
    	                System.out.println("===服务端===(WRITER_IDLE 写超时)");
    	            } else if (event.state() == IdleState.ALL_IDLE) {
    	                /*总超时*/
    	                System.out.println("===服务端===(ALL_IDLE 总超时)");
    	            }
    	        }
    	    }
    		
    	    @Override
    		public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
    	    	System.out.println("错误原因:"+cause.getMessage());
    	    	if(localMsgInfo.get()!=null){
    				/*
    				 * 从管理集合中移除设备号等唯一标示,标示设备离线
    				 */
    	    		// TODO
    		    }
    	    	ctx.channel().close();
    		}
    		@Override
    	    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	        System.out.println("Client active ");
    	        super.channelActive(ctx);
    	    }
    		
    		@Override
    		public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    			// 关闭,等待重连
    		    ctx.close();
    			if(localMsgInfo.get()!=null){
    				/*
    				 * 从管理集合中移除设备号等唯一标示,标示设备离线
    				 */
    	    		// TODO
    		    }
    		    System.out.println("===服务端===(客户端失效)");
    		}
    	}
    	
    	public void stopServer(){
    		if(f!=null){
    			f.channel().close();
    		}
    	}
    	/**
    	 * @param args
    	 */
    	public static void main(String[] args) {
    		KeepAliveServer keepAliveServer = new KeepAliveServer(1666);
    		keepAliveServer.startServer();
    	}
    }
    客户端代码:
    package com.kg.netty.client;
    import io.netty.bootstrap.Bootstrap;
    import io.netty.channel.Channel;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.channel.ChannelInitializer;
    import io.netty.channel.ChannelPipeline;
    import io.netty.channel.EventLoopGroup;
    import io.netty.channel.SimpleChannelInboundHandler;
    import io.netty.channel.nio.NioEventLoopGroup;
    import io.netty.channel.socket.SocketChannel;
    import io.netty.channel.socket.nio.NioSocketChannel;
    import io.netty.handler.codec.serialization.ClassResolvers;
    import io.netty.handler.codec.serialization.ObjectDecoder;
    import io.netty.handler.codec.serialization.ObjectEncoder;
    import io.netty.handler.timeout.IdleState;
    import io.netty.handler.timeout.IdleStateEvent;
    import io.netty.handler.timeout.IdleStateHandler;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    import com.kg.netty.msg.KeepAliveMessage;
    import com.kg.utils.Constants;
    public class KeepAliveClient {
    	private String host ;
    	private int port ;
    	
    	private EventLoopGroup group ;
    	
    	private Bootstrap b ;
    	
    	private Channel ch ;
    	
    	// 定义客户端没有收到服务端的pong消息的最大次数
    	private static final int MAX_UN_REC_PONG_TIMES = 3;
    	
    	// 多长时间未请求后,发送心跳
    	private static final int WRITE_WAIT_SECONDS = 5;
    	
    	// 隔N秒后重连
    	private static final int RE_CONN_WAIT_SECONDS = 5;
    	
    	// 客户端连续N次没有收到服务端的pong消息  计数器
    	private int unRecPongTimes = 0 ;
    	
    	private ScheduledExecutorService executorService ;
    	
    	// 是否停止
    	private boolean isStop = false ;
    	public KeepAliveClient(String host, int port) {
    		this.host = host ;
    		this.port = port ;
    		group = new NioEventLoopGroup();
    		b = new Bootstrap();
    		b.group(group).channel(NioSocketChannel.class).handler(new HeartbeatInitializer());
    	}
    	public void start() {
    		connServer();
    	}
    	
    	private void connServer(){
    		
    		isStop = false;
    		
    		if(executorService!=null){
    			executorService.shutdown();
    		}
    		executorService = Executors.newScheduledThreadPool(1);
    		executorService.scheduleWithFixedDelay(new Runnable() {
    			
    			boolean isConnSucc = true;
    			
    			@Override
    			public void run() {
    				try {
    					// 重置计数器
    					unRecPongTimes = 0;
    					// 连接服务端
    					if(ch!=null&&ch.isOpen()){
    						ch.close();
    					}
    					ch = b.connect(host, port).sync().channel();
    					// 此方法会阻塞
    //					ch.closeFuture().sync();
    					System.out.println("connect server finish");
    				} catch (Exception e) {
    					e.printStackTrace();
    					isConnSucc = false ;
    				} finally{
    					if(isConnSucc){
    						if(executorService!=null){
    							executorService.shutdown();
    						}
    					}
    				}
    			}
    		}, RE_CONN_WAIT_SECONDS, RE_CONN_WAIT_SECONDS, TimeUnit.SECONDS);
    	}
    	
    	public class HeartbeatInitializer extends ChannelInitializer<SocketChannel> {
    		 
    	    @Override
    	    protected void initChannel(SocketChannel ch) throws Exception {
    	        ChannelPipeline pipeline = ch.pipeline();
    	 
    			pipeline.addLast("decoder", new ObjectDecoder(ClassResolvers.cacheDisabled(this.getClass().getClassLoader())));
    	        pipeline.addLast("encoder", new ObjectEncoder());
    	 
    	        pipeline.addLast("ping", new IdleStateHandler(0, WRITE_WAIT_SECONDS, 0,TimeUnit.SECONDS));
    	        // 客户端的逻辑
    	        pipeline.addLast("handler", new ClientHandler());
    	    }
    	}
    	public class ClientHandler extends SimpleChannelInboundHandler<KeepAliveMessage> {
    		 
    	    @Override
    	    protected void channelRead0(ChannelHandlerContext ctx, KeepAliveMessage msg)
    	            throws Exception {
    	    	System.out.println("Server say : sn=" + msg.getSn()+",reqcode="+msg.getReqCode());
    	        if (Constants.RET_CODE == msg.getReqCode()) {
    	        	// 计数器清零
    	        	unRecPongTimes = 0;
    	        }
    	    }
    	    
    	    @Override
    	    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    	        System.out.println("Client active ");
    	        super.channelActive(ctx);
    	    }
    	    @Override
    	    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    	        System.out.println("Client close ");
    	        super.channelInactive(ctx);
    	        /*
    	         * 重连
    	         */
    	        if(!isStop){
    	        	connServer();
    	        }
    	    }
    	    
    	    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
    	        if (evt instanceof IdleStateEvent) {
    	            IdleStateEvent event = (IdleStateEvent) evt;
    	            if (event.state() == IdleState.READER_IDLE) {
    	                /*读超时*/
    	                System.out.println("===服务端===(READER_IDLE 读超时)");
    	            } else if (event.state() == IdleState.WRITER_IDLE) {
    	                /*写超时*/   
    	                System.out.println("===服务端===(WRITER_IDLE 写超时)");
    	                if(unRecPongTimes < MAX_UN_REC_PONG_TIMES){
    	                	ctx.channel().writeAndFlush(getSrcMsg()) ;
    	                	unRecPongTimes++;
    	                }else{
    	                	ctx.channel().close();
    	                }
    	            } else if (event.state() == IdleState.ALL_IDLE) {
    	                /*总超时*/
    	                System.out.println("===服务端===(ALL_IDLE 总超时)");
    	            }
    	        }
    	    }
    	}
    	
    	private KeepAliveMessage getSrcMsg(){
    		KeepAliveMessage keepAliveMessage = new KeepAliveMessage();
    		// 设备码
            keepAliveMessage.setSn("sn_123456abcdfef");
            keepAliveMessage.setReqCode(Constants.REQ_CODE);
            return keepAliveMessage ;
    	}
    	
    	public void stop(){
    		isStop = true;
    		if(ch!=null&&ch.isOpen()){
    			ch.close();
    		}
    		if(executorService!=null){
    			executorService.shutdown();
    		}
    	}
    	
    	/**
    	 * @param args
    	 */
    	public static void main(String[] args) {
    		KeepAliveClient keepAliveServer = new KeepAliveClient("127.0.0.1",1666);
    		keepAliveServer.start();
    	}
    }
    参考网站:
    http://coder.beitown.com/archives/1180
    下载工程,请猛戳
    http://download.csdn.net/detail/asd13141718/8492741
    

  • 相关阅读:
    Netty源码分析——准备
    Netty入门
    Netty源码分析——EventLoopGroup建立
    三层架构搭建(asp.net mvc + ef)
    Springboot 1.5.x 集成基于Centos7的RabbitMQ集群安装及配置
    Springboot 2.0.x 集成基于Centos7的Redis集群安装及配置
    Springboot 2.0.x 引入链路跟踪Sleuth及Zipkin
    JAVA编码 —— 字符串关键字内容替换
    使用java发送QQ邮件的总结
    Docker原理探究
  • 原文地址:https://www.cnblogs.com/hzcya1995/p/13318013.html
Copyright © 2011-2022 走看看