zoukankan      html  css  js  c++  java
  • netty5心跳与阻塞性业务消息分发实例

      继续之前的例子(netty5心跳与业务消息分发实例),我们在NettyClientHandler把业务消息改为阻塞性的:

    package com.wlf.netty.nettyclient.handler;
    
    import com.wlf.netty.nettyapi.javabean.Header;
    import com.wlf.netty.nettyapi.javabean.NettyMessage;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import lombok.extern.slf4j.Slf4j;import java.io.RandomAccessFile;
    import java.util.Arrays;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 客户端处理类
     */
    @Slf4j
    public class NettyClientHandler extends ChannelHandlerAdapter {
    
        private static final String AUDIO_PATH = "D:\input\寒号鸟.wav";
    
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage nettyMessage = (NettyMessage) msg;
    
            // 接收控制数据响应消息成功,每5秒发送pcm数据
            if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
    //            ctx.writeAndFlush(buildClientRequest());
    //        }
                // 音频文件总时长,单位:秒
                int audioTotal = 122;
                try (RandomAccessFile raf = new RandomAccessFile(AUDIO_PATH, "r")) {
    
                    // 读结束标志
                    boolean readFinish = false;
    
                    // 文件总字节数
                    long audioLength = raf.length();
    
                    // 每次发送字节数
                    long eachLength = audioLength * 5 / audioTotal;
    
                    // 音频数据
                    byte[] audioData = null;
                    byte[] bytes = new byte[1024];
    
                    long cuccrentLength = 0L;
    
                    // 读取音频文件
                    while (true) {
                        // 休眠5秒
                        TimeUnit.SECONDS.sleep(5);
    
                        // 获取当前时间
                        long startTime = System.currentTimeMillis();
                        while (cuccrentLength <= eachLength) {
    
                            // 获取5秒内的音频字节流
                            int len = raf.read(bytes);
                            if (len == -1) {
                                readFinish = true;
                                break;
                            }
    
                            bytes = Arrays.copyOf(bytes, len);
                            audioData = ArrayUtils.addAll(audioData, bytes);
                            cuccrentLength += len;
                        }
    
    
                        // 发送5秒的数据包
                        NettyMessage nettyClientApi = buildNettyClientRequest(audioData, startTime);
                        log.info("[client] send client msg : {}", nettyClientApi);
                        ctx.writeAndFlush(nettyClientApi);
    
                        // 读完了
                        if (readFinish) {
                            log.info("The audio data send finish...");
                            break;
                        }
    
                        // 重置
                        cuccrentLength = 0L;
                    }
                }
            }
        }
    
       
       /**
         * long转字节
         *
         * @param values
         * @return
         */
        private byte[] longToBytes(long values) {
            byte[] buffer = new byte[8];
            for (int i = 0; i < 8; i++) {
                int offset = 64 - (i + 1) * 8;
                buffer[i] = (byte) ((values >> offset) & 0xff);
            }
            return buffer;
        }
    
        /**
         * 将两个数组合并起来
         *
         * @param array1
         * @param array2
         * @return
         */
        private byte[] addAll(byte[] array1, byte... array2) {
            byte[] joinedArray = new byte[array1.length + array2.length];
            System.arraycopy(array1, 0, joinedArray, 0, array1.length);
            System.arraycopy(array2, 0, joinedArray, array1.length, array2.length);
            return joinedArray;
        }
    
        /**
         * 在处理过程中引发异常时被调用
         *
         * @param ctx
         * @param cause
         * @throws Exception
         */
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("[Client] netty client request error: {}", cause.getMessage());
            ctx.close();
        }
    
    
        /**
         * 创建请求消息体
         *
         * @param audioData
         * @param time
         * @return
         */
        private NettyMessage buildNettyClientRequest(byte[] audioData, long time) {
            NettyMessage nettyMessage = new NettyMessage();
            Header header = new Header();
            byte[] data = buildPcmData(audioData, time);
            header.setDelimiter(0xABEF0101);
            header.setLength(data.length);
            header.setType((byte) 1);
            header.setReserved((byte) 0);
            nettyMessage.setHeader(header);
    
            // 设置数据包
            nettyMessage.setData(data);
            return nettyMessage;
        }
    
        /**
         * 构造PCM请求消息体
         *
         * @return
         */
        private byte[] buildPcmData(byte[] audioData, long time) {
            byte[] timeByte = longToBytes(time);
    
            return addAll(timeByte, audioData);
        }
    
    }

      重启客户端,会发现输出变成这样:

    23:35:34.339 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] control response is OK, header : Header{delimiter=-1410399999, length=8, type=0, reserved=0}. sid : 56, interval : 5000
    23:35:48.216 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@60deb0fd}
    23:35:53.259 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=323592, type=1, reserved=0}, data=[B@6f7f091e}
    23:35:58.319 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=485384, type=1, reserved=0}, data=[B@1c9231b2}
    23:36:03.361 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=647176, type=1, reserved=0}, data=[B@35f278be}
    23:36:08.433 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=808968, type=1, reserved=0}, data=[B@20be6fa5}
    23:36:13.496 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=970760, type=1, reserved=0}, data=[B@371eb555}
    23:36:18.607 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1132552, type=1, reserved=0}, data=[B@3a8c0da5}
    23:36:23.694 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1294344, type=1, reserved=0}, data=[B@1c9da5c2}
    23:36:28.855 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1456136, type=1, reserved=0}, data=[B@4f0d32b3}
    23:36:33.974 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1617928, type=1, reserved=0}, data=[B@d7b821a}
    23:36:39.134 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1779720, type=1, reserved=0}, data=[B@57404735}
    23:36:44.272 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=1941512, type=1, reserved=0}, data=[B@26825baa}
    23:36:49.410 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2103304, type=1, reserved=0}, data=[B@bc7d63}
    23:36:54.650 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2265096, type=1, reserved=0}, data=[B@5106443c}
    23:36:59.816 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2426888, type=1, reserved=0}, data=[B@4aac8c6}
    23:37:05.009 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2588680, type=1, reserved=0}, data=[B@30419cf2}
    23:37:10.200 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2750472, type=1, reserved=0}, data=[B@53f5b8fc}
    23:37:15.416 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=2912264, type=1, reserved=0}, data=[B@3031311a}
    23:37:20.633 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=3074056, type=1, reserved=0}, data=[B@628f3322}
    23:37:25.886 [nioEventLoopGroup-1-0] INFO com.wlf.netty.nettyclient.handler.NettyClientHandler - [client] send client msg : NettyMessage{header=Header{delimiter=-1410399999, length=3235848, type=1, reserved=0}, data=[B@5e95858d}

      心跳根本没进来,因为业务消息占用了事件循环的IO线程,还轮不到心跳消息的发送,除非当前的业务消息发送完了。反之亦然,如果是先发送心跳,那业务消息就别指望有机会发送了,因为心跳根本就停不下来。怎么办?两种解决方案,一种是采用netty自带的IdleStateHandler来做心跳,它不会占用IO线程,因为它采用的是事件检测;另一种就是把业务消息和心跳消息糅合到一起,既然都是定时发送,那就放一起好了,只不过这样一来定时的时间间隔就必须一致了。我们在心跳Handler中带上业务消息:

    package com.wlf.netty.nettyclient.handler;
    
    import com.wlf.netty.nettyapi.javabean.Header;
    import com.wlf.netty.nettyapi.javabean.NettyMessage;
    import com.wlf.netty.nettyapi.util.CommonUtil;
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import io.netty.channel.ChannelHandlerAdapter;
    import io.netty.channel.ChannelHandlerContext;
    import io.netty.util.concurrent.ScheduledFuture;
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.RandomAccessFile;
    import java.util.Arrays;
    import java.util.concurrent.TimeUnit;
    
    @Slf4j
    public class HeartBeatClientHandler extends ChannelHandlerAdapter {
        private static final String AUDIO_PATH = "D:\input\寒号鸟.wav";
        private volatile int interval = 5000;
        private volatile ScheduledFuture<?> heartBeat;
    
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            NettyMessage nettyMessage = (NettyMessage) msg;
    
            // 接收控制数据响应消息成功,发送心跳给服务端
            if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 0) {
                byte[] data = nettyMessage.getData();
                ByteBuf buf = Unpooled.buffer(8);
                buf.writeBytes(data);
                int sid = buf.readInt();
                interval = buf.readInt();
                log.info("[client] control response is OK, header : {}. sid : {}, interval : {}", nettyMessage.getHeader(), sid, interval);
    
                // 每interval(默认5000)豪秒发送一次心跳请求到服务端
    //            heartBeat = ctx.executor().scheduleAtFixedRate(new Runnable() {
    //                                                               @Override
    //                                                               public void run() {
    //                                                                   NettyMessage heartBeat = buildHeartBeat(sid);
    //                                                                   log.info("[client] Client send heart beat message to server : ----> {}", heartBeat);
    //                                                                   ctx.writeAndFlush(heartBeat);
    //                                                               }
    //                                                           },
    //                    0, interval, TimeUnit.MILLISECONDS);
    
                // 音频文件总时长,单位:秒
                int audioTotal = 122;
                try (RandomAccessFile raf = new RandomAccessFile(AUDIO_PATH, "r")) {
    
                    // 读结束标志
                    boolean readFinish = false;
    
                    // 文件总字节数
                    long audioLength = raf.length();
    
                    // 每次发送字节数
                    long eachLength = audioLength * 5 / audioTotal;
    
                    // 音频数据
                    byte[] audioData = null;
                    byte[] bytes = new byte[1024];
    
                    long cuccrentLength = 0L;
    
                    // 读取音频文件
                    while (true) {
                        // 休眠5秒
                        TimeUnit.SECONDS.sleep(5);
    
                        // 获取当前时间
                        long startTime = System.currentTimeMillis();
                        do {
    
                            // 获取5秒内的音频字节流
                            int len = raf.read(bytes);
                            if (len == -1) {
                                readFinish = true;
                                break;
                            }
    
                            bytes = Arrays.copyOf(bytes, len);
                            audioData = ArrayUtils.addAll(audioData, bytes);
                            cuccrentLength += len;
                        } while (cuccrentLength <= eachLength);
    
    
                        // 发送心跳
                        NettyMessage heartBeat = buildHeartBeat(sid);
                        log.info("[client] Client send heart beat message to server : ----> {}", heartBeat);
                        ctx.writeAndFlush(heartBeat);
    
                        // 发送5秒的数据包
                        NettyMessage nettyClientApi = buildNettyClientRequest(audioData, startTime);
                        log.info("[client] Client send business message to server : ----> {}", nettyClientApi);
                        ctx.writeAndFlush(nettyClientApi);
    
                        // 读完了
                        if (readFinish) {
                            log.info("The audio data send finish...");
                            break;
                        }
    
                        // 重置
                        cuccrentLength = 0L;
                        audioData = null;
                    }
                }
            } else if (nettyMessage.getHeader() != null && nettyMessage.getHeader().getType() == (byte) 2) {
                log.info("[client] receive server business : {}", nettyMessage);
            } else {
                ctx.fireChannelRead(msg);
            }
        }
    
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("[Client] heart request error: {}", cause.getMessage());
            if (heartBeat != null) {
                heartBeat.cancel(true);
                heartBeat = null;
            }
            ctx.fireExceptionCaught(cause);
        }
    
        /**
         * 构造心跳请求消息体
         *
         * @return
         */
        private NettyMessage buildHeartBeat(int sid) {
            NettyMessage message = new NettyMessage();
            Header header = new Header();
            byte[] data = buildData(sid);
            header.setDelimiter(0xABEF0101);
            header.setLength(data.length);
            header.setType((byte) 3);
            header.setReserved((byte) 0);
            message.setHeader(header);
    
            // 设置数据包
            message.setData(data);
            return message;
        }
    
        /**
         * 构建心跳响应消息体
         *
         * @param sid
         * @return
         */
        private byte[] buildData(int sid) {
            ByteBuf result = Unpooled.buffer(4);
            result.writeInt(sid);
            return result.array();
        }
    
    
        /**
         * 创建请求消息体
         *
         * @param audioData
         * @param time
         * @return
         */
        private NettyMessage buildNettyClientRequest(byte[] audioData, long time) {
            NettyMessage nettyMessage = new NettyMessage();
            Header header = new Header();
            byte[] data = buildPcmData(audioData, time);
            header.setDelimiter(Delimiter.DELIMITER);
            header.setLength(data.length);
            header.setType(MessageType.PCM_TYPE.getType());
            header.setReserved((byte) 0);
            nettyMessage.setHeader(header);
    
            // 设置数据包
            nettyMessage.setData(data);
            return nettyMessage;
        }
    
        /**
         * 构造PCM请求消息体
         *
         * @return
         */
        private byte[] buildPcmData(byte[] audioData, long time) {
            byte[] timeByte = CommonUtil.longToBytes(time);
    
            return addAll(timeByte, audioData);
        }
    
        /**
         * long转字节
         *
         * @param values
         * @return
         */
        private byte[] longToBytes(long values) {
            byte[] buffer = new byte[8];
            for (int i = 0; i < 8; i++) {
                int offset = 64 - (i + 1) * 8;
                buffer[i] = (byte) ((values >> offset) & 0xff);
            }
            return buffer;
        }
    
        /**
         * 将两个数组合并起来
         *
         * @param array1
         * @param array2
         * @return
         */
        private byte[] addAll(byte[] array1, byte... array2) {
            byte[] joinedArray = new byte[array1.length + array2.length];
            System.arraycopy(array1, 0, joinedArray, 0, array1.length);
            System.arraycopy(array2, 0, joinedArray, array1.length, array2.length);
            return joinedArray;
        }
    
    
    }

      至于原来的NettyClientHandler相当于废了,在引导类中名存实亡。重启NettyClient,输出如下:

    15:45:07.442 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] control response is OK, header : Header{delimiter=-1410399999, length=8, type=0, reserved=0}. sid : 94, interval : 5000
    15:45:12.499 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@2bf023ed}
    15:45:12.504 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send business message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@b1ad25a}
    15:45:17.519 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@44fb731d}
    15:45:17.520 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send business message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@253b550c}
    15:45:22.551 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@f7ef50d}
    15:45:22.552 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send business message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@52ab6eba}
    15:45:27.565 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@4d578c69}
    15:45:27.565 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send business message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@323b23fa}
    15:45:32.598 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@4a957e2d}
    15:45:32.598 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send business message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=161800, type=1, reserved=0}, data=[B@43c2077b}
    15:45:37.623 [nioEventLoopGroup-1-2] INFO com.wlf.netty.nettyclient.handler.HeartBeatClientHandler - [client] Client send heart beat message to server : ----> NettyMessage{header=Header{delimiter=-1410399999, length=4, type=3, reserved=0}, data=[B@3207ffae}
  • 相关阅读:
    打印出从1到最大的n位十进制数
    排序算法--(二)
    排序算法 (-)
    两个栈实现一个队列
    C++ 模板类解析
    根据先序遍历中序遍历重建二叉树
    格式化时间
    用js实现冒泡排序
    接口和抽象类的区别
    解析json
  • 原文地址:https://www.cnblogs.com/wuxun1997/p/11749928.html
Copyright © 2011-2022 走看看