zoukankan      html  css  js  c++  java
  • Dubbo系列之 (七)网络层那些事(2)

    辅助链接

    Dubbo系列之 (一)SPI扩展

    Dubbo系列之 (二)Registry注册中心-注册(1)

    Dubbo系列之 (三)Registry注册中心-注册(2)

    Dubbo系列之 (四)服务订阅(1)

    Dubbo系列之 (五)服务订阅(2)

    Dubbo系列之 (六)服务订阅(3)

    Dubbo系列之 (七)链路层那些事(1)

    Dubbo系列之 (七)链路层那些事(2)

    让我们以自己编写的TCP的思想,来看dubbo的网络层。

    1、网络层结构图

    Netty,让我们的编写TCP变的非常简单,并且它在业界运用极其广泛。Dubbo底层默认实现也是通过Netty。
    org.apache.dubbo.remoting.transport.netty4.NettyServer就是其默认实现方式。它的类关系如下:

    从上面的类图,我们可以知道其设计的理念。

    1. 网络通信,基本都是点对点通信,每个通信端可以称为一个终端,记名为Endpont。它具有获取本地地址getLocalAddress(),发送消息send(),关闭服务close(),并且可以获取得到处理消息的句柄getChannelHandler() 等基本功能。
      2)一般进行网络通信,都认为是一个远程服务器,它具有很多个客户端与其通信,所以持有获取所有通信通道的getChannels()方法,判断双端是否可以通信isBound()方法等。并且它实现Resetable,IdleSensible 接口,说明远程服务端可以有重置功能,也可以处于空闲。它必定是一个通信终端,所以继承Endpoint,记名为RemotingServer。
    2. 从网络通信的双端的角度来讲,客户端和服务端是一对对等端,记名为AbstractPeer,它具有通道信息处理能力,所以实现ChannelHandler。ChannelHandler是用来处理通信链路上的消息处理器。
      4)为了让程序更具有通用性,抽取为AbstractServer,子类实现其doOpen()和doClose()方法。
      5)NettyServer 就是其网络通信客户端的一个具体的实现。
      6)相应的客户端的示意图如下:

    2、NettyServer 内部持有对象

    1)Map<String, Channel> channels 持有与该服务端通信的TCP通信Channel,key为host:port。它是NettyServerHandler内部channels的引用,NettyServerHandler就是Netty处理消息的句柄,由我们自己编写。
    2)io.netty.channel.Channel channel netty监听网络连接的channel,由bind()方法返回
    3)内部监听线程为1个,线程名为NettyServerBoss。
    4)内部IO线程为可用处理器+1,最多为32个线程,线程名为NettyServerWorker。
    5)NettyServerHandler 为dubbo的消息处理器,编解码器对消息编解码后,就是扔给它处理。它内部持有一个ChannelHandler,即是NettyServer这个类,从类图上可知NettyServer实现了ChannelHandler 这个接口。
    6、NettyServer 持有一个外部传进来的ChannelHandler。并会对其进行封装,截图如下(截图比较清晰):

    6)NettyCodecAdapter,为dubbo内部的编解码器,内部持有编码器InternalEncoder,解码器为InternalDecoder。
    7)Dubbo 的业务线程名为DubboServerHandler。之后再日志上看到这个名字不要在陌生了。

    3、直接new NettyServer?

    答案,肯定不是,直接在框架上new,之后肯定没有扩展性可言。没事情,我们包裹一层即可,命名为传输层Transporter,它就有bind,connect功能。
    NettyTransporter就是其具体的实现之一。

    public class NettyTransporter implements Transporter {
    
        public static final String NAME = "netty";
    
        @Override
        public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
            return new NettyServer(url, handler);
        }
    
        @Override
        public Client connect(URL url, ChannelHandler handler) throws RemotingException {
            return new NettyClient(url, handler);
        }
    }
    

    4、Transporter 是一个SPI扩展,它的工具类Transporters

    既然是一个SPI扩展,必然需要通过ExtensionLoader.getExtensionLoader()进行加载,Transporters其工具类的静态方法getTransporter()就是通过ExtensionLoader进行加载的,然后其静态方法Transporters.bind()来获取RemotingServer,即NettyServer。

    public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
            if (url == null) {
                throw new IllegalArgumentException("url == null");
            }
            if (handlers == null || handlers.length == 0) {
                throw new IllegalArgumentException("handlers == null");
            }
            ChannelHandler handler;
            if (handlers.length == 1) {
                handler = handlers[0];
            } else {
                handler = new ChannelHandlerDispatcher(handlers);
            }
            return getTransporter().bind(url, handler);
        }
    

    值得注意的是,可以为其添加多个ChannelHandler,如果为多个,则对其封装为ChannelHandlerDispatcher,然后传递给NettyServer。

    5、Transporters#bind()传入的ChannelHandler到底是谁?

    先引入一个名词Exchanger,我称为信息交换回执器,它其实是对Transporter的封装。Exchanger 同样具有bind()和connect()方法。
    Exchanger的bind()和connect()接收一个ExchangeHandler的消息处理句柄,ExchangeHandler 也是一个ChannelHandler,也是用来处理通道消息的,但它对其进行的增强,有一个回执的方法reply()。即接收一个消息后,可以进行回执,通过reply()。

    6、Exchanger 是一个SPI扩展,它的工具类Exchangers

    和Transporters一样,工具类Exchangers同样的方式通过ExtensionLoader.getExtensionLoader()来获取特定的Exchanger,默认为HeaderExchanger。HeaderExchanger#bind()和connect()。

    HeaderExchanger是默认的信息交换回执器。可以看到HeaderExchangeServer 和HeaderExchangeClient分别接收RemoteServer 和Client。并且通过Transporters调用传入。ExchangeHandler会被包裹成HeaderExchangeHandler,接着在被包裹为DecodeHandler。
    从这里我们就可以解答第五个问题,Transporters#bind()传入的ChannelHandler 其实是DecodeHandler。为啥这里需要DecodeHandler,其实是服务端或者客户端在收到对等端的消息字节数据后,首先解析的是头部信息,body信息是没有在netty的编解码中进行解析的,是到了真正处理消息的时候,通过Decodehandler#received()内部进行解码的。

    7、Exchangers的bind()和connect()重载方法太多?

    实际上,框架真正用到的是如下:

    其他最终都是会调到这个方法,有时间的同学可以自己调用下其他的重载方法。

    8、Exchangers#bind()和connect()的ExchangeHandler 最终是什么?

    这个问题请查看DubboProtocol的成员变量ExchangeHandler。接下来,我们来分析这个ExchangeHandler 到底做了什么。
    DubboProtocol内部new 了一个ExchangeHandlerAdapter 对象,也就是ExchangeHandler。该handler主要处理已经Invocation类型的消息。
    首先,看下received()方法。

     @Override
            public void received(Channel channel, Object message) throws RemotingException {
                if (message instanceof Invocation) {
                    reply((ExchangeChannel) channel, message);
    
                } else {
                    super.received(channel, message);
                }
            }
    

    如果消息的类型是Invocation,那么调用reply方法进行消息应答,如果不是调用超类,也就是
    ExchangeHandlerAdapter的received方法,该方法是空的,所以即会丢弃该消息内容。
    那我们来看下,relpy方法主要干了什么?注释如下代码块。

     public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
    
                /**
                 *
                 * 如果消息类型不是Invocation,那么会抛出异常,一般情况下不会,在received方法上已经进行的消息类型判断。
                 */
                if (!(message instanceof Invocation)) {
                    throw new RemotingException(channel, "Unsupported request: "
                            + (message == null ? null : (message.getClass().getName() + ": " + message))
                            + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
                }
    
                Invocation inv = (Invocation) message;
    
                //这里得到的就是服务Invoker根据inv。
                Invoker<?> invoker = getInvoker(channel, inv);
                // need to consider backward-compatibility if it's a callback
                // 看是不是dubbo 回调,之后看下回调内容
                if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
                    String methodsStr = invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr == null || !methodsStr.contains(",")) {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    } else {
                        String[] methods = methodsStr.split(",");
                        for (String method : methods) {
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    }
                    if (!hasMethod) {
                        logger.warn(new IllegalStateException("The methodName " + inv.getMethodName()
                                + " not found in callback service interface ,invoke will be ignored."
                                + " please update the api interface. url is:"
                                + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }
                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                //调用,这里就是服务调用,这里会牵扯到dubbo filter 相关内容,在服务调用时具体坚决
                Result result = invoker.invoke(inv);
                return result.thenApply(Function.identity());
            }
    

    从上面的可知道,reply主要就是进行服务的调用,核心语句就是 invoker.invoke(inv)。
    那么是如何找到这个服务提供者的服务呢。来看下getInvoker()。

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
            boolean isCallBackServiceInvoke = false;
            boolean isStubServiceInvoke = false;
            int port = channel.getLocalAddress().getPort();
            String path = (String) inv.getObjectAttachments().get(PATH_KEY);
    
            // if it's callback service on client side
            //如果该调用是在客服端,可能会有配置Stub类,通过isStubServiceInvoke标注
            isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY));
            if (isStubServiceInvoke) {
                port = channel.getRemoteAddress().getPort();
            }
    
            //callback
            // 查看是否是本地回调
            isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke;
            if (isCallBackServiceInvoke) {
                path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY);
                inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString());
            }
            
            // 构建serviceKey,通过端口port,路径(一般是接口的全限定名)path,版本号version,分组group
            String serviceKey = serviceKey(
                    port,
                    path,
                    (String) inv.getObjectAttachments().get(VERSION_KEY),
                    (String) inv.getObjectAttachments().get(GROUP_KEY)
            );
            
            //所有的服务导出都会放在exporterMap对象里,然后根据key获取得到DubboExporter
            DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);
    
            if (exporter == null) {
                throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " +
                        ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv));
            }
            // 接着返回Invoker。
            return exporter.getInvoker();
        }
    

    当服务调用方与服务提供方建立连接和断开连接时,代码如下:

    @Override
            public void connected(Channel channel) throws RemotingException {
                invoke(channel, ON_CONNECT_KEY);
            }
    
            @Override
            public void disconnected(Channel channel) throws RemotingException {
                if (logger.isDebugEnabled()) {
                    logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl());
                }
                invoke(channel, ON_DISCONNECT_KEY);
            }
    

    都是进行调用invoke方法。那么invoke方法主要干了啥?

    private void invoke(Channel channel, String methodKey) {
                Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey);
                if (invocation != null) {
                    try {
                        received(channel, invocation);
                    } catch (Throwable t) {
                        logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t);
                    }
                }
            }
    
            /**
             * FIXME channel.getUrl() always binds to a fixed service, and this service is random.
             * we can choose to use a common service to carry onConnect event if there's no easy way to get the specific
             * service this connection is binding to.
             * @param channel
             * @param url
             * @param methodKey
             * @return
             */
            private Invocation createInvocation(Channel channel, URL url, String methodKey) {
                String method = url.getParameter(methodKey);
                if (method == null || method.length() == 0) {
                    return null;
                }
    
                RpcInvocation invocation = new RpcInvocation(method, url.getParameter(INTERFACE_KEY), new Class<?>[0], new Object[0]);
                invocation.setAttachment(PATH_KEY, url.getPath());
                invocation.setAttachment(GROUP_KEY, url.getParameter(GROUP_KEY));
                invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY));
                invocation.setAttachment(VERSION_KEY, url.getParameter(VERSION_KEY));
                if (url.getParameter(STUB_EVENT_KEY, false)) {
                    invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString());
                }
    
                return invocation;
            }
    

    9、最后

    这一篇文章距离上一次已经很久了,主要是遇到了国庆,自己想放松一下,接下来还会继续努力的分析dubbo的一些内容。

  • 相关阅读:
    Oracle数据导出到MySql
    ORA04031 shared_pool 不能分配足够内存或磁盘碎片
    IDEA那些好用的插件
    MySQL基础篇增删改查
    SpringBoot项目部署在阿里云
    三、Mybatis相应API
    chrome的书签备份
    redis踩坑
    四、Mybatis的Dao层实现
    MySQL基础篇函数
  • 原文地址:https://www.cnblogs.com/liferecord/p/13848668.html
Copyright © 2011-2022 走看看