zoukankan      html  css  js  c++  java
  • Netty(4-1)factorial~总结

    本节大纲:

    1、Handler的执行顺序
    2、自定义二进制协议(每条完整数据的组成),从而解决拆包和粘包。
    3、通过为每个channel创建新的handler,从而解决即使handler中使用全局变量,也可以避免竞态条件。

    1、Handler的执行顺序。

    client中pipeline顺序:
    //first,add codec
    pipeline.addLast(new BigIntegerDecoder());
    pipeline.addLast(new NumberEncoder());
    //then,add business logic
    pipeline.addLast(new FactorialClientHandler());
    
    server中pipeline顺序:
    //first,codec pipeline.addLast(new BigIntegerDecoder()); pipeline.addLast(new NumberEncoder()); //then,business logic pipeline.addLast(new FactorialServerHandler());

    总结:
    写(outbound):自下而上,跳过inbound
    读(inbound): 自上而下,跳过outbound
    Codec放在上边,业务逻辑handler放在下边。

    2、自定义二进制协议(每条完整数据的组成),从而解决拆包和粘包。

    每条完整数据的组成:'F'+4个字节的长度+数据
    
    将传进来的number编码为二进制,在其前边加上'F'和4个字节的长度,作为前缀。
    例如:42被编码为:'F',0,0,0,1,42 

    客户端:

    public class FactorialClientHandler extends SimpleChannelInboundHandler<BigInteger> {
        
        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            StringBuilder sb = new StringBuilder();
            for (int j = 0; j < 1024; j++) {
                sb.append(j);
            }
            BigInteger bigInt = new BigInteger(sb.toString());
            
            ChannelFuture future = ctx.writeAndFlush(bigInt);//只发送1次
            log.info("send:{}", bigInt);
            log.info("send字节数:{}", bigInt.toByteArray().length);
            future.addListener(new ChannelFutureListener() {
                @Override
                public void operationComplete(ChannelFuture future) throws Exception {
                    if (future.isSuccess()) {
                        log.info("发送成功");
                    }
                }
            });
        }
    /**
     * <pre>
     * 自定义二进制协议:F+4字节长度+具体数值
     * 例如:'F',0,0,0,1,42 解码为new BigInteger("42")
     * </pre>
     */
    @Slf4j
    public class BigIntegerDecoder extends ByteToMessageDecoder {
        private int splitCount = 0;
        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
            log.info(">>>>>splitCount:{},可读字节数:{}",++splitCount,in.readableBytes());
            //wait until the length prefix is available
            if (in.readableBytes() < 5) {
                return ;
            }
            in.markReaderIndex();
            int magicNumber = in.readUnsignedByte();
            if (magicNumber !='F') {
                throw new CorruptedFrameException("Invalid:"+magicNumber);
            }
            //wait until the whole data is available
            int dataLength = in.readInt();
            if (in.readableBytes() < dataLength) {
                in.resetReaderIndex();
                return ;
            }
            //convert the received data into a new BigInteger
            byte [] decoded = new byte[dataLength];
            in.readBytes(decoded);
            out.add(new BigInteger(decoded));
        }
    }

    客户端发送了1240个字节的BigInteger,服务端接收:

    18:15:13.121 [nioEventLoopGroup-3-1] >>>>>splitCount:1,可读字节数:1024
    18:15:13.126 [nioEventLoopGroup-3-1] >>>>>splitCount:2,可读字节数:1245

    虽然客户端只发送了1次,但服务端分2次接收在BigIntegerDecoder中都接收完后,才调用FactorialServerHandler的channelRead0方法

    注意,

    in.markReaderIndex();
    。。。
    if (in.readableBytes() < dataLength) {
        in.resetReaderIndex();
        return ;
    }

    以上的流程:

    当第一次时,in.readableBytes()=1024,而dataLength=1245,所以进入该方法,将readerIndex复位到之前mark处,此例为0。舍弃该部分包数据。

    当第二次时,in.readableBytes()=1245(说明,从0开始读的),读取到了完整的报文。

    如果去掉以上代码,则会报错:

    18:09:51.465 [nioEventLoopGroup-3-1] >>>>>splitCount:1,可读字节数:1024
    io.netty.handler.codec.DecoderException: java.lang.IndexOutOfBoundsException: readerIndex(5) + length(1240) exceeds writerIndex(1024): PooledUnsafeDirectByteBuf(ridx: 5, widx: 1024, cap: 1024)

    当然,服务端处理完并把原文发给客户端后,客户端也是分2次读取的:

    18:15:13.119 [nioEventLoopGroup-2-1] send字节数:1240
    18:15:13.153 [nioEventLoopGroup-2-1] >>>>>splitCount:1,可读字节数:1024
    18:15:13.154 [nioEventLoopGroup-2-1] >>>>>splitCount:2,可读字节数:1245

    3、通过为每个channel创建新的handler,从而解决即使handler中使用全局变量,也可以避免竞态条件

    并发发送数据包,且每个数据包超过1024个字节,如下代码中的成员变量:

    public class FactorialServerHandler extends SimpleChannelInboundHandler<BigInteger> {
        private BigInteger lastMultiplier = new BigInteger("1");
        private BigInteger factorial = new BigInteger("1");
        
        @Override
        protected void channelRead0(ChannelHandlerContext ctx, BigInteger msg) throws Exception {
            //计算阶乘并发送到客户端
            lastMultiplier = msg;
            factorial = factorial.multiply(msg);
            ctx.writeAndFlush(factorial);
        }
    。。。

    客户端调用:

    public static void main(String[] args) throws Exception {
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    executor(2,Thread.currentThread().getName());// 2*3*4*5=120
                }
            });
            t1.start();
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    executor(3,Thread.currentThread().getName());// 3*4*5=60
                }
            });
            t2.start();
            Thread t3 = new Thread(new Runnable() {
                @Override
                public void run() {
                    executor(4,Thread.currentThread().getName());// 4*5=20
                }
            });
            t3.start();
        }
    public static void executor(int next,String threadName) {
            EventLoopGroup group = new NioEventLoopGroup();
            try {
                Bootstrap b = new Bootstrap();
                b.group(group).channel(NioSocketChannel.class).handler(new FactorialClientInitializer(next));
                // make a new connection
                ChannelFuture f = b.connect(HOST, PORT).sync();
                // get the handler instance to retrieve the answer.
                FactorialClientHandler handler = (FactorialClientHandler) f.channel().pipeline().last();
                // print out the answer
                log.info("threadName:{},开始:{},结束:{},结果:{}", threadName,next,COUNT, handler.getFactorial());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                group.shutdownGracefully();
            }
        }

    结果:不乱,各自打印各自的。因为,每发送1条数据则创建1个Channel和handler ,所以不会乱。

    4、codec是netty封装好了的handler,简化代码开发。

    本例中涉及的是:

    ByteToMessageDecoder(inbound):必须实现decode方法

    MessageToByteEncoder<Number>(outbound):必须实现encode方法

    最后,

    以上的3参考代码:userguide-04-factorial。1、2参考代码:userguide-04-2-factorial

  • 相关阅读:
    Graphics总结
    自动布局
    引导页总结
    日常记录未分类
    .net core linux部署方案宝典大全
    .net core linux部署方案宝典大全
    .net core linux部署方案宝典大全
    .net core linux部署方案宝典大全
    .net core linux部署方案宝典大全
    .net core linux部署方案宝典大全
  • 原文地址:https://www.cnblogs.com/yaoyuan2/p/9656777.html
Copyright © 2011-2022 走看看