zoukankan      html  css  js  c++  java
  • Netty样例之如何使用LengthFieldBasedFrameDecoder

    一  客户端 

    public class NewClient {
    
        private  int port;
        private  String address;
    
        public NewClient(int port,String address) {
            this.port = port;
            this.address = address;
        }
    
        public void start(){
            EventLoopGroup group = new NioEventLoopGroup();
    
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY, true)
                    .handler(new NewClientChannelInitializer());
    
            try {
                ChannelFuture future = bootstrap.connect(address,port).sync();
                new Thread(new Runnable() {
                    
                    @Override
                    public void run() {
                        while (true) {
                            BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                            String message;
                            try {
                                message = br.readLine();
                                Message msg = new Message((byte)0xCA, message.length(), message);
                                future.channel().writeAndFlush(msg);
                            } catch (IOException e) {
                                e.printStackTrace();
                            }finally {
                                try {
                                    br.close();
                                } catch (IOException e) {
                                    e.printStackTrace();
                                }
                            }
                        }
                        
                        
                    }
                }).start();
                
                
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                e.printStackTrace();
            }finally {
                group.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            NewClient client = new NewClient(7788,"127.0.0.1");
            client.start();
        }
    }
    public class NewClientChannelInitializer extends ChannelInitializer<SocketChannel> {
    
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
    
            pipeline.addLast(new NewEncoder());
            pipeline.addLast(new NewClientHandler());
        }
    }
    public class NewClientHandler extends ChannelInboundHandlerAdapter {
    
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            String m = "你好啊,Netty。昂昂";
            Message msg = new Message((byte)0xCA, m.length(), m);
            ctx.writeAndFlush(msg);
        }
    }
    public class NewEncoder extends MessageToByteEncoder<Message> {
    
        @Override
        protected void encode(ChannelHandlerContext channelHandlerContext, Message message, ByteBuf byteBuf) throws Exception {
            if(message == null){
                throw new Exception("未获得消息内容");
            }
    
    
            String msgBody = message.getMsgBody();
            byte[] b = msgBody.getBytes(Charset.forName("utf-8"));
            byteBuf.writeByte(message.getType());
            byteBuf.writeByte(b.length);
            byteBuf.writeBytes(b);
    
    
        }
    }

    二 服务端

    public class NewServer {
        private static final int MAX_FRAME_LENGTH = 1024 * 1024;
        private static final int LENGTH_FIELD_LENGTH = 4;
        private static final int LENGTH_FIELD_OFFSET = 1;
        private static final int LENGTH_ADJUSTMENT = 0;
        private static final int INITIAL_BYTES_TO_STRIP = 0;
    
        private int port;
    
        public NewServer(int port) {
            this.port = port;
        }
    
        public void start(){
            EventLoopGroup bossGroup = new NioEventLoopGroup(1);
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            try {
                ServerBootstrap sbs = new ServerBootstrap()
                        .group(bossGroup,workerGroup)
                        .channel(NioServerSocketChannel.class)
                        .localAddress(new InetSocketAddress(port))
                        .childHandler(new NewServerChannelInitializer(MAX_FRAME_LENGTH,LENGTH_FIELD_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP))
                        .option(ChannelOption.SO_BACKLOG, 128)
                        .childOption(ChannelOption.SO_KEEPALIVE, true);
    
                ChannelFuture future = sbs.bind(port).sync();
    
                System.out.println("Server start listen at " + port );
                future.channel().closeFuture().sync();
            } catch (Exception e) {
                bossGroup.shutdownGracefully();
                workerGroup.shutdownGracefully();
            }
        }
    
        public static void main(String[] args) {
            NewServer server = new NewServer(7788);
            server.start();
        }
    
    }

      

    public class NewServerChannelInitializer extends ChannelInitializer<SocketChannel> {
        private  final int MAX_FRAME_LENGTH;
        private  final int LENGTH_FIELD_LENGTH;
        private  final int LENGTH_FIELD_OFFSET;
        private  final int LENGTH_ADJUSTMENT;
        private  final int INITIAL_BYTES_TO_STRIP;
    
        public NewServerChannelInitializer(int MAX_FRAME_LENGTH, int LENGTH_FIELD_LENGTH, int LENGTH_FIELD_OFFSET, int LENGTH_ADJUSTMENT, int INITIAL_BYTES_TO_STRIP) {
            this.MAX_FRAME_LENGTH = MAX_FRAME_LENGTH;
            this.LENGTH_FIELD_LENGTH = LENGTH_FIELD_LENGTH;
            this.LENGTH_FIELD_OFFSET = LENGTH_FIELD_OFFSET;
            this.LENGTH_ADJUSTMENT = LENGTH_ADJUSTMENT;
            this.INITIAL_BYTES_TO_STRIP = INITIAL_BYTES_TO_STRIP;
        }
    
        @Override
        protected void initChannel(SocketChannel socketChannel) throws Exception {
            ChannelPipeline pipeline = socketChannel.pipeline();
    
            pipeline.addLast(new NewDecoder(MAX_FRAME_LENGTH,LENGTH_FIELD_LENGTH,LENGTH_FIELD_OFFSET,LENGTH_ADJUSTMENT,INITIAL_BYTES_TO_STRIP,false));
            // 自己的逻辑Handler
            pipeline.addLast("handler", new NewServerHandler());
        }
    
    }
    public class NewDecoder extends LengthFieldBasedFrameDecoder {
    
        /**
         * 我们在Message类中定义了type和length,这都放在消息头部
         * type占1个字节,length占4个字节所以头部总长度是5个字节
         */
        private static final int HEADER_SIZE = 5;
        private byte type;
        private int length;
        private String msgBody;
    
    
        /**
         *
         * @param maxFrameLength   网络字节序,默认为大端字节序
         * @param lengthFieldOffset 消息中长度字段偏移的字节数
         * @param lengthFieldLength 数据帧的最大长度
         * @param lengthAdjustment 该字段加长度字段等于数据帧的长度
         * @param initialBytesToStrip 从数据帧中跳过的字节数
         * @param failFast 如果为true,则表示读取到长度域,TA的值的超过maxFrameLength,就抛出一个 TooLongFrameException
         */
        public NewDecoder(int maxFrameLength, int lengthFieldOffset,
                          int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip,
                          boolean failFast) {
            super(maxFrameLength, lengthFieldOffset, lengthFieldLength,
                    lengthAdjustment, initialBytesToStrip, failFast);
        }
    
        @Override
        protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            if(in == null){
                return null;
            }
            if(in.readableBytes() < HEADER_SIZE){
                throw new Exception("错误的消息");
            }
            in = (ByteBuf) super.decode(ctx,in);
            /**
             * 通过源码我们能看到在读的过程中
             * 每读一次读过的字节即被抛弃
             * 即指针会往前跳
             */
            type = in.readByte();
    
            length = in.readByte();
    
    
            if(in.readableBytes() < length){
                throw new Exception("消息不正确");
            }
    
            ByteBuf buf = in.readBytes(length);
            byte[] b = new byte[buf.readableBytes()];
            buf.readBytes(b);
    
            msgBody = new String(b,"UTF-8");
            Message msg = new Message(type,length,msgBody);
            return msg;
        }
    }
    public class NewServerHandler extends SimpleChannelInboundHandler<Object> {
    
        @Override
        protected void channelRead0(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
            if(o instanceof Message) {
                Message msg = (Message)o;
                System.out.println("Client->Server:"+channelHandlerContext.channel().remoteAddress()+" send "+msg.getMsgBody());
            }
        }
    }

    三  LengthFieldBasedFrameDecoder介绍

      首先要知道它是一个InBoundHandler,也就是用来把字节转成对象的

      他有四个参数

    public LengthFieldBasedFrameDecoder(
                int maxFrameLength,
                int lengthFieldOffset, int lengthFieldLength,
                int lengthAdjustment, int initialBytesToStrip) 

      maxFrameLength 可以设的稍微大点,比如1M

      lengthFieldOffset 在字节中表示长度的偏移

      lengthFieldLength 表示长度的部分占用几个字节

      lengthAdjustment 如果代表长度字段后立刻跟的是content,此值为0

      initialBytesToStrip 跳过的字节,解码后会把原字节数组进行截断

      源代码中就有注释 其实一般我们都是这么设计的,也够了

    * <pre>
    * <b>lengthFieldOffset</b> = <b>2</b> (= the length of Header 1)
    * <b>lengthFieldLength</b> = <b>3</b>
    * lengthAdjustment = 0
    * initialBytesToStrip = 0
    *
    * BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
    * +----------+----------+----------------+ +----------+----------+----------------+
    * | Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
    * | 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
    * +----------+----------+----------------+ +----------+----------+----------------+

      稍微复杂点的,代表长度字段后不是content

      

    * lengthFieldOffset   = 0
    * lengthFieldLength = 3
    * <b>lengthAdjustment</b> = <b>2</b> (= the length of Header 1)
    * initialBytesToStrip = 0
    *
    * BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
    * +----------+----------+----------------+ +----------+----------+----------------+
    * | Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
    * | 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
    * +----------+----------+----------------+ +----------+----------+----------------+
  • 相关阅读:
    c语言的按位运算符
    数据结构(六)——二叉树 前序、中序、后序、层次遍历及非递归实现 查找、统计个数、比较、求深度的递归实现
    【传感器】BMA253 数字,三轴加速度传感器
    并发编程的几种形式
    php:PHP解析xml的4种方法
    spring boot: 一般注入说明(五) @Component, application event事件为Bean与Bean之间通信提供了支持
    spring boot: 一般注入说明(四) Profile配置,Environment环境配置 @Profile注解
    spring boot: Bean的初始化和销毁 (一般注入说明(三) AnnotationConfigApplicationContext容器 JSR250注解)
    spring boot: EL和资源 (一般注入说明(二) @Service注解 @Component注解)
    spring boot: scope (一般注入说明(一) @Autowired注解)
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14334288.html
Copyright © 2011-2022 走看看