  • 12.1 客户端请求编码



     1 NettyCodecAdapter$InternalEncoder.encode(ChannelHandlerContext ctx, Object msg, ByteBuf out)
     2 -->new NettyBackedChannelBuffer(ByteBuf buffer) // 创建一个buffer
     3 -->NettyChannel.getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler)
     4 -->DubboCountCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
     5   -->ExchangeCodec.encode(Channel channel, ChannelBuffer buffer, Object msg)
     6       -->encodeRequest(Channel channel, ChannelBuffer buffer, Request req)
     7         -->getSerialization(Channel channel)   //获取Hessian2Serialization序列化实例
     8           -->CodecSupport.getSerialization(URL url)
     9             -->ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(url.getParameter("serialization", "hessian2"))
    10         <!-- 构造一个16字节的byte[16] header -->
    11         -->byte[] header = new byte[16]
    12         -->Bytes.short2bytes(MAGIC, header)  //设置前两个字节为魔数[-38, -69, 0, ..., 0]
    13         <!-- 第三个字节:表示消息是req,序列化协议ID,twoway/event -->
    14         -->header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    15          if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
    16          if (req.isEvent()) header[2] |= FLAG_EVENT;
    17       <!-- 设置第5~12个字节(long是64bit,即8byte):requestID -->
    18       -->Bytes.long2bytes(req.getId(), header, 4);
    19       <!-- 下面序列化请求体数据 -->
    20       -->new Hessian2ObjectOutput(out)
    21       -->DubboCodec.encodeRequestData(Channel channel, ObjectOutput out, Object data)
    22       -->Bytes.int2bytes(len, header, 12); // 设置第13~16个字节(int是32位,4个字节):消息体长度
    23       -->buffer.writeBytes(header); // 将header写入buffer的前16个字节


    • 创建一个buffer
    • 创建一个16字节的byte[16] header,将魔数、请求标志、序列化协议ID、twoway/event标志、requestID写入header
    • 之后序列化请求体,从buffer的第17个字节开始向后写入序列化后的请求体字节数组
    • 然后,将请求体长度写入header,并将header中的内容写入buffer的前16个字节
    • 最后发送buffer


     1     @Override
     2     protected void doOpen() throws Throwable { // configures the client Bootstrap; no connect call is made in this method
     3         NettyHelper.setNettyLoggerFactory();
     4         final NettyClientHandler nettyClientHandler = new NettyClientHandler(getUrl(), this); // created once and added to every pipeline built below
     5         bootstrap = new Bootstrap();
     6         bootstrap.group(nioEventLoopGroup)
     7                 .option(ChannelOption.SO_KEEPALIVE, true)
     8                 .option(ChannelOption.TCP_NODELAY, true)
     9                 .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT) // buffers come from Netty's default pooled allocator
    10                 //.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout())
    11                 .channel(NioSocketChannel.class);
    13         if (getTimeout() < 3000) { // the connect timeout is never configured below 3000
    14             bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
    15         } else {
    16             bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, getTimeout());
    17         }
    19         bootstrap.handler(new ChannelInitializer() { // pipeline order: decoder, encoder, then the client handler
    21             protected void initChannel(Channel ch) throws Exception {
    22                 NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); // a fresh adapter (and thus encoder/decoder pair) per initialized channel
    23                 ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
    24                         .addLast("decoder", adapter.getDecoder())
    25                         .addLast("encoder", adapter.getEncoder())
    26                         .addLast("handler", nettyClientHandler);
    27             }
    28         });
    29     }


     1 final class NettyCodecAdapter { // exposes a Codec2 as a pair of Netty handlers (one encoder, one decoder)
     2     private final ChannelHandler encoder = new InternalEncoder(); // Netty handler that delegates to codec.encode
     3     private final ChannelHandler decoder = new InternalDecoder(); // Netty handler that delegates to codec.decode
     4     private final Codec2 codec;
     5     private final URL url;
     6     private final com.alibaba.dubbo.remoting.ChannelHandler handler; // fully qualified to distinguish it from the other ChannelHandler type used for encoder/decoder
     8     public NettyCodecAdapter(Codec2 codec, URL url, com.alibaba.dubbo.remoting.ChannelHandler handler) {
     9         this.codec = codec;
    10         this.url = url;
    11         this.handler = handler;
    12     }
    14     public ChannelHandler getEncoder() {
    15         return encoder;
    16     }
    18     public ChannelHandler getDecoder() {
    19         return decoder;
    20     }
    22     private class InternalEncoder extends MessageToByteEncoder {
    23         protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) throws Exception { // encodes msg into out through the configured codec
    24             com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out); // wraps the ByteBuf so the codec can write through the ChannelBuffer API
    25             Channel ch = ctx.channel();
    26             NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
    27             try {
    28                 codec.encode(channel, buffer, msg);
    29             } finally {
    30                 NettyChannel.removeChannelIfDisconnected(ch); // runs whether or not encoding threw
    31             }
    32         }
    33     }
    35     private class InternalDecoder extends ByteToMessageDecoder {
    36         protected void decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out) throws Exception { // decodes as many messages as input currently holds and appends them to out
    37             ChannelBuffer message = new NettyBackedChannelBuffer(input);
    38             NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url, handler);
    39             Object msg;
    40             int saveReaderIndex;
    42             try {
    43                 // decode object.
    44                 do {
    45                     saveReaderIndex = message.readerIndex(); // remembered so an incomplete frame can be rewound
    46                     try {
    47                         msg = codec.decode(channel, message);
    48                     } catch (IOException e) {
    49                         throw e; // rethrown unchanged
    50                     }
    51                     if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
    52                         message.readerIndex(saveReaderIndex); // restore the read position and stop until more bytes arrive
    53                         break;
    54                     } else {
    55                         //is it possible to go here ?
    56                         if (saveReaderIndex == message.readerIndex()) { // the codec returned a result without consuming any bytes
    57                             throw new IOException("Decode without read data.");
    58                         }
    59                         if (msg != null) {
    60                             out.add(msg);
    61                         }
    62                     }
    63                 } while (message.readable()); // loop again while readable bytes remain
    64             } finally {
    65                 NettyChannel.removeChannelIfDisconnected(ctx.channel()); // runs whether or not decoding threw
    66             }
    67         }
    68     }
    69 }


    1 com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer = new NettyBackedChannelBuffer(out);


    1 ByteBuf buffer = SimpleLeakAwareByteBuf
    2 -->ByteBuf buf = PooledUnsafeDirectByteBuf


    1     private ByteBuf buffer;
    3     public NettyBackedChannelBuffer(ByteBuf buffer) { // wraps the given Netty ByteBuf
    4         Assert.notNull(buffer, "buffer == null"); // null check; presumably rejects a null buffer with this message
    5         this.buffer = buffer;
    6     }


    1 NettyBackedChannelBuffer
    2 -->ByteBuf buffer = SimpleLeakAwareByteBuf
    3   -->ByteBuf buf = PooledUnsafeDirectByteBuf



    1 NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
     1     private static final ConcurrentMap<Channel, NettyChannel> channelMap = new ConcurrentHashMap<Channel, NettyChannel>();
     2     private final Channel channel;
     4     private NettyChannel(Channel channel, URL url, ChannelHandler handler) { // private constructor; rejects a null Netty channel
     5         super(url, handler);
     6         if (channel == null) {
     7             throw new IllegalArgumentException("netty channel == null;");
     8         }
     9         this.channel = channel;
    10     }
    12     static NettyChannel getOrAddChannel(Channel ch, URL url, ChannelHandler handler) { // returns the cached NettyChannel for ch, or a new one; null when ch is null
    13         if (ch == null) {
    14             return null;
    15         }
    16         NettyChannel ret = channelMap.get(ch);
    17         if (ret == null) {
    18             NettyChannel nettyChannel = new NettyChannel(ch, url, handler);
    19             if (ch.isActive()) { // only active channels are put into the cache
    20                 ret = channelMap.putIfAbsent(ch, nettyChannel); // non-null result means another mapping was already present; that one is returned
    21             }
    22             if (ret == null) { // nothing was cached before this call (or ch is inactive): return the new instance
    23                 ret = nettyChannel;
    24             }
    25         }
    26         return ret;
    27     }

    首先以io.netty.channel.Channel为key,从缓存ConcurrentMap<Channel, NettyChannel> channelMap中获取NettyChannel,有则返回;没有则新建,若该channel处于active状态则放入缓存,最后返回。


    1 -->Channel channel = NioSocketChannel
    2 -->ChannelHandler handler = NettyClient
    3 -->URL url = dubbo://


    1 codec.encode(channel, buffer, msg)


    1 Codec2 codec = 
    2 DubboCountCodec
    3 -->DubboCodec codec = new DubboCodec()


    1     private DubboCodec codec = new DubboCodec();
    3     public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { // pure delegation to the wrapped DubboCodec
    4         codec.encode(channel, buffer, msg);
    5     }


    • channel:上述的NettyChannel对象
    • buffer:上述的NettyBackedChannelBuffer对象
    • msg:Request对象,其属性如下:
    •  1 long mId = 0
       2 String mVersion = "2.0.0"
       3 boolean mTwoWay = true
       4 boolean mEvent = false
       5 boolean mBroken = false
       6 Object mData = RpcInvocation对象
       7 -->String methodName = "sayHello"
       8 -->Class<?>[] parameterTypes = [java.lang.String]
       9 -->Object[] arguments = ["world"]
      10 -->Map<String, String> attachments = {
      11      "path" -> "com.alibaba.dubbo.demo.DemoService"
      12      "interface" -> "com.alibaba.dubbo.demo.DemoService"
      13      "version" -> "0.0.0"
      14      "timeout" -> "6000000"
      15 }
      16 -->Invoker<?> invoker = DubboInvoker对象

    之后调用DubboCodec.encode(Channel channel, ChannelBuffer buffer, Object msg),该方法位于其父类ExchangeCodec中。

     1     public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException { // dispatches on the runtime type of msg
     2         if (msg instanceof Request) {
     3             encodeRequest(channel, buffer, (Request) msg);
     4         } else if (msg instanceof Response) {
     5             encodeResponse(channel, buffer, (Response) msg);
     6         } else {
     7             super.encode(channel, buffer, msg); // anything else is handled by the parent codec
     8         }
     9     }
    11     protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException { // writes a HEADER_LENGTH-byte header followed by the serialized request body
    12         Serialization serialization = getSerialization(channel);
    13         // header.
    14         byte[] header = new byte[HEADER_LENGTH];
    15         // set magic number.
    16         Bytes.short2bytes(MAGIC, header);
    18         // set request and serialization flag.
    19         header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
    21         if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
    22         if (req.isEvent()) header[2] |= FLAG_EVENT;
    24         // set request id.
    25         Bytes.long2bytes(req.getId(), header, 4); // written starting at header offset 4; header[3] is never assigned here
    27         // encode request data.
    28         int savedWriteIndex = buffer.writerIndex();
    29         buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);// skip over the header slot so the body bytes are written first
    30         ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    31         ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    32         if (req.isEvent()) {
    33             encodeEventData(channel, out, req.getData());
    34         } else {
    35             encodeRequestData(channel, out, req.getData());
    36         }
    37         out.flushBuffer();
    38         bos.flush();
    39         bos.close();
    40         int len = bos.writtenBytes(); // body length as reported by the output stream
    41         checkPayload(channel, len);
    42         Bytes.int2bytes(len, header, 12); // body length goes at header offset 12
    44         // write
    45         buffer.writerIndex(savedWriteIndex); // rewind to where this message started
    46         buffer.writeBytes(header); // write header.
    47         buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len); // move past header and body
    48     }

    1 首先使用spi获取序列化协议

    1 Serialization serialization = getSerialization(channel);


    1     protected Serialization getSerialization(Channel channel) { // resolves the Serialization from the channel's URL
    2         return CodecSupport.getSerialization(channel.getUrl());
    3     }
    1     public static Serialization getSerialization(URL url) { // looks up the Serialization extension named by the URL's "serialization" parameter
    2         return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
    3                 url.getParameter("serialization", "hessian2")); // "hessian2" is the fallback when the parameter is absent
    4     }

    最终获取到的Serialization serialization = Hessian2Serialization对象:

     1 public class Hessian2Serialization implements Serialization { // Serialization implementation backed by Hessian2ObjectOutput / Hessian2ObjectInput
     2     public static final byte ID = 2; // content-type id returned by getContentTypeId()
     4     public byte getContentTypeId() {
     5         return ID;
     6     }
     8     public String getContentType() {
     9         return "x-application/hessian2";
    10     }
    12     public ObjectOutput serialize(URL url, OutputStream out) throws IOException { // url is not used here
    13         return new Hessian2ObjectOutput(out);
    14     }
    16     public ObjectInput deserialize(URL url, InputStream is) throws IOException { // url is not used here
    17         return new Hessian2ObjectInput(is);
    18     }
    19 }


    2 创建16字节header字节数组

    1 byte[] header = new byte[16];


     1         // set magic number.
     2         Bytes.short2bytes(MAGIC, header);
     4         // set request and serialization flag.
     5         header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
     7         if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
     8         if (req.isEvent()) header[2] |= FLAG_EVENT;
    10         // set request id.
    11         Bytes.long2bytes(req.getId(), header, 4);

    3 序列化请求体


    1         int savedWriteIndex = buffer.writerIndex();
    2         buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);//设置writerIndex为0+16,先输入请求体的字节

    首先存储了buffer当前的writerIndex(可写位置),从该位置开始到“该位置+15”这一段我们会写入header字节数组(例如,[0,15]),从“该位置+16”开始向后写入请求体字节数组(例如,[16, x))。



     1         ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
     2         ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
     3         if (req.isEvent()) {
     4             encodeEventData(channel, out, req.getData());
     5         } else {
     6             encodeRequestData(channel, out, req.getData());
     7         }
     8         out.flushBuffer();
     9         bos.flush();
    10         bos.close();


     1     private final ChannelBuffer buffer;
     2     private final int startIndex;
     4     public ChannelBufferOutputStream(ChannelBuffer buffer) { // wraps buffer; throws NullPointerException when buffer is null
     5         if (buffer == null) {
     6             throw new NullPointerException("buffer");
     7         }
     8         this.buffer = buffer;
     9         startIndex = buffer.writerIndex(); // writer position captured at construction time
    10     }

    buffer为上述的NettyBackedChannelBuffer对象;startIndex为构造时buffer的writerIndex,即savedWriteIndex + 16(本例中savedWriteIndex为0,故startIndex == 16)


    1     public ObjectOutput serialize(URL url, OutputStream out) throws IOException { // returns a new Hessian2ObjectOutput over out; url is not used here
    2         return new Hessian2ObjectOutput(out);
    3     }
    1     private final Hessian2Output mH2o;
    3     public Hessian2ObjectOutput(OutputStream os) { // creates the underlying Hessian2Output over os
    4         mH2o = new Hessian2Output(os);
    5         mH2o.setSerializerFactory(Hessian2SerializerFactory.SERIALIZER_FACTORY); // installs the shared Hessian2SerializerFactory instance
    6     }
    1     public final static int SIZE = 4096;
    2     private final byte[] _buffer = new byte[SIZE];
    3     protected OutputStream _os;
    5     public Hessian2Output(OutputStream os) { // only stores the target stream
    6         _os = os;
    7     }


    1 Hessian2ObjectOutput
    2 -->Hessian2Output mH2o
    3    -->byte[] _buffer = new byte[4096]
    4    -->OutputStream _os = 上述的ChannelBufferOutputStream对象
    5    -->SerializerFactory _serializerFactory = Hessian2SerializerFactory实例

    最后执行DubboCodec.encodeRequestData(Channel channel, ObjectOutput out, Object data),该方法是真正的进行请求体序列化的地方。

     1     @Override
     2     protected void encodeRequestData(Channel channel, ObjectOutput out, Object data) throws IOException { // writes the invocation's attachments-derived strings, method name, parameter descriptor, arguments and attachments map, in that order
     3         RpcInvocation inv = (RpcInvocation) data; // unchecked cast: data must be an RpcInvocation
     5         out.writeUTF(inv.getAttachment(Constants.DUBBO_VERSION_KEY, DUBBO_VERSION)); // falls back to DUBBO_VERSION when the attachment is missing
     6         out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
     7         out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));
     9         out.writeUTF(inv.getMethodName());
    10         out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
    11         Object[] args = inv.getArguments();
    12         if (args != null) // a null argument array writes no argument objects
    13             for (int i = 0; i < args.length; i++) {
    14                 out.writeObject(encodeInvocationArgument(channel, inv, i)); // each argument goes through encodeInvocationArgument before being written
    15             }
    16         out.writeObject(inv.getAttachments());
    17     }


     1 Object mData = RpcInvocation对象
     2 -->String methodName = "sayHello"
     3 -->Class<?>[] parameterTypes = [java.lang.String]
     4 -->Object[] arguments = ["world"]
     5 -->Map<String, String> attachments = {
     6      "path" -> "com.alibaba.dubbo.demo.DemoService"
     7      "interface" -> "com.alibaba.dubbo.demo.DemoService"
     8      "version" -> "0.0.0"
     9      "timeout" -> "6000000"
    10 }
    11 -->Invoker<?> invoker = DubboInvoker对象


    • methodName:方法名
    • parameterTypes:参数类型
    • arguments:参数值
    • attachments:附加参数



    1 Hessian2ObjectOutput.writeUTF(String v)
    2 -->Hessian2Output.writeString(String value)
    3    -->printString(String v, int strOffset, int length) 

    通过这个方法,我们将传入的v存储在ObjectOutput对象的byte[] _buffer = new byte[4096]数组中。

     1     Hessian2Output:
     2     /**
     3      * Writes any object to the output stream.
     4      */
     5     public void writeObject(Object object)
     6             throws IOException {
     7         if (object == null) { // null is handled by writeNull() without looking up a serializer
     8             writeNull();
     9             return;
    10         }
    12         Serializer serializer = findSerializerFactory().getSerializer(object.getClass()); // serializer is selected by the object's runtime class
    13         serializer.writeObject(object, this);
    14     }
    16     public final SerializerFactory findSerializerFactory() { // returns the configured factory, creating a plain SerializerFactory on first use if none is set
    17         SerializerFactory factory = _serializerFactory;
    18         if (factory == null)
    19             _serializerFactory = factory = new SerializerFactory();
    20         return factory;
    21     }
    23     SerializerFactory:
    24     private static HashMap _staticSerializerMap;
    25     private HashMap _cachedSerializerMap;
    26     /**
    27      * Returns the serializer for a class.
    28      * @param cl the class of the object that needs to be serialized.
    29      * @return a serializer object for the serialization.
    30      */
    31     public Serializer getSerializer(Class cl)
    32             throws HessianProtocolException {
    33         Serializer serializer;
    35         serializer = (Serializer) _staticSerializerMap.get(cl);
    36         if (serializer != null)
    37             return serializer;
    39         if (_cachedSerializerMap != null) {
    40             synchronized (_cachedSerializerMap) {
    41                 serializer = (Serializer) _cachedSerializerMap.get(cl);
    42             }
    44             if (serializer != null)
    45                 return serializer;
    46         }
    48         ......
    50         if (serializer != null) {
    52         } 
    53         .......
    54         else if (Map.class.isAssignableFrom(cl)) {
    55             if (_mapSerializer == null)
    56                 _mapSerializer = new MapSerializer();
    58             serializer = _mapSerializer;
    59         } 
    60         ......
    61         if (serializer == null)
    62             serializer = getDefaultSerializer(cl);
    64         if (_cachedSerializerMap == null)
    65             _cachedSerializerMap = new HashMap(8);
    67         synchronized (_cachedSerializerMap) {
    68             _cachedSerializerMap.put(cl, serializer);
    69         }
    71         return serializer;
    72     }

     out.writeObject(Object object):

    首先获取_serializerFactory工厂,这里是Hessian2SerializerFactory实例。其getSerializer(Class cl)方法位于其父类SerializerFactory中:获取序列化器的逻辑是:首先从_staticSerializerMap中获取相关类型的序列化器(_staticSerializerMap中启动时就缓存好一堆类型的序列化器:具体见com.alibaba.com.caucho.hessian.io.SerializerFactory),如果有则返回,否则从_cachedSerializerMap缓存中获取相关的序列化器,如果没有,根据类型先创建序列化器(new MapSerializer(),当然还有getDefaultSerializer(cl)来兜底),最后放入缓存_cachedSerializerMap中。最后返回创建好的序列化器。

    最后调用MapSerializer.writeObject(Object obj, AbstractHessianOutput out)进行序列化。

    DubboCodec.encodeRequestData执行完毕之后,我们将所有的信息写入了ObjectOutput对象的byte[] _buffer = new byte[4096]数组中。


    • 如果在将数据写入到_buffer的过程中,字节量超出了4096,会先执行一把Hessian2ObjectOutput.flushBuffer()将_buffer中的数据拷贝到PooledUnsafeDirectByteBuf中,之后再往_buffer中写入字节


     1 Hessian2ObjectOutput
     2     public void flushBuffer() throws IOException { // delegates to the wrapped Hessian2Output
     3         mH2o.flushBuffer();
     4     }
     6 Hessian2Output
     7     public final void flushBuffer() // writes the buffered bytes in _buffer to _os
     8             throws IOException {
     9         int offset = _offset;
    11         if (!_isStreaming && offset > 0) { // non-streaming: write the first offset bytes and reset the buffer position
    12             _offset = 0;
    13             _os.write(_buffer, 0, offset);
    14         } else if (_isStreaming && offset > 3) { // streaming: the first three bytes are a 'p' tag plus a 2-byte length
    15             int len = offset - 3; // payload length excluding the 3-byte prefix
    16             _buffer[0] = 'p';
    17             _buffer[1] = (byte) (len >> 8);
    18             _buffer[2] = (byte) len;
    19             _offset = 3; // next chunk starts after the reserved prefix
    20             _os.write(_buffer, 0, offset);
    21         }
    22     }

    此处执行ChannelBufferOutputStream.write(byte[] b, int off, int len)

    1     @Override
    2     public void write(byte[] b, int off, int len) throws IOException { // appends len bytes of b, starting at off, to the wrapped ChannelBuffer
    3         if (len == 0) { // nothing to write
    4             return;
    5         }
    6         buffer.writeBytes(b, off, len);
    7     }
     1 ChannelBuffer:
     2     /**
     3      * Transfers the specified source array's data to this buffer starting at
     4      * the current {@code writerIndex} and increases the {@code writerIndex} by
     5      * the number of the transferred bytes (= {@code length}).
     6      *
     7      * @param index  the first index of the source
     8      * @param length the number of bytes to transfer
     9      */
    10     void writeBytes(byte[] src, int index, int length); // src is the source array; index and length select the slice transferred at the current writerIndex

    就是将ObjectOutput对象的byte[] _buffer = new byte[4096]数组中的数据转移到buf中。(具体方法见:unsafe.copyMemory(Object srcBase, long srcOffset, Object destBase, long destOffset,long bytes))

    1 NettyBackedChannelBuffer
    2 -->ByteBuf buffer = SimpleLeakAwareByteBuf
    3   -->ByteBuf buf = PooledUnsafeDirectByteBuf

    4 将header写入buffer

    1         int len = bos.writtenBytes();//计算请求体长度
    2         checkPayload(channel, len);
    3         Bytes.int2bytes(len, header, 12);//将请求体长度写入header的第13~16个字节(int=4byte)
    5         // write
    6         buffer.writerIndex(savedWriteIndex);//设置buffer的writerIndex为该次写入的开始位置
    7         buffer.writeBytes(header); // 将header数组写入buffer
    8         buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);//设置buffer的writerIndex,为下一次写入做准备


    来看一下请求编码的byte[] header的最终结构:

    • 1~2 byte:魔数
    • 3 byte:requestFlag、序列化方式ID、twowayFlag、eventFlag
    • 4 byte:请求编码时未设置,保持为0
    • 5~12 byte:requestID
    • 13~16 byte:请求体长度


     1     protected static void checkPayload(Channel channel, long size) throws IOException { // throws ExceedPayloadLimitException when size exceeds the payload limit
     2         int payload = Constants.DEFAULT_PAYLOAD;
     3         if (channel != null && channel.getUrl() != null) { // the URL's payload parameter is used only when both channel and URL are available
     4             payload = channel.getUrl().getParameter(Constants.PAYLOAD_KEY, Constants.DEFAULT_PAYLOAD);//8M
     5         }
     6         if (payload > 0 && size > payload) { // a non-positive payload disables the check
     7             ExceedPayloadLimitException e = new ExceedPayloadLimitException("Data length too large: " + size + ", max payload: " + payload + ", channel: " + channel);
     8             logger.error(e); // logged before being thrown
     9             throw e;
    10         }
    11     }


