zoukankan      html  css  js  c++  java
  • 基于netty的心跳机制实现

    前言:在实现过程查找过许多资料,各种波折,最后综合多篇文章最终实现并上线使用。为了减少大家踩坑的时间,所以写了本文,希望有用。对于实现过程中有用的参考资料直接放上链接,可能有些内容相对冗余,不过时间允许多看看也并不无益。

    入门文章:

    http://www.tuicool.com/articles/mEJvYb

    netty官网:
    (官网的user guide相对一般,javadoc倒是要看的)
     
    需求场景:
    实现用户的在线离线状态实时展现(我们的客户端是android)。
     
    技术选型:
    在线好办,关键是要监测到什么时候离线,于是我们选择了心跳模型,当心跳失效时即为离线。如果用http发送心跳包虽然简单但是极度不科学,耗电量太大,所以直接否决。我们选择基于TCP实现长连接,而借助一些第三方插件可以更好更快地实现长连接,于是在mina和netty之间我们选择了netty。(理由仅仅是在百度知道里边看到别人说netty使用的更广泛,没有深入对比过)
     
    相关版本:
    netty5.0
    jdk1.7
    tomcat6.0
     
    基础流程图如下:
    服务端代码:
    HeartBeatServer.java
    public class HeartBeatServer {
    
    	// 端口
    	private int port ;
    	public HeartBeatServer(int port) {
    		this.port = port;
    	}
    	
    	ChannelFuture f ;
    	
    	ServerBootstrap b ;
    	
    	// 检测chanel是否接受过心跳数据时间间隔(单位秒)
    	private static final int READ_WAIT_SECONDS = 10;
    
    	
    	public void startServer() {
    		EventLoopGroup bossGroup = new NioEventLoopGroup();
    		EventLoopGroup workerGroup = new NioEventLoopGroup();
    		try {
    			b = new ServerBootstrap();
    			b.group(bossGroup, workerGroup);
    			b.channel(NioServerSocketChannel.class);
    			b.childHandler(new HeartBeatServerInitializer());
    			// 服务器绑定端口监听
    			f = b.bind(port).sync();
    			// 监听服务器关闭监听,此方法会阻塞
    			f.channel().closeFuture().sync();
    			// 可以简写为
    			/* b.bind(portNumber).sync().channel().closeFuture().sync(); */
    		} catch (InterruptedException e) {
    			e.printStackTrace();
    		} finally {
    			bossGroup.shutdownGracefully();
    			workerGroup.shutdownGracefully();
    		}
    	}
    	/**
    	 * 消息处理器
    	 * @author cullen edward
    	 */
    	private class HeartBeatServerInitializer extends ChannelInitializer<SocketChannel> {
    		@Override
    		protected void initChannel(SocketChannel ch) throws Exception {
    			ChannelPipeline pipeline = ch.pipeline();
    			
    			/*
    			 * 使用ObjectDecoder和ObjectEncoder
    			 * 因为双向都有写数据和读数据,所以这里需要两个都设置
    			 * 如果只读,那么只需要ObjectDecoder即可
    			 */
    			pipeline.addLast("decoder", new StringDecoder());
    	        pipeline.addLast("encoder", new StringEncoder());
    			
    			/*
    			 * 这里只监听读操作
    			 * 可以根据需求,监听写操作和总得操作
    			 */
    			pipeline.addLast("pong", new IdleStateHandler(READ_WAIT_SECONDS, 0, 0,TimeUnit.SECONDS));
    			
    			//pipeline.addLast("handler", new Heartbeat());
    			pipeline.addLast("handler", new HeartbeatHandler());
    		}
    	}
    	
    	public void stopServer(){
    		if(f!=null){
    			f.channel().close();
    		}
    	}
    	/**
    	 * @param args
    	 */
    	public static void main(String[] args) {
    		HeartbeatServer heartbeatServer = new HeartbeatServer(9597);
    		heartbeatServer.startServer();
    	}
    }
    

      

    HeartbeatHandler.java
    public class HeartbeatHandler extends SimpleChannelInboundHandler<String> { 
    	// 失败计数器:未收到client端发送的ping请求
    	private int unRecPingTimes = 0 ;
    	private String userid;
    	
    	// 定义服务端没有收到心跳消息的最大次数
    	private static final int MAX_UN_REC_PING_TIMES = 3;
    	
    	@Override
    	protected void messageReceived(ChannelHandlerContext ctx, String msg) throws Exception {
    		System.out.println("----->msg=" + msg);    //msg格式约定为"operation,userid"
            String[] args = msg.split(",");
            String msg_operation = args[0];
            String msg_userid = args[1];
            if("LOGIN".equals(msg_operation)){
            	if(!Utils.isBlank(msg_userid)){
            		userid = msg_userid;
    	        }
            	setUserOnlineStatus(userid, true);
            }else if("HEARTBEAT".equals(msg_operation)){
    			ctx.channel().writeAndFlush("success");
    			// 失败计数器清零
    			unRecPingTimes = 0;
    		}
    	}
    	
    	public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof IdleStateEvent) {
                IdleStateEvent event = (IdleStateEvent) evt;
                if (event.state() == IdleState.READER_IDLE) {
                    /*读超时*/
                    System.out.println("===服务端===(READER_IDLE 读超时)");
                    // 失败计数器次数大于等于3次的时候,关闭链接,等待client重连
                    if(unRecPingTimes >= MAX_UN_REC_PING_TIMES){
                    	System.out.println("===服务端===(读超时,关闭chanel)");
                    	// 连续超过N次未收到client的ping消息,那么关闭该通道,等待client重连
                    	ctx.channel().close();
                    }else{
                    	// 失败计数器加1
                    	unRecPingTimes++;
                    }
                } else if (event.state() == IdleState.WRITER_IDLE) {
                    /*写超时*/   
                    System.out.println("===服务端===(WRITER_IDLE 写超时)");
                } else if (event.state() == IdleState.ALL_IDLE) {
                    /*总超时*/
                    System.out.println("===服务端===(ALL_IDLE 总超时)");
                }
            }
        }
    	
        @Override
    	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        	System.out.println("错误原因:"+cause.getMessage());
        	ctx.channel().close();
    	    setUserOnlineStatus(userid, false);
    	}
    	@Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            System.out.println("Client active ");
            super.channelActive(ctx);
        }
    	
    	@Override
    	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
    		// 关闭,等待重连
    	    ctx.close();
    	    System.out.println("===服务端===(客户端失效)");
    	    setUserOnlineStatus(userid, false);
    	}
    	
    	//设置用户在线离线状态
    	private void setUserOnlineStatus(String userid, boolean isOnline){
    		if(!Utils.isBlank(userid)){
        		//更新用户信息为在线状态(此处代码省略)
        	}
    	}
    }
    

      

    简易的测试客户端代码:

    SimpleClient.java
     
    public class SimpleClient {
    	public static void main(String[] args) throws Exception {
    		new SimpleClient("127.0.0.1", 9597).run();
    	}
    	private final String host;
    	private final int port;
    	public SimpleClient(String host, int port) {
    		this.host = host;
    		this.port = port;
    	}
    	public void run() throws Exception {
    		EventLoopGroup group = new NioEventLoopGroup();
    		try {
    			Bootstrap bootstrap = new Bootstrap().group(group).channel(
    					NioSocketChannel.class).handler(
    					new SimpleClientInitializer());
    			Channel channel = bootstrap.connect(host, port).sync().channel();
    			BufferedReader in = new BufferedReader(new InputStreamReader(
    					System.in));
    			while (true) {
    				channel.writeAndFlush(in.readLine());
    			}
    		} catch (Exception e) {
    			e.printStackTrace();
    		} finally {
    			group.shutdownGracefully();
    		}
    	}
    }
    

      

    SimpleClientInitializer.java

    public class SimpleClientInitializer extends ChannelInitializer<SocketChannel> {
        @Override
        public void initChannel(SocketChannel ch) throws Exception {
            ChannelPipeline pipeline = ch.pipeline();
            pipeline.addLast("framer", new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
            pipeline.addLast("decoder", new StringDecoder());
            pipeline.addLast("encoder", new StringEncoder());
        }
    }

    备注:代码大部分是从其他网站复制修改调整的,写得相对简易一点,其中还有很多安全性、合理性有待优化。

    代码参考文章:

     
    更多相关文章:
     
  • 相关阅读:
    [转载]Oracle 11g R1中资本管理器增强(2)
    [转载]Oracle监听器安装与设置(2)
    [转载]Oracle 11g R1中资源管理器加强(3)
    [转载]Oracle能否吸收领跑数据库市场?(2)
    [转载]怎样高效删除Oracle数据库中的反双数据
    [转载]Oracle 11g R1下的主动内存处置惩罚(1)
    [转载]Oracle假造公用数据控制设备使用
    [转载]Oracle可否担当领跑数据库市场?(1)
    [转载]将oracle 9i备份文件导入oracle 8i的方式简介
    OpenSuSE 11.1内行装置教程(贴图版)
  • 原文地址:https://www.cnblogs.com/leap/p/5352772.html
Copyright © 2011-2022 走看看