zoukankan      html  css  js  c++  java
  • Dubbo中编码和解码的解析

    (这里做的解析不是很详细,等到走完整个流程再来解析)Dubbo中编解码的工作由Codec2接口的实现来处理,回想一下第一次接触到Codec2相关的内容是在服务端暴露服务的时候,根据具体的协议去暴露服务的步骤中,在DubboProtocol的createServer方法中:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    private ExchangeServer createServer(URL url) {
    。。。
    //这里url会添加codec=dubbo
    url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME);
    ExchangeServer server;
    try {
    server = Exchangers.bind(url, requestHandler);
    }
    。。。
    return server;
    }

    紧接着进入Exchangers.bind(url, requestHandler);

    1
    2
    3
    4
    5
    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    //如果url中没有codec属性,就会添加codec=exchange
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    return getExchanger(url).bind(url, handler);
    }

    然后会继续进入HeaderExchanger的bind方法:

    1
    2
    3
    public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
    }

    在这里会创建一个DecodeHandler实例。继续跟踪Transporters的bind方法,会发现直接返回一个NettyServer实例,在NettyServer的父类AbstractEndpoint构造方法初始的时候,会根据url获取一个ChannelCodec,并将其赋值给codec存放到NettyServer的实例中。

    我们先看下getChannelCodec(url);方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    protected static Codec2 getChannelCodec(URL url) {
    //获取codecName,不存在的话,默认为telnet
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    //先看下是不是Codec2的实现,是的话就根据SPI扩展机制获得Codec2扩展的实现
    //我们这里默认使用的是DubboCountCodec
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
    return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
    //如果不是Codec2的实现,就去查找Codec的实现
    //然后使用CodecAdapter适配器类来转换成Codec2
    return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
    .getExtension(codecName));
    }
    }

    这里返回的是Codec2,而Codec这个接口已经被标记为过时。到这里的话,在NettyServer中就会存在一个Codec2的实例了。

    在继续往下看到NettyServer中的doOpen()方法,这里是使用Netty的逻辑打开服务并绑定监听服务的地方:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
    public ChannelPipeline getPipeline() {
    //这里的getCodec方法获取到的codec就是在AbstractEndpoint中我们获取到的codec
    //NettyCodecAdapter,适配器类
    NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
    ChannelPipeline pipeline = Channels.pipeline();
    pipeline.addLast("decoder", adapter.getDecoder());//SimpleChannelUpstreamHandler
    pipeline.addLast("encoder", adapter.getEncoder());//OneToOneEncoder
    pipeline.addLast("handler", nettyHandler);
    return pipeline;
    }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
    }

    这里就在Netty的pipeline中添加了编解码器。这里涉及到Netty的相关流程,可以先了解下Netty3服务端流程简介

    decoder为解码器,是一个SimpleChannelUpstreamHandler,从Socket到Netty中的时候,需要解码,也就是服务提供端接收到消费者的请求的时候,需要解码。

    encoder是编码器,是OneToOneEncoder,这个类实现了ChannelDownstreamHandler,从服务提供端发送给服务消费者的时候,需要编码。

    nettyHandler实现了ChannelUpstreamHandler, ChannelDownstreamHandler两个,上下的时候都需要处理。

    接收到服务消费者的请求的时候,会先执行decoder,然后执行nettyHandler。

    发送给消费者的时候,会先执行nettyHandler,然后执行encoder。

    dubbo协议头

    dubbo协议头示意图

    协议头是16字节的定长数据:

    • 2字节short类型的Magic
    • 1字节的消息标志位

      • 5位序列化id
      • 1位心跳还是正常请求
      • 1位双向还是单向
      • 1位请求还是响应
    • 1字节的状态位

    • 8字节的消息id
    • 4字节数据长度

    编码的过程

    首先会判断是请求还是响应,代码在ExchangeCodec的encode方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    if (msg instanceof Request) {//Request类型
    encodeRequest(channel, buffer, (Request) msg);
    } else if (msg instanceof Response) {//Response类型
    encodeResponse(channel, buffer, (Response) msg);
    } else {//telenet类型的
    super.encode(channel, buffer, msg);
    }
    }

    服务提供者对响应信息编码

    在服务提供者端一般是对响应来做编码,所以这里重点看下encodeResponse。

    encodeResponse:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
    try {
    //序列化方式
    //也是根据SPI扩展来获取,url中没指定的话默认使用hessian2
    Serialization serialization = getSerialization(channel);
    //长度为16字节的数组,协议头
    byte[] header = new byte[HEADER_LENGTH];
    //魔数0xdabb
    Bytes.short2bytes(MAGIC, header);
    //序列化方式
    header[2] = serialization.getContentTypeId();
    //心跳消息还是正常消息
    if (res.isHeartbeat()) header[2] |= FLAG_EVENT;
    //响应状态
    byte status = res.getStatus();
    header[3] = status;
    //设置请求id
    Bytes.long2bytes(res.getId(), header, 4);
    //buffer为1024字节的ChannelBuffer
    //获取buffer的写入位置
    int savedWriteIndex = buffer.writerIndex();
    //需要再加上协议头的长度之后,才是正确的写入位置
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    // 对响应信息或者错误消息进行编码
    if (status == Response.OK) {
    if (res.isHeartbeat()) {
    //心跳
    encodeHeartbeatData(channel, out, res.getResult());
    } else {
    //正常响应
    encodeResponseData(channel, out, res.getResult());
    }
    }
    //错误消息
    else out.writeUTF(res.getErrorMessage());
    out.flushBuffer();
    bos.flush();
    bos.close();
    //写出去的消息的长度
    int len = bos.writtenBytes();
    //查看消息长度是否过长
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);
    //重置写入的位置
    buffer.writerIndex(savedWriteIndex);
    //向buffer中写入消息头
    buffer.writeBytes(header); // write header.
    //buffer写出去的位置从writerIndex开始,加上header长度,加上数据长度
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    } catch (Throwable t) {
    // 发送失败信息给Consumer,否则Consumer只能等超时了
    if (! res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
    try {
    // FIXME 在Codec中打印出错日志?在IoHanndler的caught中统一处理?
    logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);

    Response r = new Response(res.getId(), res.getVersion());
    r.setStatus(Response.BAD_RESPONSE);
    r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
    channel.send(r);

    return;
    } catch (RemotingException e) {
    logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
    }
    }

    // 重新抛出收到的异常
    if (t instanceof IOException) {
    throw (IOException) t;
    } else if (t instanceof RuntimeException) {
    throw (RuntimeException) t;
    } else if (t instanceof Error) {
    throw (Error) t;
    } else {
    throw new RuntimeException(t.getMessage(), t);
    }
    }
    }

    服务消费者对请求信息编码

    消费者端暂先不做解析

    解码的过程

    服务提供者对请求消息的解码

    decode方法一次只会解析一个完整的dubbo协议包,但是每次收到的协议包不一定是完整的,或者有可能是多个协议包。看下代码解析,首先看NettyCodecAdapter的内部类InternalDecoder的messageReceived方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) throws Exception {
    Object o = event.getMessage();
    if (! (o instanceof ChannelBuffer)) {
    ctx.sendUpstream(event);
    return;
    }

    ChannelBuffer input = (ChannelBuffer) o;
    int readable = input.readableBytes();
    if (readable <= 0) {
    return;
    }

    com.alibaba.dubbo.remoting.buffer.ChannelBuffer message;
    if (buffer.readable()) {
    if (buffer instanceof DynamicChannelBuffer) {
    buffer.writeBytes(input.toByteBuffer());
    message = buffer;
    } else {
    int size = buffer.readableBytes() + input.readableBytes();
    message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(
    size > bufferSize ? size : bufferSize);
    message.writeBytes(buffer, buffer.readableBytes());
    message.writeBytes(input.toByteBuffer());
    }
    } else {
    message = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.wrappedBuffer(
    input.toByteBuffer());
    }

    NettyChannel channel = NettyChannel.getOrAddChannel(ctx.getChannel(), url, handler);
    Object msg;
    //读索引
    int saveReaderIndex;
    try {
    do {
    saveReaderIndex = message.readerIndex();
    try {
    //解码
    msg = codec.decode(channel, message);
    } catch (IOException e) {
    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
    throw e;
    }
    //不完整的协议包
    if (msg == Codec2.DecodeResult.NEED_MORE_INPUT) {
    //重置读索引
    message.readerIndex(saveReaderIndex);
    //跳出循环,之后在finally中把message赋值给buffer保存起来,等到下次接收到数据包的时候会追加到buffer的后面
    break;
    } else {//有多个协议包,触发messageReceived事件
    if (saveReaderIndex == message.readerIndex()) {
    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
    throw new IOException("Decode without read data.");
    }
    if (msg != null) {
    Channels.fireMessageReceived(ctx, msg, event.getRemoteAddress());
    }
    }
    } while (message.readable());
    } finally {
    if (message.readable()) {
    message.discardReadBytes();
    buffer = message;
    } else {
    buffer = com.alibaba.dubbo.remoting.buffer.ChannelBuffers.EMPTY_BUFFER;
    }
    NettyChannel.removeChannelIfDisconnected(ctx.getChannel());
    }
    }

    继续看codec.decode(channel, message);这里是DubboCountCodec的decode方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    //当前的读索引记录下来
    int save = buffer.readerIndex();
    //多消息
    MultiMessage result = MultiMessage.create();
    do {
    //解码消息
    Object obj = codec.decode(channel, buffer);
    //不是完整的协议包
    if (Codec2.DecodeResult.NEED_MORE_INPUT == obj) {
    buffer.readerIndex(save);
    break;
    } else {//多个协议包
    result.addMessage(obj);
    logMessageLength(obj, buffer.readerIndex() - save);
    save = buffer.readerIndex();
    }
    } while (true);
    if (result.isEmpty()) {
    return Codec2.DecodeResult.NEED_MORE_INPUT;
    }
    if (result.size() == 1) {
    return result.get(0);
    }
    return result;
    }

    继续看ExchangeCodec的decode方法:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    //可读字节数
    int readable = buffer.readableBytes();
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    //协议头
    buffer.readBytes(header);
    //解码
    return decode(channel, buffer, readable, header);
    }

    解码decode:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    //检查魔数.
    if (readable > 0 && header[0] != MAGIC_HIGH
    || readable > 1 && header[1] != MAGIC_LOW) {
    int length = header.length;
    if (header.length < readable) {
    header = Bytes.copyOf(header, readable);
    buffer.readBytes(header, length, readable - length);
    }
    for (int i = 1; i < header.length - 1; i ++) {
    if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
    buffer.readerIndex(buffer.readerIndex() - header.length + i);
    header = Bytes.copyOf(header, i);
    break;
    }
    }
    //telenet
    return super.decode(channel, buffer, readable, header);
    }
    //不完整的包
    if (readable < HEADER_LENGTH) {
    return DecodeResult.NEED_MORE_INPUT;
    }

    //数据长度
    int len = Bytes.bytes2int(header, 12);
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    if( readable < tt ) {
    return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
    //解码数据
    return decodeBody(channel, is, header);
    } finally {
    if (is.available() > 0) {
    try {
    StreamUtils.skipUnusedStream(is);
    } catch (IOException e) { }
    }
    }
    }

    decodeBody解析数据部分:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    //获取序列化方式
    Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
    //反序列化
    ObjectInput in = s.deserialize(channel.getUrl(), is);
    //获取请求id
    long id = Bytes.bytes2long(header, 4);
    //这里是解码响应数据
    if ((flag & FLAG_REQUEST) == 0) {
    //response的id设为来时候的Request的id,这样才能对上暗号
    Response res = new Response(id);
    //判断是什么类型请求
    if ((flag & FLAG_EVENT) != 0) {
    res.setEvent(Response.HEARTBEAT_EVENT);
    }
    //获取状态
    byte status = header[3];
    res.setStatus(status);
    if (status == Response.OK) {
    try {
    Object data;
    if (res.isHeartbeat()) {
    //解码心跳数据
    data = decodeHeartbeatData(channel, in);
    } else if (res.isEvent()) {
    //事件
    data = decodeEventData(channel, in);
    } else {
    //响应
    data = decodeResponseData(channel, in, getRequestData(id));
    }
    res.setResult(data);
    } catch (Throwable t) {
    res.setStatus(Response.CLIENT_ERROR);
    res.setErrorMessage(StringUtils.toString(t));
    }
    } else {
    res.setErrorMessage(in.readUTF());
    }
    return res;
    } else {//这是解码请求数据
    // request的id
    Request req = new Request(id);
    req.setVersion("2.0.0");
    req.setTwoWay((flag & FLAG_TWOWAY) != 0);
    if ((flag & FLAG_EVENT) != 0) {
    req.setEvent(Request.HEARTBEAT_EVENT);
    }
    try {
    Object data;
    if (req.isHeartbeat()) {
    //心跳
    data = decodeHeartbeatData(channel, in);
    } else if (req.isEvent()) {
    //事件
    data = decodeEventData(channel, in);
    } else {
    //请求
    data = decodeRequestData(channel, in);
    }
    req.setData(data);
    } catch (Throwable t) {
    // bad request
    req.setBroken(true);
    req.setData(t);
    }
    return req;
    }
    }

    具体的解码细节交给底层解码器,这里是使用的hessian2。

    服务消费者对响应消息的解码

    暂先不做解释。


  • 相关阅读:
    MVC和MTV模式
    Do a web framework ourselves
    什么是web框架?
    12.1.2 实战演练——编写一个适用于Android系统的网页
    10.4 实战演练
    Genymotion
    Reactive Native开发环境搭建
    第10章 使用WebKit浏览网页数据
    第7章 Android中访问网络资源
    3.4 存储简单数据的利器——Preferences
  • 原文地址:https://www.cnblogs.com/zhangboyu/p/7452406.html
Copyright © 2011-2022 走看看