经过之前的分析,我们知道,一个请求显示经过层层的责任链,最后才会发出去。而决定发送到消息格式是在责任链中的一环完成的
InvokerProcessHandlerFactory # init()
public static void init() { if (!isInitialized) { if (Constants.MONITOR_ENABLE) { registerBizProcessFilter(new RemoteCallMonitorInvokeFilter()); } registerBizProcessFilter(new TraceFilter()); registerBizProcessFilter(new FaultInjectionFilter()); registerBizProcessFilter(new DegradationFilter()); registerBizProcessFilter(new ClusterInvokeFilter()); registerBizProcessFilter(new GatewayInvokeFilter()); registerBizProcessFilter(new ContextPrepareInvokeFilter()); registerBizProcessFilter(new SecurityFilter()); registerBizProcessFilter(new RemoteCallInvokeFilter()); bizInvocationHandler = createInvocationHandler(bizProcessFilters); isInitialized = true; } }
我们看这个 ClusterInvokeFilter
public class ClusterInvokeFilter extends InvocationInvokeFilter { private static final Logger logger = LoggerLoader.getLogger(ClusterInvokeFilter.class); public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) throws Throwable { InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig(); Cluster cluster = ClusterFactory.selectCluster(invokerConfig.getCluster()); if (cluster == null) { throw new IllegalArgumentException("Unsupported cluster type:" + cluster); } return cluster.invoke(handler, invocationContext); } }
这里以常见的 FailfastCluster 为例
@Override public InvocationResponse invoke(ServiceInvocationHandler handler, InvokerContext invocationContext) throws Throwable { InvokerConfig<?> invokerConfig = invocationContext.getInvokerConfig(); InvocationRequest request = InvokerUtils.createRemoteCallRequest(invocationContext, invokerConfig); boolean timeoutRetry = invokerConfig.isTimeoutRetry();
......
InvokerUtils
public static InvocationRequest createRemoteCallRequest(InvokerContext invokerContext, InvokerConfig<?> invokerConfig) { InvocationRequest request = invokerContext.getRequest(); if (request == null) { request = SerializerFactory.getSerializer(invokerConfig.getSerialize()).newRequest(invokerContext); invokerContext.setRequest(request); } return request; }
拿到序列化器在决定请求的类型,序列化器主要分为两大类,一个是ThiftSeralizer,一类是非ThiftSeralizer
ThriftSerializer
public InvocationRequest newRequest(InvokerContext invokerContext) throws SerializationException { return new GenericRequest(invokerContext); }
AbstractSerializer
public InvocationRequest newRequest(InvokerContext invokerContext) throws SerializationException { return InvocationUtils.newRequest(invokerContext); }
最终调用的是 DefaultInvocationBuilder
public InvocationRequest newRequest(InvokerContext invokerContext) { return new DefaultRequest(invokerContext); }
到这里就知道了,请求的类型就两大类,一类是GenericRequest 一类是 DefaultRequest
二 编码阶段
我们看客户端的channelHandlers。习惯了netty4的写法刚开始看还真不习惯 呵呵。
public class NettyClientPipelineFactory implements ChannelPipelineFactory { private NettyClient client; private static CodecConfig codecConfig = CodecConfigFactory.createClientConfig(); public NettyClientPipelineFactory(NettyClient client) { this.client = client; } public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); pipeline.addLast("framePrepender", new FramePrepender()); pipeline.addLast("frameDecoder", new FrameDecoder()); pipeline.addLast("crc32Handler", new Crc32Handler(codecConfig)); pipeline.addLast("compressHandler", new CompressHandler(codecConfig)); pipeline.addLast("invokerDecoder", new InvokerDecoder()); pipeline.addLast("invokerEncoder", new InvokerEncoder()); pipeline.addLast("clientHandler", new NettyClientHandler(this.client)); return pipeline; } }
主要分析这个类 InvokerEncoder。继续分析最终加密的逻辑在
AbstractEncoder # encode
public Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { if (msg instanceof InvocationSerializable) { InvocationSerializable _msg = (InvocationSerializable) msg; try { ChannelBuffer frame; CodecEvent codecEvent; if (msg instanceof UnifiedInvocation) {//这就是Thrift对应的请求类型 frame = _doEncode(channel, (UnifiedInvocation) _msg); codecEvent = new CodecEvent(frame, true); } else { frame = doEncode(channel, _msg);//这是一般类型 codecEvent = new CodecEvent(frame, false); }
protected ChannelBuffer _doEncode(Channel channel, UnifiedInvocation msg) throws IOException { ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH, channel.getConfig().getBufferFactory())); //magic os.write(CodecConstants._MAGIC);//(byte) 0xAB (byte) 0xBA os.writeByte(msg.getProtocolVersion());//第三个字节 //serialize byte serialize = SerializerFactory.convertToUnifiedSerialize(msg.getSerialize()); //serialize os.writeByte(serialize);//第4个字节序列化方式 //totalLength os.writeInt(Integer.MAX_VALUE);//5-8字节是消息体长度 serialize(msg.getSerialize(), os, msg, channel); ChannelBuffer frame = os.buffer(); //totalLength frame.setInt(CodecConstants._HEAD_LENGTH, frame.readableBytes() - CodecConstants._FRONT_LENGTH_);//这里会重新设置5-8字节的长度值
再分析一下非Thrift类型的消息请求编码
protected ChannelBuffer doEncode(Channel channel, InvocationSerializable msg) throws IOException { ChannelBufferOutputStream os = new ChannelBufferOutputStream(dynamicBuffer(CodecConstants.ESTIMATED_LENGTH, channel.getConfig().getBufferFactory())); //magic os.write(CodecConstants.MAGIC); // 0x39 0x3A //serialize os.writeByte(msg.getSerialize());//序列化类型 这里注意 消息头部分就3个字节 和上面可是不一样的 //bodyLength os.writeInt(Integer.MAX_VALUE);//消息体长度 serialize(msg.getSerialize(), os, msg, channel); //body ChannelBuffer frame = os.buffer(); //sequence frame.writeLong(msg.getSequence());//写完消息体之后写8个字节长度的序列号 //expand frame.writeBytes(CodecConstants.EXPAND);//再写三个扩展字段
//
public static final byte EXPAND_FIRST = 0x1D;
public static final byte EXPAND_SECOND = 0x1E;
public static final byte EXPAND_THIRD = 0x1F;
//bodyLength frame.setInt(CodecConstants.HEAD_LENGTH, frame.readableBytes() - CodecConstants.FRONT_LENGTH);//再把4-7字节位置上重新 赋值一个int型 doAfter(msg, frame.readableBytes()); return frame; }