zoukankan      html  css  js  c++  java
  • 12.4 客户端响应解码

    客户端响应解码整体流程:

     1 NettyCodecAdapter$InternalDecoder.decode(ChannelHandlerContext ctx, ByteBuf input, List<Object> out)
     2 -->new NettyBackedChannelBuffer(ByteBuf buffer) // 创建一个buffer
     3 -->NettyChannel.getOrAddChannel(io.netty.channel.Channel ch, URL url, ChannelHandler handler)
     4 -->DubboCountCodec.decode(Channel channel, ChannelBuffer buffer)
     5   -->ExchangeCodec.decode(Channel channel, ChannelBuffer buffer)
     6     -->buffer.readBytes(header); //读取header byte[]
     7     -->decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header)
     8       -->检查魔数、检查总长度是否大于等于16
     9       -->获取请求体长度
    10       -->new ChannelBufferInputStream(buffer, len)
    11       -->DubboCodec.decodeBody(Channel channel, InputStream is, byte[] header)
    12         -->CodecSupport.getSerialization(URL url, Byte id) //解析出请求头header[2]中的序列化ID,根据该ID获取与请求编码相同的序列化协议
    13         -->Bytes.bytes2long(header, 4) //获取respID
    14         <!-- 之后创建一个新的Response对象,将respID及后续解析出来的各种resp属性塞入该对象中 -->
    15         -->new DecodeableRpcResult(Channel channel, Response response, InputStream is, Invocation invocation, byte id)
    16           -->DecodeableRpcResult.decode()
    17             -->decode(Channel channel, InputStream input) //解析请求体参数并将其构造为一个DecodeableRpcResult,最终塞到Request对象的data属性中
    18               -->new Hessian2ObjectInput(InputStream is)
    19               -->反序列化:in.readObject()

    与 12.2 服务端请求解码 极其相似。

    不同的地方是:

      1 protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
      2         byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
      3         Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
      4         // get request id.
      5         long id = Bytes.bytes2long(header, 4);
      6         if ((flag & FLAG_REQUEST) == 0) {// 解码服务端的响应
      7             // decode response.
      8             Response res = new Response(id);
      9             if ((flag & FLAG_EVENT) != 0) {
     10                 res.setEvent(Response.HEARTBEAT_EVENT);
     11             }
     12             // get status.
     13             byte status = header[3];
     14             res.setStatus(status);
     15             if (status == Response.OK) {
     16                 try {
     17                     Object data;
     18                     if (res.isHeartbeat()) {
     19                         data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
     20                     } else if (res.isEvent()) {
     21                         data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
     22                     } else {
     23                         DecodeableRpcResult result;
     24                         if (channel.getUrl().getParameter(
     25                                 Constants.DECODE_IN_IO_THREAD_KEY,
     26                                 Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
     27                             result = new DecodeableRpcResult(channel, res, is,
     28                                     (Invocation) getRequestData(id), proto);
     29                             result.decode();
     30                         } else {
     31                             result = new DecodeableRpcResult(channel, res,
     32                                     new UnsafeByteArrayInputStream(readMessageData(is)),
     33                                     (Invocation) getRequestData(id), proto);
     34                         }
     35                         data = result;
     36                     }
     37                     res.setResult(data);
     38                 } catch (Throwable t) {
     39                     if (log.isWarnEnabled()) {
     40                         log.warn("Decode response failed: " + t.getMessage(), t);
     41                     }
     42                     res.setStatus(Response.CLIENT_ERROR);
     43                     res.setErrorMessage(StringUtils.toString(t));
     44                 }
     45             } else {
     46                 res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
     47             }
     48             return res;
     49         } else { // 解码客户端的请求
     50             // decode request.
     51             Request req = new Request(id);
     52             req.setVersion("2.0.0");
     53             req.setTwoWay((flag & FLAG_TWOWAY) != 0);
     54             if ((flag & FLAG_EVENT) != 0) {
     55                 req.setEvent(Request.HEARTBEAT_EVENT);
     56             }
     57             try {
     58                 Object data;
     59                 if (req.isHeartbeat()) {
     60                     data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
     61                 } else if (req.isEvent()) {
     62                     data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
     63                 } else {
     64                     DecodeableRpcInvocation inv;
     65                     if (channel.getUrl().getParameter(
     66                             Constants.DECODE_IN_IO_THREAD_KEY,
     67                             Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
     68                         inv = new DecodeableRpcInvocation(channel, req, is, proto);
     69                         inv.decode();
     70                     } else {
     71                         inv = new DecodeableRpcInvocation(channel, req,
     72                                 new UnsafeByteArrayInputStream(readMessageData(is)), proto);
     73                     }
     74                     data = inv;
     75                 }
     76                 req.setData(data);
     77             } catch (Throwable t) {
     78                 if (log.isWarnEnabled()) {
     79                     log.warn("Decode request failed: " + t.getMessage(), t);
     80                 }
     81                 // bad request
     82                 req.setBroken(true);
     83                 req.setData(t);
     84             }
     85             return req;
     86         }
     87     }
     88 
     89     private ObjectInput deserialize(Serialization serialization, URL url, InputStream is)
     90             throws IOException {
     91         return serialization.deserialize(url, is);
     92     }
     93 
     94     private byte[] readMessageData(InputStream is) throws IOException {
     95         if (is.available() > 0) {
     96             byte[] result = new byte[is.available()];
     97             is.read(result);
     98             return result;
     99         }
    100         return new byte[]{};
    101     }

    DecodeableRpcResult:

     1     public Object decode(Channel channel, InputStream input) throws IOException {
     2         ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
     3                 .deserialize(channel.getUrl(), input);
     4 
     5         byte flag = in.readByte();
     6         switch (flag) {
     7             case DubboCodec.RESPONSE_NULL_VALUE:
     8                 break;
     9             case DubboCodec.RESPONSE_VALUE://这个值是响应编码是编码进来的,用来表示响应的返回类型
    10                 try {
    11                     Type[] returnType = RpcUtils.getReturnTypes(invocation);
    12                     setValue(returnType == null || returnType.length == 0 ? in.readObject() :
    13                             (returnType.length == 1 ? in.readObject((Class<?>) returnType[0])
    14                                     : in.readObject((Class<?>) returnType[0], returnType[1])));
    15                 } catch (ClassNotFoundException e) {
    16                     throw new IOException(StringUtils.toString("Read response data failed.", e));
    17                 }
    18                 break;
    19             case DubboCodec.RESPONSE_WITH_EXCEPTION:
    20                 try {
    21                     Object obj = in.readObject();
    22                     if (obj instanceof Throwable == false)
    23                         throw new IOException("Response data error, expect Throwable, but get " + obj);
    24                     setException((Throwable) obj);
    25                 } catch (ClassNotFoundException e) {
    26                     throw new IOException(StringUtils.toString("Read response data failed.", e));
    27                 }
    28                 break;
    29             default:
    30                 throw new IOException("Unknown result flag, expect '0' '1' '2', get " + flag);
    31         }
    32         return this;
    33     }

    setValue:设置DecodeableRpcResult的Object result属性。

    响应解码结束。

  • 相关阅读:
    仿12306客户端
    object-c开发中混合使用或不使用ARC
    Objective-c 的 @property 详解
    iPhone的Push(推送通知)功能原理浅析
    Objective-C内存管理教程和原理剖析3
    IDEA 创建JAVA Maven Web 工程
    Linux CenOS 7 安装Redis
    Linux CenOS 7 安装Tomcat
    Linux CentOS 7 安装wordpress
    Linux CenOS 7 安装JDK
  • 原文地址:https://www.cnblogs.com/java-zhao/p/8196113.html
Copyright © 2011-2022 走看看