zoukankan      html  css  js  c++  java
  • netty5心跳与阻塞性业务消息分发实例

      继续之前的例子(netty5心跳与业务消息分发实例),我们在NettyClientHandler把业务消息改为阻塞性的:

    package com.wlf.netty.nettyclient.handler;
    
    import com.wlf.netty.nettyapi.javabean.Header;
    import com.wlf.netty.nettyapi.javabean.NettyMessage;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import lombok.extern.slf4j.Slf4j;import java.io.RandomAccessFile;
    import java.util.Arrays;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 客户端处理类
     */
    @Slf4j
    public class NettyClientHandler extends ChannelHandlerAdapter {
    
        private static final String AUDIO_PATH = "D:\input\寒号鸟.wav";
    
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage nettyMessage = (NettyMessage) msg;
    
            // 接收控制数据响应消息成功,每5秒发送pcm数据
            if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
    //            ctx.writeAndFlush(buildClientRequest());
    //        }
                // 音频文件总时长,单位:秒
                int audioTotal = 122;
                try (RandomAccessFile raf = new RandomAccessFile(AUDIO_PATH, "r")) {
    
                    // 读结束标志
                    boolean readFinish = false;
    
                    // 文件总字节数
                    long audioLength = raf.length();
    
                    // 每次发送字节数
                    long eachLength = audioLength * 5 / audioTotal;
    
                    // 音频数据
                    byte[] audioData = null;
                    byte[] bytes = new byte[1024];
    
                    long cuccrentLength = 0L;
    
                    // 读取音频文件
                    while (true) {
                        // 休眠5秒
                        TimeUnit.SECONDS.sleep(5);
    
                        // 获取当前时间
                        long startTime = System.currentTimeMillis();
                        while (cuccrentLength <= eachLength) {
    
                            // 获取5秒内的音频字节流
                            int len = raf.read(bytes);
                            if (len == -1) {
                                readFinish = true;
                                break;
                            }
    
                            bytes = Arrays.copyOf(bytes, len);
                            audioData = ArrayUtils.addAll(audioData, bytes);
                            cuccrentLength += len;
                        }
    
    
                        // 发送5秒的数据包
                        NettyMessage nettyClientApi = buildNettyClientRequest(audioData, startTime);
                        log.info("[client] send client msg : {}", nettyClientApi);
                        ctx.writeAndFlush(nettyClientApi);
    
                        // 读完了
                        if (readFinish) {
                            log.info("The audio data send finish...");
                            break;
                        }
    
                        // 重置
                        cuccrentLength = 0L;
                    }
                }
            }
        }
    
       
       /**
         * long转字节
         *
         * @param values
         * @return
         */
        private byte[] longToBytes(long values) {
            byte[] buffer = new byte[8];
            for (int i = 0; i < 8; i++) {
                int offset = 64 - (i + 1) * 8;
                buffer[i] = (byte) ((values >> offset) & 0xff);
            }
            return buffer;
        }
    
        /**
         * 将两个数组合并起来
         *
         * @param array1
         * @param array2
         * @return
         */
        private byte[] addAll(byte[] array1, byte... array2) {
            byte[] joinedArray = new byte[array1.length + array2.length];
            System.arraycopy(array1, 0, joinedArray, 0, array1.length);
            System.arraycopy(array2, 0, joinedArray, array1.length, array2.length);
            return joinedArray;
        }
    
        /**
         * 在处理过程中引发异常时被调用
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("[Client] netty client request error: {}", cause.getMessage());
            ctx.close();
        }
    
    
        /**
         * 创建请求消息体
         *
         * @param audioData
         * @param time
         * @return
         */
        private NettyMessage buildNettyClientRequest(byte[] audioData, long time) {
            NettyMessage nettyMessage = new NettyMessage();
            Header header = new Header();
            byte[] data = buildPcmData(audioData, time);
            header.setDelimiter(0xABEF0101);
            header.setLength(data.length);
            header.setType((byte) 1);
            header.setReserved((byte) 0);
            nettyMessage.setHeader(header);
    
            // 设置数据包
            nettyMessage.setData(data);
            return nettyMessage;
        }
    
        /**
         * 构造PCM请求消息体
         *
         * @return
         */
        private byte[] buildPcmData(byte[] audioData, long time) {
            byte[] timeByte = longToBytes(time);
    
            return addAll(timeByte, audioData);
        }
    
    }

      重启客户端,会发现输出变成这样:

    23:35:34.339 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] control response is OK, header : Header{delimiter=-1410399999, length=8, type=0, reserved=0}. sid : 56, interval : 5000
    23:35:48.216 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@60deb0fd}
    23:35:53.259 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=323592, type=1, reserved=0}, data=[B@6f7f091e}
    23:35:58.319 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=485384, type=1, reserved=0}, data=[B@1c9231b2}
    23:36:03.361 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=647176, type=1, reserved=0}, data=[B@35f278be}
    23:36:08.433 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=808968, type=1, reserved=0}, data=[B@20be6fa5}
    23:36:13.496 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=970760, type=1, reserved=0}, data=[B@371eb555}
    23:36:18.607 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1132552, type=1, reserved=0}, data=[B@3a8c0da5}
    23:36:23.694 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1294344, type=1, reserved=0}, data=[B@1c9da5c2}
    23:36:28.855 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1456136, type=1, reserved=0}, data=[B@4f0d32b3}
    23:36:33.974 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1617928, type=1, reserved=0}, data=[B@d7b821a}
    23:36:39.134 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1779720, type=1, reserved=0}, data=[B@57404735}
    23:36:44.272 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1941512, type=1, reserved=0}, data=[B@26825baa}
    23:36:49.410 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2103304, type=1, reserved=0}, data=[B@bc7d63}
    23:36:54.650 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2265096, type=1, reserved=0}, data=[B@5106443c}
    23:36:59.816 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2426888, type=1, reserved=0}, data=[B@4aac8c6}
    23:37:05.009 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2588680, type=1, reserved=0}, data=[B@30419cf2}
    23:37:10.200 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2750472, type=1, reserved=0}, data=[B@53f5b8fc}
    23:37:15.416 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2912264, type=1, reserved=0}, data=[B@3031311a}
    23:37:20.633 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=3074056, type=1, reserved=0}, data=[B@628f3322}
    23:37:25.886 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=3235848, type=1, reserved=0}, data=[B@5e95858d}

      心跳根本没进来,因为业务消息占用了事件循环的IO线程,还轮不到心跳消息的发送,除非当前的业务消息发送完了。反之亦然,如果是先发送心跳,那业务消息就别指望有机会发送了,因为心跳根本就停不下来。怎么办?两种解决方案,一种是采用netty自带的IdleStateHandler来做心跳,它不会占用IO线程,因为它采用的是事件检测;另一种就是把业务消息和心跳消息糅合到一起,既然都是定时发送,那就放一起好了,只不过这样一来定时的时间间隔就必须一致了。我们在心跳Handler中带上业务消息:

    package com.wlf.netty.nettyclient.handler;
    
    import com.wlf.netty.nettyapi.javabean.Header;
    import com.wlf.netty.nettyapi.javabean.NettyMessage;
    import com.wlf.netty.nettyapi.util.CommonUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.concurrent.ScheduledFuture;
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.RandomAccessFile;
    import java.util.Arrays;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class HeartBeatClientHandler extends ChannelHandlerAdapter {
        private static final String AUDIO_PATH = "D:\input\寒号鸟.wav";
        private volatile int interval = 5000;
        private volatile ScheduledFuture<?> heartBeat;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage nettyMessage = (NettyMessage) msg;
    
            // 接收控制数据响应消息成功,发送心跳给服务端
            if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
                byte[] data = nettyMessage.getData();
                ByteBuf buf = Unpooled.buffer(8);
                buf.writeBytes(data);
                int sid = buf.readInt();
                interval = buf.readInt();
                log.info("[client] control response is OK, header : {}. sid : {}, interval : {}", nettyMessage.getHeader(), sid, interval);
    
                // 每interval(默认5000)豪秒发送一次心跳请求到服务端
    //            heartBeat = ctx.executor().scheduleAtFixedRate(new Runnable() {
    //                                                               @Override
    //                                                               public void run() {
    //                                                                   NettyMessage heartBeat = buildHeartBeat(sid);
    //                                                                   log.info("[client] Client send heart beat message to server : ----> {}", heartBeat);
    //                                                                   ctx.writeAndFlush(heartBeat);
    //                                                               }
    //                                                           },
    //                    0, interval, TimeUnit.MILLISECONDS);
    
                // 音频文件总时长,单位:秒
                int audioTotal = 122;
                try (RandomAccessFile raf = new RandomAccessFile(AUDIO_PATH, "r")) {
    
                    // 读结束标志
                    boolean readFinish = false;
    
                    // 文件总字节数
                    long audioLength = raf.length();
    
                    // 每次发送字节数
                    long eachLength = audioLength * 5 / audioTotal;
    
                    // 音频数据
                    byte[] audioData = null;
                    byte[] bytes = new byte[1024];
    
                    long cuccrentLength = 0L;
    
                    // 读取音频文件
                    while (true) {
                        // 休眠5秒
                        TimeUnit.SECONDS.sleep(5);
    
                        // 获取当前时间
                        long startTime = System.currentTimeMillis();
                        do {
    
                            // 获取5秒内的音频字节流
                            int len = raf.read(bytes);
                            if (len == -1) {
                                readFinish = true;
                                break;
                            }
    
                            bytes = Arrays.copyOf(bytes, len);
                            audioData = ArrayUtils.addAll(audioData, bytes);
                            cuccrentLength += len;
                        } while (cuccrentLength <= eachLength);
    
    
                        // 发送心跳
                        NettyMessage heartBeat = buildHeartBeat(sid);
                        log.info("[client] Client send heart beat message to server : ----> {}", heartBeat);
                        ctx.writeAndFlush(heartBeat);
    
                        // 发送5秒的数据包
                        NettyMessage nettyClientApi = buildNettyClientRequest(audioData, startTime);
                        log.info("[client] Client send business message to server : ----> {}", nettyClientApi);
                        ctx.writeAndFlush(nettyClientApi);
    
                        // 读完了
                        if (readFinish) {
                            log.info("The audio data send finish...");
                            break;
                        }
    
                        // 重置
                        cuccrentLength = 0L;
                        audioData = null;
                    }
                }
            } else if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 2) {
                log.info("[client] receive server business : {}", nettyMessage);
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("[Client] heart request error: {}", cause.getMessage());
            if (heartBeat != null) {
                heartBeat.cancel(true);
                heartBeat = null;
            }
            ctx.fireExceptionCaught(cause);
        }
    
        /**
         * 构造心跳请求消息体
         *
         * @return
         */
        private NettyMessage buildHeartBeat(int sid) {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            byte[] data = buildData(sid);
            header.setDelimiter(0xABEF0101);
            header.setLength(data.length);
            header.setType((byte) 3);
            header.setReserved((byte) 0);
            message.setHeader(header);
    
            // 设置数据包
            message.setData(data);
            return message;
        }
    
        /**
         * 构建心跳响应消息体
         *
         * @param sid
         * @return
         */
        private byte[] buildData(int sid) {
            ByteBuf result = Unpooled.buffer(4);
            result.writeInt(sid);
            return result.array();
        }
    
    
        /**
         * 创建请求消息体
         *
         * @param audioData
         * @param time
         * @return
         */
        private NettyMessage buildNettyClientRequest(byte[] audioData, long time) {
            NettyMessage nettyMessage = new NettyMessage();
            Header header = new Header();
            byte[] data = buildPcmData(audioData, time);
            header.setDelimiter(Delimiter.DELIMITER);
            header.setLength(data.length);
            header.setType(MessageType.PCM_TYPE.getType());
            header.setReserved((byte) 0);
            nettyMessage.setHeader(header);
    
            // 设置数据包
            nettyMessage.setData(data);
            return nettyMessage;
        }
    
        /**
         * 构造PCM请求消息体
         *
         * @return
         */
        private byte[] buildPcmData(byte[] audioData, long time) {
            byte[] timeByte = CommonUtil.longToBytes(time);
    
            return addAll(timeByte, audioData);
        }
    
        /**
         * long转字节
         *
         * @param values
         * @return
         */
        private byte[] longToBytes(long values) {
            byte[] buffer = new byte[8];
            for (int i = 0; i < 8; i++) {
                int offset = 64 - (i + 1) * 8;
                buffer[i] = (byte) ((values >> offset) & 0xff);
            }
            return buffer;
        }
    
        /**
         * 将两个数组合并起来
         *
         * @param array1
         * @param array2
         * @return
         */
        private byte[] addAll(byte[] array1, byte... array2) {
            byte[] joinedArray = new byte[array1.length + array2.length];
            System.arraycopy(array1, 0, joinedArray, 0, array1.length);
            System.arraycopy(array2, 0, joinedArray, array1.length, array2.length);
            return joinedArray;
        }
    
    
    }

      至于原来的NettyClientHandler相当于废了,在引导类中名存实亡。重启NettyClient,输出如下:

    15:45:07.442 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] control response is OK, header : Header{delimiter=-1410399999, length=8, type=0, reserved=0}. sid : 94, interval : 5000
    15:45:12.499 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@2bf023ed}
    15:45:12.504 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send business message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@b1ad25a}
    15:45:17.519 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@44fb731d}
    15:45:17.520 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send business message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@253b550c}
    15:45:22.551 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@f7ef50d}
    15:45:22.552 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send business message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@52ab6eba}
    15:45:27.565 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@4d578c69}
    15:45:27.565 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send business message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@323b23fa}
    15:45:32.598 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@4a957e2d}
    15:45:32.598 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send business message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@43c2077b}
    15:45:37.623 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@3207ffae}
  • 相关阅读:
    背水一战 Windows 10 (26)
    背水一战 Windows 10 (25)
    背水一战 Windows 10 (24)
    背水一战 Windows 10 (23)
    背水一战 Windows 10 (22)
    背水一战 Windows 10 (21)
    背水一战 Windows 10 (20)
    背水一战 Windows 10 (19)
    背水一战 Windows 10 (18)
    背水一战 Windows 10 (17)
  • 原文地址:https://www.cnblogs.com/wuxun1997/p/11749928.html
Copyright © 2011-2022 走看看