zoukankan      html  css  js  c++  java
  • 基于head和body的自定义数据编解码器

    收集医院设备信息时由于消息为较短的json字符串,所以我们使用netty的自定义协议进行简单的数据接收。

    协议约定为:head(实际消息的字节长度)+body(实际需要解析的json数据)

    解码器如下,分为两步骤,最后我们获取json字符串进行处理,如果想要使用byte[]直接解析,请忽略第二个解码处理器:

    /**
     * Created by zzq on 2019/12/26.
     * <p>
     * ReplayingDecoder如果字节读取失败会抛出异常,则父类会自动捕获异常,重新调用decode直到能够重复调用
     */
    public class CustomDataFrameDecoder extends ReplayingDecoder<Void> {
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            int bodyLength = in.readInt();//直接将头部数据获取,得知body的长度,此时ByteBuf中readIndex会向后移动
            byte bodyTmp[] = new byte[bodyLength];
            ByteBuf body = in.readBytes(bodyLength);
            body.readBytes(bodyTmp);//将body存入字节数组
    
            CustomDataFrame customDataFrame = new CustomDataFrame(bodyLength, bodyTmp);
            out.add(customDataFrame);
        }
    }
    /**
     * Created by zzq on 2019/12/26.
     */
    public class CustomDataFrameToStrDecoder extends MessageToMessageDecoder<CustomDataFrame> {
        @Override
        protected void decode(ChannelHandlerContext ctx, CustomDataFrame msg, List out) throws Exception {
            if (msg != null) {
                String bodyStr = new String(msg.getBody(), Charset.forName("UTF-8"));
                out.add(bodyStr);
            }
        }
    }

     服务端如下:

    static ChannelInboundHandler channelInboundHandler = new SimpleChannelInboundHandler<String>() {
            @Override
            protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                System.out.println("收到客户端消息:" + msg);
                String res = msg + "你好";
                ctx.channel().writeAndFlush(res);
            }
        };
    
        public static void main(String[] args) {
            EventLoopGroup parent = new NioEventLoopGroup(1);
            EventLoopGroup children = new NioEventLoopGroup(1);
    
            final EventLoopGroup handle = new NioEventLoopGroup();
    
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(parent, children)
                    .channel(NioServerSocketChannel.class)
                    .handler(new LoggingHandler(LogLevel.DEBUG))
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new ClientMonitorHandlerAdapter());//客户端监控处理器
                            pipeline.addLast(new CustomDataFrameDecoder());
                            pipeline.addLast(new CustomDataFrameToStrDecoder());
                            pipeline.addLast(handle, channelInboundHandler);//使用线程池异步处理业务逻辑,在此不要阻塞io线程
                            pipeline.addLast(new CustomDataFrameEncoder());
                        }
                    });
            ChannelFuture channelFuture = null;
            try {
                channelFuture = serverBootstrap.bind(9099).sync();
                channelFuture.channel().closeFuture().sync();
    
            } catch (InterruptedException e) {
                channelFuture.channel().close();
                parent.shutdownGracefully();
                children.shutdownGracefully();
            }

    为了测试服务端,编写了客户端代码如下:

     EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
    
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(eventLoopGroup);
            bootstrap.channel(NioSocketChannel.class)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            pipeline.addLast(new CustomDataFrameDecoder());
                            pipeline.addLast(new CustomDataFrameToStrDecoder());
                            pipeline.addLast(new SimpleChannelInboundHandler<String>() {
                                @Override
                                protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
                                    System.out.println("收到服务端返回的消息");
                                    System.out.println(msg);
                                }
    
                                @Override
                                public void channelActive(final ChannelHandlerContext ctx) throws Exception {
                                    super.channelActive(ctx);
                                    new Thread(new Runnable() {
                                        @Override
                                        public void run() {
                                            for (; ; ) {
                                                try {
                                                    Thread.sleep(2000);
                                                } catch (Exception e) {
    
                                                }
    
                                                String msg = "{a:b,i:p}" + UUID.randomUUID().toString();
                                                System.out.println("客户端发送消息:" + msg);
                                                ctx.channel().writeAndFlush(msg);
                                            }
                                        }
                                    }).start();
    
                                }
    
                                @Override
                                public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
                                    super.exceptionCaught(ctx, cause);
                                    ctx.close();
                                }
                            });
                            pipeline.addLast(new CustomDataFrameEncoder());
                        }
                    });
            try {
                bootstrap.connect("127.0.0.1", 9099).sync().channel().closeFuture().sync();
            } catch (Exception e) {
                eventLoopGroup.shutdownGracefully();
            }

    异常处理器:

    /**
     * Created by zzq on 2019/12/27.
     */
    public class ClientMonitorHandlerAdapter extends ChannelInboundHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            System.out.println("客户端异常下线 ");
            ctx.channel().close();
        }
    
        @Override
        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            String clientAddress = ctx.channel().remoteAddress().toString();
            System.out.println("*通信管道注销成功 :" + clientAddress);
        }
    
        @Override
        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            String clientAddress = ctx.channel().remoteAddress().toString();
            System.out.println("*通信管道关闭" + clientAddress);
        }
    
        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {//客户端连接建立
            System.out.println("客户端连接建立");
        }
    
        @Override
        public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {//客户端失去连接
            System.out.println("客户端失去连接");
        }
    }

    数据接收和发送需要使用到解码器的同时,还需要使用编码器,编码器如下:

    /**
     * Created by zzq on 2019/12/26.
     */
    public class CustomDataFrameEncoder extends MessageToByteEncoder<String> {
        @Override
        protected void encode(ChannelHandlerContext ctx, String msg, ByteBuf out) throws Exception {
            byte body[] = msg.getBytes(Charset.forName("UTF-8"));
            int bodyLength = body.length;
            out.writeInt(bodyLength);//向输出参数中添加数据
            out.writeBytes(body);
        }
    }

    但实际我们的服务端是不需要像客户端响应数据的,我们直接在数据接收端使用自定义线程池异步处理数据即可。

    PS:消息体中间转换类如下:

    /**
     * Created by zzq on 2019/12/26.
     */
    public class CustomDataFrame implements Serializable {
        public CustomDataFrame() {
        }
    
        public CustomDataFrame(int length, byte[] body) {
            this.length = length;
            this.body = body;
        }
    
        private int length;
        private byte body[];
    
        public int getLength() {
            return length;
        }
    
        public void setLength(int length) {
            this.length = length;
        }
    
        public byte[] getBody() {
            return body;
        }
    
        public void setBody(byte[] body) {
            this.body = body;
        }
    }
  • 相关阅读:
    Android编译源码过程和重点
    Ubuntu 10.04 下android 源码下载与编译
    Android2.3系统的overscroll效果
    【转】打造人脉不如打造自己
    Android生命周期
    Android Bitmap和Canvas学习笔记
    Android获取手机和系统版本等信息的代码
    Android网络连接处理学习笔记
    Android风格与主题
    Android程序反编译的方法
  • 原文地址:https://www.cnblogs.com/zzq-include/p/12102339.html
Copyright © 2011-2022 走看看