zoukankan      html  css  js  c++  java
  • Netty通过心跳保持长链接

     Netty自带心跳检测功能,IdleStateHandler,客户端在写空闲时主动发起心跳请求,服务器接受到心跳请求后给出一个心跳响应。当客户端在一定时间范围内不能够给出响应则断开链接。

    Java代码  收藏代码
    1. public class NettyClient {  
    2.     public void connect(String remoteServer, int port) throws Exception {  
    3.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
    4.         try {  
    5.             Bootstrap b = new Bootstrap();  
    6.             b.group(workerGroup).channel(NioSocketChannel.class).remoteAddress(remoteServer, port)  
    7.                     .handler(new ChildChannelHandler());  
    8.   
    9.             ChannelFuture f = b.connect();  
    10.             System.out.println("Netty time Client connected at port " + port);  
    11.   
    12.             f.channel().closeFuture().sync();  
    13.         } finally {  
    14.             try {  
    15.                 TimeUnit.SECONDS.sleep(5);  
    16.                 try {  
    17.                     System.out.println("重新链接。。。");  
    18.                     connect(remoteServer, port);  
    19.                 } catch (Exception e) {  
    20.                     e.printStackTrace();  
    21.                 }  
    22.             } catch (Exception e) {  
    23.                 e.printStackTrace();  
    24.             }  
    25.         }  
    26.     }  
    27.   
    28.     public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {  
    29.   
    30.         @Override  
    31.         protected void initChannel(final SocketChannel ch) throws Exception {  
    32.             // -8表示lengthAdjustment,让解码器从0开始截取字节,并且包含消息头  
    33.             ch.pipeline().addLast(new RpcEncoder(NettyMessage.class)).addLast(new RpcDecoder(NettyMessage.class))  
    34.                     .addLast(new IdleStateHandler(120, 10, 0, TimeUnit.SECONDS)).addLast(new HeartBeatReqHandler());  
    35.         }  
    36.   
    37.     }  
    38.   
    39.     public static void main(String[] args) {  
    40.         try {  
    41.             new NettyClient().connect("127.0.0.1", 12000);  
    42.         } catch (Exception e) {  
    43.             e.printStackTrace();  
    44.         }  
    45.     }  
    46. }  
    Java代码  收藏代码
    1. public class SerializationUtil {  
    2.   
    3.     private static Map<Class<?>, Schema<?>> cachedSchema = new ConcurrentHashMap<Class<?>, Schema<?>>();  
    4.   
    5.     private static Objenesis                objenesis    = new ObjenesisStd(true);  
    6.   
    7.     private static <T> Schema<T> getSchema(Class<T> clazz) {  
    8.         @SuppressWarnings("unchecked")  
    9.         Schema<T> schema = (Schema<T>) cachedSchema.get(clazz);  
    10.         if (schema == null) {  
    11.             schema = RuntimeSchema.getSchema(clazz);  
    12.             if (schema != null) {  
    13.                 cachedSchema.put(clazz, schema);  
    14.             }  
    15.         }  
    16.         return schema;  
    17.     }  
    18.   
    19.     /** 
    20.      * 序列化 
    21.      * 
    22.      * @param obj 
    23.      * @return 
    24.      */  
    25.     public static <T> byte[] serializer(T obj) {  
    26.         @SuppressWarnings("unchecked")  
    27.         Class<T> clazz = (Class<T>) obj.getClass();  
    28.         LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);  
    29.         try {  
    30.             Schema<T> schema = getSchema(clazz);  
    31.             byte result[] = ProtostuffIOUtil.toByteArray(obj, schema, buffer);  
    32.             return result;  
    33.         } catch (Exception e) {  
    34.             throw new IllegalStateException(e.getMessage(), e);  
    35.         } finally {  
    36.             buffer.clear();  
    37.         }  
    38.     }  
    39.   
    40.     /** 
    41.      * 反序列化 
    42.      * 
    43.      * @param data 
    44.      * @param clazz 
    45.      * @return 
    46.      */  
    47.     public static <T> T deserializer(byte[] data, Class<T> clazz) {  
    48.         try {  
    49.             T obj = objenesis.newInstance(clazz);  
    50.             Schema<T> schema = getSchema(clazz);  
    51.             ProtostuffIOUtil.mergeFrom(data, obj, schema);  
    52.             return obj;  
    53.         } catch (Exception e) {  
    54.             throw new IllegalStateException(e.getMessage(), e);  
    55.         }  
    56.     }  
    57. }  
    Java代码  收藏代码
    1. @SuppressWarnings("rawtypes")  
    2. public class RpcEncoder extends MessageToByteEncoder {  
    3.   
    4.     private Class<?> genericClass;  
    5.   
    6.     public RpcEncoder(Class<?> genericClass) {  
    7.         this.genericClass = genericClass;  
    8.     }  
    9.   
    10.     @Override  
    11.     public void encode(ChannelHandlerContext ctx, Object in, ByteBuf out) throws Exception {  
    12.         if (genericClass.isInstance(in)) {  
    13.             System.out.println("发送的请求是:"+in);  
    14.             byte[] data = SerializationUtil.serializer(in);  
    15.             out.writeInt(data.length);  
    16.             out.writeBytes(data);  
    17.         }  
    18.     }  
    19. }  
    Java代码  收藏代码
    1. public class RpcDecoder extends ByteToMessageDecoder {  
    2.   
    3.     private Class<?> genericClass;  
    4.   
    5.     public RpcDecoder(Class<?> genericClass) {  
    6.         this.genericClass = genericClass;  
    7.     }  
    8.   
    9.     @Override  
    10.     public final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)  
    11.             throws Exception {  
    12.         if (in.readableBytes() < 4) {  
    13.             return;  
    14.         }  
    15.         in.markReaderIndex();  
    16.         int dataLength = in.readInt();  
    17.         if (dataLength < 0) {  
    18.             ctx.close();  
    19.         }  
    20.         if (in.readableBytes() < dataLength) {  
    21.             in.resetReaderIndex();  
    22.         }  
    23.         byte[] data = new byte[dataLength];  
    24.         in.readBytes(data);  
    25.   
    26.         Object obj = SerializationUtil.deserializer(data, genericClass);  
    27.         System.out.println("接收到的消息是:"+obj);  
    28.         out.add(obj);  
    29.     }  
    30. }  
    Java代码  收藏代码
    1. public class HeartBeatReqHandler extends ChannelDuplexHandler {  
    2.   
    3.     /** 
    4.      * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext, 
    5.      *      java.lang.Object) 
    6.      */  
    7.     @Override  
    8.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
    9.         if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {  
    10.             IdleStateEvent event = (IdleStateEvent) evt;  
    11.             if (event.state() == IdleState.READER_IDLE) {  
    12.                 System.out.println("read 空闲");  
    13.                 ctx.disconnect();  
    14.             } else if (event.state() == IdleState.WRITER_IDLE) {  
    15.                 System.out.println("write 空闲");  
    16.                 ctx.writeAndFlush(buildHeartBeat(MessageType.HEARTBEAT_REQ.getType()));  
    17.             }  
    18.         }  
    19.     }  
    20.   
    21.     /** 
    22.      *  
    23.      * @return 
    24.      * @author zhangwei<wei.zw@corp.netease.com> 
    25.      */  
    26.     private NettyMessage buildHeartBeat(byte type) {  
    27.         NettyMessage msg = new NettyMessage();  
    28.         Header header = new Header();  
    29.         header.setType(type);  
    30.         msg.setHeader(header);  
    31.         return msg;  
    32.     }  
    33.   
    34. }  
    Java代码  收藏代码
    1. public class NettyServer {  
    2.     public void bind(int port) throws Exception {  
    3.         EventLoopGroup bossGroup = new NioEventLoopGroup();  
    4.         EventLoopGroup workerGroup = new NioEventLoopGroup();  
    5.         try {  
    6.             ServerBootstrap b = new ServerBootstrap();  
    7.             b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).option(ChannelOption.SO_BACKLOG, 1024)  
    8.                     .childHandler(new ChildChannelHandler());  
    9.   
    10.             ChannelFuture f = b.bind(port).sync();  
    11.             System.out.println("Netty time Server started at port " + port);  
    12.             f.channel().closeFuture().sync();  
    13.         } finally {  
    14.             bossGroup.shutdownGracefully();  
    15.             workerGroup.shutdownGracefully();  
    16.         }  
    17.     }  
    18.   
    19.     public static class ChildChannelHandler extends ChannelInitializer<SocketChannel> {  
    20.   
    21.         @Override  
    22.         protected void initChannel(final SocketChannel ch) throws Exception {  
    23.             ch.pipeline().addLast(new RpcDecoder(NettyMessage.class)).addLast(new RpcEncoder(NettyMessage.class))  
    24.                     .addLast(new IdleStateHandler(120, 0, 0, TimeUnit.SECONDS)).addLast(new HeartBeatRespHandler());  
    25.         }  
    26.   
    27.     }  
    28.   
    29.     public static void main(String[] args) {  
    30.         try {  
    31.             new NettyServer().bind(12000);  
    32.         } catch (Exception e) {  
    33.             e.printStackTrace();  
    34.         }  
    35.     }  
    36. }  
    Java代码  收藏代码
    1. public enum MessageType {  
    2.   
    3.     LOGIN_REQ((byte) 1), LOGIN_RESP((byte) 2), HEARTBEAT_REQ((byte) 3), HEARTBEAT_RESP((byte) 4);  
    4.     private byte type;  
    5.   
    6.     /** 
    7.      * @param type 
    8.      */  
    9.     private MessageType(byte type) {  
    10.         this.type = type;  
    11.     }  
    12.   
    13.     public byte getType() {  
    14.         return type;  
    15.     }  
    16.   
    17.     public void setType(byte type) {  
    18.         this.type = type;  
    19.     }  
    20.   
    21.     public static MessageType getMessageType(byte type) {  
    22.         for (MessageType b : MessageType.values()) {  
    23.             if (b.getType() == type) {  
    24.                 return b;  
    25.             }  
    26.         }  
    27.         return null;  
    28.     }  
    29.   
    30. }  
    Java代码  收藏代码
    1. public class HeartBeatRespHandler extends SimpleChannelInboundHandler<NettyMessage> {  
    2.   
    3.     /** 
    4.      * @see io.netty.channel.SimpleChannelInboundHandler#channelRead0(io.netty.channel.ChannelHandlerContext, 
    5.      *      java.lang.Object) 
    6.      */  
    7.     @Override  
    8.     protected void channelRead0(ChannelHandlerContext ctx, NettyMessage msg) throws Exception {  
    9.         if (msg.getHeader() != null && msg.getHeader().getType() == MessageType.HEARTBEAT_REQ.getType()) {  
    10.             NettyMessage heartBeat = buildHeartBeat(MessageType.HEARTBEAT_RESP.getType());  
    11.             ctx.writeAndFlush(heartBeat);  
    12.         } else {  
    13.             ctx.fireChannelRead(msg);  
    14.         }  
    15.     }  
    16.       
    17.   
    18.     /** 
    19.      * @see io.netty.channel.ChannelInboundHandlerAdapter#userEventTriggered(io.netty.channel.ChannelHandlerContext, 
    20.      *      java.lang.Object) 
    21.      */  
    22.     @Override  
    23.     public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {  
    24.         if (IdleStateEvent.class.isAssignableFrom(evt.getClass())) {  
    25.             IdleStateEvent event = (IdleStateEvent) evt;  
    26.             if (event.state() == IdleState.READER_IDLE) {  
    27.                 System.out.println("read 空闲 关闭链接");  
    28.                 ctx.disconnect();     
    29.             }   
    30.         }  
    31.     }  
    32.       
    33.   
    34.     /** 
    35.      *  
    36.      * @return 
    37.      * @author zhangwei<wei.zw@corp.netease.com> 
    38.      */  
    39.     private NettyMessage buildHeartBeat(byte type) {  
    40.         NettyMessage msg = new NettyMessage();  
    41.         Header header = new Header();  
    42.         header.setType(type);  
    43.         msg.setHeader(header);  
    44.         return msg;  
    45.     }  
    46.   
    47. }  
    Java代码  收藏代码
    1. public class NettyMessage implements Serializable{  
    2.       
    3.     /**  */  
    4.     private static final long serialVersionUID = 1L;  
    5.   
    6.     private Header header;  
    7.       
    8.     private Object body;  
    9.   
    10.     public Header getHeader() {  
    11.         return header;  
    12.     }  
    13.   
    14.     public void setHeader(Header header) {  
    15.         this.header = header;  
    16.     }  
    17.   
    18.     public Object getBody() {  
    19.         return body;  
    20.     }  
    21.   
    22.     public void setBody(Object body) {  
    23.         this.body = body;  
    24.     }  
    25.   
    26.     /**  
    27.      * @see java.lang.Object#toString() 
    28.      */  
    29.     @Override  
    30.     public String toString() {  
    31.         return "NettyMessage [header=" + header + ", body=" + body + "]";  
    32.     }  
    33.       
    34.       
    35. }  
    Java代码  收藏代码
    1. public class Header implements Serializable{  
    2.     /**  */  
    3.     private static final long serialVersionUID = 1L;  
    4.     private int crcCode=0xabef0101;  
    5.     private int length;  
    6.     private long sessionId;  
    7.     private byte type;  
    8.     private byte priority;  
    9.     private Map<String,Object> attachment=new HashMap<>();  
    10.     public int getCrcCode() {  
    11.         return crcCode;  
    12.     }  
    13.     public void setCrcCode(int crcCode) {  
    14.         this.crcCode = crcCode;  
    15.     }  
    16.     public int getLength() {  
    17.         return length;  
    18.     }  
    19.     public void setLength(int length) {  
    20.         this.length = length;  
    21.     }  
    22.     public long getSessionId() {  
    23.         return sessionId;  
    24.     }  
    25.     public void setSessionId(long sessionId) {  
    26.         this.sessionId = sessionId;  
    27.     }  
    28.     public byte getType() {  
    29.         return type;  
    30.     }  
    31.     public void setType(byte type) {  
    32.         this.type = type;  
    33.     }  
    34.     public byte getPriority() {  
    35.         return priority;  
    36.     }  
    37.     public void setPriority(byte priority) {  
    38.         this.priority = priority;  
    39.     }  
    40.     public Map<String, Object> getAttachment() {  
    41.         return attachment;  
    42.     }  
    43.     public void setAttachment(Map<String, Object> attachment) {  
    44.         this.attachment = attachment;  
    45.     }  
    46.     /**  
    47.      * @see java.lang.Object#toString() 
    48.      */  
    49.     @Override  
    50.     public String toString() {  
    51.         return "Header [crcCode=" + crcCode + ", length=" + length + ", sessionId=" + sessionId + ", type=" + type  
    52.                 + ", priority=" + priority + ", attachment=" + attachment + "]";  
    53.     }  
    54.       
    55.       
    56. }  

    客户端的结果是:

    Java代码  收藏代码
    1. etty time Client connected at port 12000  
    2. write 空闲  
    3. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
    4. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
    5. write 空闲  
    6. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
    7. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
    8. write 空闲  
    9. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
    10. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
    11. write 空闲  
    12. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
    13. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
    14. write 空闲  
    15. 发送的请求是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=3, priority=0, attachment={}], body=null]  
    16. 接收到的消息是:NettyMessage [header=Header [crcCode=-1410399999, length=0, sessionId=0, type=4, priority=0, attachment={}], body=null]  
  • 相关阅读:
    window10 禁止更新
    安装node.msi 格式的文件失败
    url参数的转码和解码
    Linux12-内存管理
    C++四种cast
    Linux内核5-系统调用
    Linux内核3-进程管理
    UNIX12-线程(下)线程控制
    UNIX11-线程(上)
    Linux内核8-中断下半部和推后执行的工作(下半部)
  • 原文地址:https://www.cnblogs.com/austinspark-jessylu/p/7324909.html
Copyright © 2011-2022 走看看