zoukankan      html  css  js  c++  java
  • Pigeon源码分析(五) -- 服务端netty部分

    服务端netty的channelHandler有这么多

    public class NettyServerPipelineFactory implements ChannelPipelineFactory {
    
        private NettyServer server;
    
        private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();
    
        public NettyServerPipelineFactory(NettyServer server) {
            this.server = server;
        }
    
        public ChannelPipeline getPipeline() {
            ChannelPipeline pipeline = pipeline();
            pipeline.addLast("framePrepender", new FramePrepender());
            pipeline.addLast("frameDecoder", new FrameDecoder());
            pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));
            pipeline.addLast("compressHandler", new CompressHandler(codecConfig));
            pipeline.addLast("providerDecoder", new ProviderDecoder());
            pipeline.addLast("providerEncoder", new ProviderEncoder());
            pipeline.addLast("serverHandler", new NettyServerHandler(server));
            return pipeline;
        }
    
    }

    这其中跟业务相关最强的是这三个  FrameDecoder  ProviderDecoder  NettyServerHandler

         //magic
            os.write(CodecConstants.MAGIC); // 0x39 0x3A
            //serialize
            os.writeByte(msg.getSerialize());//序列化类型 这里注意 消息头部分就3个字节 和上面可是不一样的 
            //bodyLength
            os.writeInt(Integer.MAX_VALUE);//消息体长度

    我们先分析 FrameDecoder  FrameDecoder#decode

    protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)
                throws Exception {
    
            Object message = null;
    
            if (buffer.readableBytes() <= 2) {
                return message;
            }
    
            byte[] headMsgs = new byte[2];
    
            buffer.getBytes(buffer.readerIndex(), headMsgs);
    
            if ((0x39 == headMsgs[0] && 0x3A == headMsgs[1])) {
                //old protocol
                message = doDecode(buffer);
    
            } else if ((byte) 0xAB == headMsgs[0] && (byte) 0xBA == headMsgs[1]) {
                //new protocol
                message = _doDecode(buffer);
    
            } else {
                throw new IllegalArgumentException("Decode invalid message head:" +
                        headMsgs[0] + " " + headMsgs[1] + ", " + "message:" + buffer);
            }
    
            return message;
    
        }

    这里分析下 非thrift协议,也就是 

     if ((0x39 == headMsgs[0] && 0x3A == headMsgs[1])) {
                //old protocol
                message = doDecode(buffer);
    protected Object doDecode(ChannelBuffer buffer)
                throws Exception {
    
            CodecEvent codecEvent = null;
    
            if (buffer.readableBytes() <= CodecConstants.FRONT_LENGTH) { HEAD_LENGTH + BODY_FIELD_LENGTH 3 + 4
                return codecEvent;
            }
    
            int totalLength = (int) buffer.getUnsignedInt(
                    buffer.readerIndex() +
                            CodecConstants.HEAD_LENGTH);//从开始位置数3个字节,读出来四个字节,该值就是消息体字节数
    
            int frameLength = totalLength + CodecConstants.FRONT_LENGTH;//消息体长度 + 7个字节的头长度 就是总长度
    
            if (buffer.readableBytes() >= frameLength) {
    
                ChannelBuffer frame = buffer.slice(buffer.readerIndex(), frameLength);
                buffer.readerIndex(buffer.readerIndex() + frameLength);
    
                codecEvent = new CodecEvent(frame, false);
                codecEvent.setReceiveTime(System.currentTimeMillis());//从bytebuffer中切出来 frameLength 字节数,构成一个CodecEvent
            }
    
            return codecEvent;
        }

      经过了 FrameDecoder的解码,现在handler链中的对象就不再是原生的 ByteBuffer了,而是CodecEvent

      接下来分析  ProviderDecoder ProviderDecoder 继承自 AbstractDecoder,其主要的逻辑也是在 AbstractDecoder

      AbstractDecoder # decode

    public Object decode(ChannelHandlerContext ctx, Channel channel, Object msg)
                throws Exception {
    
            if (msg == null || !(msg instanceof CodecEvent)) {
                return null;
            }
    
            CodecEvent codecEvent = (CodecEvent) msg;
    
            if (codecEvent.isValid()) {
    
                Object message = null;
    
                if (codecEvent.isUnified()) {
                    message = _doDecode(ctx, channel, codecEvent);
                    codecEvent.setInvocation((InvocationSerializable) message);
                } else {
                    message = doDecode(ctx, channel, codecEvent);
                    codecEvent.setInvocation((InvocationSerializable) message);
                }
    
            }
    
            return codecEvent;
        }

      还是看非thrift协议的解码

      

    protected Object doDecode(ChannelHandlerContext ctx, Channel channel, CodecEvent codecEvent)
                throws IOException {
            Object msg = null;
            ChannelBuffer buffer = codecEvent.getBuffer();
            //head
            buffer.skipBytes(CodecConstants.MEGIC_FIELD_LENGTH);//跳过前两个字节
            byte serialize = buffer.readByte();//读一个字节的序列化类型
            Long sequence = null;
    
            try {
                //body length
                int totalLength = buffer.readInt();//消息体长度
                int frameLength = totalLength + CodecConstants.FRONT_LENGTH;//帧长度
                //body
                int bodyLength = (totalLength - CodecConstants.TAIL_LENGTH);//消息体里包含了11个字节的尾巴。其中8个字节long类型的序列号,后三个字节固定是 30,29,28
                ChannelBuffer frame = extractFrame(buffer, buffer.readerIndex(), bodyLength);//这是切出来纯的请求体
                buffer.readerIndex(buffer.readerIndex() + bodyLength);
                //tail
                sequence = buffer.readLong();
                buffer.skipBytes(CodecConstants.EXPAND_FIELD_LENGTH);
                //deserialize
                ChannelBufferInputStream is = new ChannelBufferInputStream(frame);
    
                msg = deserialize(serialize, is);//对纯的请求体做反序列化
                //after
                doAfter(channel, msg, serialize, frameLength, codecEvent.getReceiveTime());
            } catch (Throwable e) {
                SerializationException se = new SerializationException(e);
    
                try {
                    if (sequence != null) {
                        doFailResponse(ctx, channel, ProviderUtils.createThrowableResponse(sequence.longValue(),
                                serialize, se));
                    }
    
                    logger.error("Deserialize failed. host:"
                            + ((InetSocketAddress) channel.getRemoteAddress()).getAddress().getHostAddress()
                            + "
    " + e.getMessage(), se);
    
                } catch (Throwable t) {
                    logger.error("[doDecode] doFailResponse failed.", t);
                }
            }
            return msg;
        }

    ProviderDecoder # deserialize

    public Object deserialize(byte serializerType, InputStream is) {
            Object decoded = SerializerFactory.getSerializer(serializerType).deserializeRequest(is);
            return decoded;
        }

    没啥好说的了,就是反序列化了

    注意,此时CodecEvent中已经有了反序列化好的原始请求对象了

    message = doDecode(ctx, channel, codecEvent);
                    codecEvent.setInvocation((InvocationSerializable) message);

    剩下的就是请求到达 NettyServerHandler。这部分逻辑其实在 Pigeon源码分析(四) -- 服务端接收请求过程 - MaXianZhe - 博客园 (cnblogs.com)分析过了

  • 相关阅读:
    中国行业应用软件领域恶性循环的原因是什么?【转载】
    UED之开新窗口
      关于周华健,我觉得有那么几个时期:转
    投影
    undo自动调优介绍
    (原)Oracle事务与Undo段的分配过程
    数据所在的数据块实验
    Oracle 检查点队列与增量检查点
    GC Buffer Busy Waits处理
    如何找出Oracle instance中当前打开游标open cursor的总数?
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14859716.html
Copyright © 2011-2022 走看看