zoukankan      html  css  js  c++  java
  • Pigeon源码分析(三) -- 客户端发送tcp底层源码分析

    经过之前的分析,我们知道,一个请求显示经过层层的责任链,最后才会发出去。而决定发送到消息格式是在责任链中的一环完成的

    InvokerProcessHandlerFactory # init()

    public static void init() {
            if (!isInitialized) {
                if (Constants.MONITOR_ENABLE) {
                    registerBizProcessFilter(new RemoteCallMonitorInvokeFilter());
                }
                registerBizProcessFilter(new TraceFilter());
                registerBizProcessFilter(new FaultInjectionFilter());
                registerBizProcessFilter(new DegradationFilter());
                registerBizProcessFilter(new ClusterInvokeFilter());
                registerBizProcessFilter(new GatewayInvokeFilter());
                registerBizProcessFilter(new ContextPrepareInvokeFilter());
                registerBizProcessFilter(new SecurityFilter());
                registerBizProcessFilter(new RemoteCallInvokeFilter());
                bizInvocationHandler = createInvocationHandler(bizProcessFilters);
                isInitialized = true;
            }
        }

    我们看这个 ClusterInvokeFilter 

    public class ClusterInvokeFilter extends InvocationInvokeFilter {
    
        private static final Logger logger = LoggerLoader.getLogger(ClusterInvokeFilter.class);
    
        public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
                throws Throwable {
            InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
            Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster());
            if (cluster == null) {
                throw new IllegalArgumentException("Unsupported cluster type:" + cluster);
            }
            return cluster.invoke(handler, invocationContext);
        }
    
    }

    这里以常见的 FailfastCluster 为例

    @Override
        public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext)
                throws Throwable {
            InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig();
            InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig);
    
            boolean timeoutRetry = invokerConfig.isTimeoutRetry();
    ......

      InvokerUtils

    public static InvocationRequest createRemoteCallRequest(InvokerContext invokerContext,
                                                                InvokerConfig<?> invokerConfig) {
            InvocationRequest request = invokerContext.getRequest();
            if (request == null) {
                request = SerializerFactory.getSerializer(invokerConfig.getSerialize()).newRequest(invokerContext);
                invokerContext.setRequest(request);
            }
            return request;
        }

      拿到序列化器在决定请求的类型,序列化器主要分为两大类,一个是ThiftSeralizer,一类是非ThiftSeralizer

       ThriftSerializer

     public InvocationRequest newRequest(InvokerContext invokerContext) throws SerializationException {
            return new GenericRequest(invokerContext);
        }

      AbstractSerializer

    public InvocationRequest newRequest(InvokerContext invokerContext) throws SerializationException {
            return InvocationUtils.newRequest(invokerContext);
        }

    最终调用的是  DefaultInvocationBuilder

    public InvocationRequest newRequest(InvokerContext invokerContext) {
            return new DefaultRequest(invokerContext);
        }

    到这里就知道了,请求的类型就两大类,一类是GenericRequest 一类是 DefaultRequest

    二 编码阶段

    我们看客户端的channelHandlers。习惯了netty4的写法刚开始看还真不习惯 呵呵。

    public class NettyClientPipelineFactory implements ChannelPipelineFactory {
    
        private NettyClient client;
    
        private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig();
    
        public NettyClientPipelineFactory(NettyClient client) {
            this.client = client;
        }
    
        public ChannelPipeline getPipeline() throws Exception {
            ChannelPipeline pipeline = pipeline();
            pipeline.addLast("framePrepender", new FramePrepender());
            pipeline.addLast("frameDecoder", new FrameDecoder());
            pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig));
            pipeline.addLast("compressHandler", new CompressHandler(codecConfig));
            pipeline.addLast("invokerDecoder", new InvokerDecoder());
            pipeline.addLast("invokerEncoder", new InvokerEncoder());
            pipeline.addLast("clientHandler", new NettyClientHandler(this.client));
            return pipeline;
        }
    
    }

    主要分析这个类  InvokerEncoder。继续分析最终加密的逻辑在 

    AbstractEncoder # encode

    public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
            if (msg instanceof InvocationSerializable) {
    
                InvocationSerializable _msg = (InvocationSerializable) msg;
                try {
    
                    ChannelBuffer frame;
                    CodecEvent codecEvent;
    
                    if (msg instanceof UnifiedInvocation) {//这就是Thrift对应的请求类型
                        frame = _doEncode(channel, (UnifiedInvocation) _msg);
                        codecEvent = new CodecEvent(frame, true);
                    } else {
                        frame = doEncode(channel, _msg);//这是一般类型
                        codecEvent = new CodecEvent(frame, false);
                    }
    protected ChannelBuffer _doEncode(Channel channel, UnifiedInvocation msg)
                throws IOException {
    
            ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH,
                    channel.getConfig().getBufferFactory()));
    
            //magic
            os.write(CodecConstants._MAGIC);//(byte) 0xAB (byte) 0xBA
            os.writeByte(msg.getProtocolVersion());//第三个字节
            //serialize
            byte serialize = SerializerFactory.convertToUnifiedSerialize(msg.getSerialize());
            //serialize
            os.writeByte(serialize);//第4个字节序列化方式
            //totalLength
            os.writeInt(Integer.MAX_VALUE);//5-8字节是消息体长度
    
            serialize(msg.getSerialize(), os, msg, channel);
    
            ChannelBuffer frame = os.buffer();
            //totalLength
            frame.setInt(CodecConstants._HEAD_LENGTH, frame.readableBytes() -
                    CodecConstants._FRONT_LENGTH_);//这里会重新设置5-8字节的长度值

     再分析一下非Thrift类型的消息请求编码

    protected ChannelBuffer doEncode(Channel channel, InvocationSerializable msg)
                throws IOException {
            ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH,
                    channel.getConfig().getBufferFactory()));
            //magic
            os.write(CodecConstants.MAGIC); // 0x39 0x3A
            //serialize
            os.writeByte(msg.getSerialize());//序列化类型 这里注意 消息头部分就3个字节 和上面可是不一样的 
            //bodyLength
            os.writeInt(Integer.MAX_VALUE);//消息体长度
    
            serialize(msg.getSerialize(), os, msg, channel);
            //body
            ChannelBuffer frame = os.buffer();
            //sequence
            frame.writeLong(msg.getSequence());//写完消息体之后写8个字节长度的序列号
            //expand
            frame.writeBytes(CodecConstants.EXPAND);//再写三个扩展字段
         //

    public static final byte EXPAND_FIRST = 0x1D;
    public static final byte EXPAND_SECOND = 0x1E;
    public static final byte EXPAND_THIRD = 0x1F;

    //bodyLength
            frame.setInt(CodecConstants.HEAD_LENGTH, frame.readableBytes() -
                    CodecConstants.FRONT_LENGTH);//再把4-7字节位置上重新 赋值一个int型
            doAfter(msg, frame.readableBytes());
            return frame;
        }
  • 相关阅读:
    isMemberOf与isKindOf的区别
    当你的工程出现了问题,在别的电脑上可以正常运行。你该怎么做。。
    iOS 声明属性关键字的总结
    UISegmentedControl方法与属性的总结
    UILabel与UIFont的用法和属性的一些总结
    UIActivityIndicatorView控件的属性和方法
    第四百六十一天 how can I 坚持
    《Java基础知识》Java集合(Collection)
    《Java基础知识》Java线程的概念
    《Java基础知识》Java断言
  • 原文地址:https://www.cnblogs.com/juniorMa/p/14842724.html
Copyright © 2011-2022 走看看