zoukankan      html  css  js  c++  java
  • netty5心跳与业务消息分发实例

      继续基于我们之前的例子(参见netty5自定义私有协议实例 ),这次我们加上连接校验和心跳机制:

      

       只要校验通过,客户端的心跳和业务消息就由两个不同的事件分别触发发送,彼此互不干扰。针对以上流程,我们需要增加4个handler:客户端请求handler、客户端心跳handler,服务端校验handler、服务端心跳处理handler。当然,引导类也得添加上面对应的handler。上代码:

      新增客户端首次连接handler:

    package com.wlf.netty.nettyclient.handler;
    
    import com.wlf.netty.nettyapi.javabean.Header;
    import com.wlf.netty.nettyapi.javabean.NettyMessage;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class ControlClientHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(buildControlReq());
        }
    
        /**
         * 在处理过程中引发异常时被调用
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("[Client] conrol request error: {}", cause.getMessage());
            ctx.fireExceptionCaught(cause);
        }
    
        /**
         * 构造请求消息体
         *
         * @return
         */
        private NettyMessage buildControlReq() {
            NettyMessage nettyMessage = new NettyMessage();
            Header header = new Header();
            byte[] data = buildControlData();
            header.setDelimiter(0xABEF0101);
            header.setLength(data.length);
            header.setType((byte) 0);
            header.setReserved((byte) 0);
            nettyMessage.setHeader(header);
    
            // 设置数据包
            nettyMessage.setData(data);
            return nettyMessage;
        }
    
        /**
         * 构造控制请求消息体
         *
         * @return
         */
        private byte[] buildControlData() {
            byte[] result = new byte[2];
    
            result[0] = (byte) 1;
    
            result[1] = (byte) 16;
            return result;
        }
    }

      服务端校验handler:

    package com.wlf.netty.nettyserver.handler;
    
    import com.wlf.netty.nettyapi.javabean.Header;
    import com.wlf.netty.nettyapi.javabean.NettyMessage;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import lombok.extern.slf4j.Slf4j;
    
    import java.net.InetSocketAddress;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    @Slf4j
    public class ControlServerHandler extends ChannelHandlerAdapter {
    
        // 白名单列表
        private String[] whiteList = new String[]{"127.0.0.1"};
    
        private Map<String, Boolean> nodeCheck = new ConcurrentHashMap<>();
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage nettyMessage = (NettyMessage) msg;
    
            // 如果是控制数据格式请求消息,说明是客户端首次请求,校验白名单,否则进入下一个处理流程
            if (nettyMessage.getHeader() != null &&
                    nettyMessage.getHeader().getType() == (byte) 0) {
                String nodeIndex = ctx.channel().remoteAddress().toString();
                NettyMessage controlResponse = null;
    
                if (nodeCheck.containsKey(nodeIndex)) {
                    log.warn("request ip : {} has requested.", nodeIndex);
                    controlResponse = buildResponse(false);
                } else {
                    InetSocketAddress address = (InetSocketAddress) ctx.channel().remoteAddress();
                    String ip = address.getAddress().getHostAddress();
                    boolean isOK = false;
    
                    for (String whiteIp : whiteList) {
                        if (ip.equals(whiteIp)) {
                            isOK = true;
                            break;
                        }
                    }
    
                    if (isOK) {
                        nodeCheck.put(nodeIndex, true);
                        // 白名单校验通过,校验是否支持PCM格式
                        byte[] data = nettyMessage.getData();
                        ByteBuf buf = Unpooled.buffer(2);
                        buf.writeBytes(data);
                        byte sample = buf.readByte();
    
                        if (sample != (byte) 1) {
                            log.error("sample : {} is not 1", sample);
                            controlResponse = buildResponse(false);
                        } else {
                            controlResponse = buildResponse(true);
                        }
                    } else {
                        log.error("ip : {} is not in whiteList : {}.", ip, whiteList);
                        controlResponse = buildResponse(false);
                    }
    
                }
                log.info("[server] The control response is : {}, data : {}", controlResponse, controlResponse.getData());
                ctx.writeAndFlush(controlResponse);
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        /**
         * 在处理过程中引发异常时被调用
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("[server] control response error: {}", cause.getMessage());
            // 删除缓存
            nodeCheck.remove(ctx.channel().remoteAddress().toString());
            ctx.fireExceptionCaught(cause);
        }
    
        /**
         * 构造响应消息体
         *
         * @param isOk
         * @return
         */
        private NettyMessage buildResponse(boolean isOk) {
            NettyMessage nettyMessage = new NettyMessage();
            Header header = new Header();
            byte[] data = buildData(isOk);
            header.setDelimiter(0xABEF0101);
            header.setLength(data.length);
            header.setType((byte) 0);
            header.setReserved((byte) 0);
            nettyMessage.setHeader(header);
            nettyMessage.setData(data);
            return nettyMessage;
        }
    
        /**
         * 构建控制数据格式响应消息体
         *
         * @param isOk
         * @return
         */
        private byte[] buildData(boolean isOk) {
            ByteBuf result = null;
            if (isOk) {
                result = Unpooled.buffer(8);
                // 生成sid
                result.writeInt(buildSid());
    
                // 心跳发送间隔,5000毫秒秒
                result.writeInt(5000);
            } else {
                result = Unpooled.buffer(1);
                result.writeByte((byte) -1);
            }
    
            return result.array();
        }
    
        private int buildSid() {
            int max = 100, min = 1;
            long randomNum = System.currentTimeMillis();
            return (int) (randomNum % (max - min) + min);
        }
    }

      心跳客户端handler:

    package com.wlf.netty.nettyclient.handler;
    
    import com.wlf.netty.nettyapi.javabean.Header;
    import com.wlf.netty.nettyapi.javabean.NettyMessage;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.concurrent.ScheduledFuture;
    import lombok.extern.slf4j.Slf4j;
    
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class HeartBeatClientHandler extends ChannelHandlerAdapter {
        private volatile int interval = 5000;
        private volatile ScheduledFuture<?> heartBeat;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage nettyMessage = (NettyMessage) msg;
    
            // 接收控制数据响应消息成功,发送心跳给服务端
            if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
                byte[] data = nettyMessage.getData();
                ByteBuf buf = Unpooled.buffer(8);
                buf.writeBytes(data);
                int sid = buf.readInt();
                interval = buf.readInt();
                log.info("[client] control response is OK, header : {}. sid : {}, interval : {}", nettyMessage.getHeader(), sid, interval);
    
                // 每interval(默认5000)豪秒发送一次心跳请求到服务端
                heartBeat = ctx.executor().scheduleAtFixedRate(new Runnable() {
                                                                   @Override
                                                                   public void run() {
                                                                       NettyMessage heartBeat = buildHeartBeat(sid);
                                                                       log.info("[client] Client send heart beat message to server : ----> {}", heartBeat);
                                                                       ctx.writeAndFlush(heartBeat);
                                                                   }
                                                               },
                        0, interval, TimeUnit.MILLISECONDS);
    
                // 消息继续向后传
                ctx.fireChannelRead(msg);
            } else {
           ctx.fireChannedRead(msg);
         }
    } @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.error("[Client] heart request error: {}", cause.getMessage()); if (heartBeat != null) { heartBeat.cancel(true); heartBeat = null; } ctx.fireExceptionCaught(cause); } /** * 构造心跳请求消息体 * * @return */ private NettyMessage buildHeartBeat(int sid) { NettyMessage message = new NettyMessage(); Header header = new Header(); byte[] data = buildData(sid); header.setDelimiter(0xABEF0101); header.setLength(data.length); header.setType((byte) 3); header.setReserved((byte) 0); message.setHeader(header); // 设置数据包 message.setData(data); return message; } /** * 构建心跳响应消息体 * * @param sid * @return */ private byte[] buildData(int sid) { ByteBuf result = Unpooled.buffer(4); result.writeInt(sid); return result.array(); } }

      服务端心跳handler:

    package com.wlf.netty.nettyserver.handler;
    
    import com.wlf.netty.nettyapi.constant.MessageType;
    import com.wlf.netty.nettyapi.javabean.NettyMessage;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import lombok.extern.slf4j.Slf4j;
    
    @Slf4j
    public class HeartBeatServerHandler extends ChannelHandlerAdapter {
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage nettyMessage = (NettyMessage) msg;
    
            // 接收到心跳请求,打印心跳消息,否则进入下一处理流程
            if (nettyMessage.getHeader() != null &&
                    nettyMessage.getHeader().getType() == (byte) 3) {
                log.info("[server] Receive client heart beat message : ----> {}", nettyMessage);
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        /**
         * 在处理过程中引发异常时被调用
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("[server] heart response error: {}", cause.getMessage());
    
            ctx.fireExceptionCaught(cause);
        }
    
    }

      客户端发送业务消息NettyClientHandler修改,发送触发从channelActive事件改为channelRead事件:

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage nettyMessage = (NettyMessage) msg;
    
            // 接收控制数据响应消息成功,每5秒发送pcm数据
            if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
                ctx.writeAndFlush(buildClientRequest());
            }
    
    //    @Override
    //    public void channelActive(ChannelHandlerContext ctx) throws Exception {
    //        ctx.writeAndFlush(buildClientRequest());
    //    }

       客户端引导类NettyClient修改,新增handler:

        /**
         * Connects to the given server, installs the client pipeline (codec, control,
         * heartbeat and business handlers) and blocks until the channel is closed.
         * The event loop group is shut down on the way out.
         *
         * @param port server port
         * @param host server host
         * @throws Exception if connecting or waiting for the channel to close fails
         */
        public void connect(int port, String host) throws Exception {
            NioEventLoopGroup eventLoops = new NioEventLoopGroup();
            try {
                Bootstrap client = new Bootstrap();
                client.group(eventLoops);
                client.channel(NioSocketChannel.class);
                client.option(ChannelOption.TCP_NODELAY, true);
                client.handler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        // Order matters: codec first, then control, heartbeat, business
                        channel.pipeline()
                                .addLast(new NettyMessageDecoder())
                                .addLast(new NettyMessageEncoder())
                                .addLast(new ControlClientHandler())
                                .addLast(new HeartBeatClientHandler())
                                .addLast(new NettyClientHandler());
                    }
                });

                // Wait for the connection, then for the channel to be closed
                client.connect(host, port).sync().channel().closeFuture().sync();
            } finally {
                eventLoops.shutdownGracefully();
            }
        }

      服务端引导类修改:

        /**
         * Binds the server to the given port, installs the per-connection pipeline
         * (codec, control, heartbeat and business handlers) and blocks until the
         * server channel is closed. Both event loop groups are shut down on the way
         * out.
         *
         * @param port port to listen on
         * @throws Exception if binding or waiting for the channel to close fails
         */
        public void bind(int port) throws Exception {
            try {
                ServerBootstrap server = new ServerBootstrap();
                server.group(bossGroup, workGroup);
                server.channel(NioServerSocketChannel.class);
                server.option(ChannelOption.SO_BACKLOG, 100);
                server.handler(new LoggingHandler(LogLevel.INFO));
                server.childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    public void initChannel(SocketChannel channel) throws Exception {
                        // Order matters: codec first, then control, heartbeat, business
                        channel.pipeline()
                                .addLast(new NettyMessageDecoder())
                                .addLast(new NettyMessageEncoder())
                                .addLast(new ControlServerHandler())
                                .addLast(new HeartBeatServerHandler())
                                .addLast(new NettyServerHandler());
                    }
                });

                // Bind the port, then wait for the server channel to be closed
                server.bind(port).sync().channel().closeFuture().sync();
            } finally {
                bossGroup.shutdownGracefully();
                workGroup.shutdownGracefully();
            }
        }

      我们跑起来看看,先跑服务端再跑客户端:

      服务端输出:

    22:55:33.725 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.ControlServerHandler - [server] The control response is : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=0, reserved=0}, data=[B@65aac359}, data : [0, 0, 0, 18, 0, 0, 19, -120]
    22:55:33.741 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - [server] server receive client message : NettyMessage{header=Header{delimiter=-1410399999, length=8, type=1, reserved=0}, data=[B@75f76fa3}
    22:55:33.752 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - data length: 8
    22:55:33.752 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.NettyServerHandler - startTime: 1572105096532

    22:55:38.747 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@77e50de}
    22:55:43.746 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@75305725}
    22:55:48.769 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@29515fc1}
    22:55:53.754 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@1dfc1da8}
    22:55:58.739 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@68f15669}
    22:56:03.738 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@1f8fad2d}
    22:56:08.742 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyserver.handler.HeartBeatServerHandler - [server] Receive client heart beat message : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@3c83464a}

      客户端输出:

    22:55:33.725 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] control response is OK, header : Header{delimiter=-1410399999, length=8, type=0, reserved=0}. sid : 18, interval : 5000
    22:55:33.725 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@b1db7b2}
    22:55:38.747 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@4958706c}
    22:55:43.746 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@1fb0fa50}
    22:55:48.769 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@4aeea171}
    22:55:53.754 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@2c282fb9}
    22:55:58.739 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@2679e140}
    22:56:03.738 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@3e03eda5}
    22:56:08.742 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@1d9363d}

       从输出可以看到心跳是正常的,业务消息也发送了,但有一点要注意,就是业务消息只有一条,而且在心跳之前发送给了服务端。如果业务消息也是阻塞性的,那么就会出现问题,详见netty5心跳与阻塞性业务消息分发实例。

  • 相关阅读:
    PHP 表单
    php之表单-2(表单验证)
    go语言使用官方的 log package 来记录日志
    golang 中timer,ticker 的使用
    go语言slice的理解
    GETTING STARTED WITH THE OTTO JAVASCRIPT INTERPRETER
    golang time.Duration()的问题解疑
    css3动画
    【转】golang中的并行与并发
    【转】Golang 关于通道 Chan 详解
  • 原文地址:https://www.cnblogs.com/wuxun1997/p/11746337.html
Copyright © 2011-2022 走看看