zoukankan      html  css  js  c++  java
  • 服务调用

    强烈建议查看官方文档

    @Autowired
    @Qualifier("serviceImpl")
    private TestService testService;
    

    调用现场

    这里我们可以发现,testService是proxy0对象,也就是服务引用那篇里返回的,

    @Autowired TestService testService:spring会去加载该Bean,调用到ReferenceBean.getObject获取对象

    -->InvokerInvocationHandler.invoke
      -->RpcInvocation	//所有请求都会转为RpcInvocation
      -->MockClusterInvoker.invoke //1.进入集群
        -->result = this.invoker.invoke(invocation);
          -->AbstractClusterInvoker.invoke
          	-->list(invocation)
          	  -->directory.list //2.进入目录查找 从this.methodInvokerMap里面查找一个Invoker
          	  	-->AbstractDirectory.list
          	  	  -->doList(invocation)
          	  	  	-->RegistryDirectory.doList //从this.methodInvokerMap里面查找一个Invoker
          	  	  -->router.route //3.进入路由
          	  	  	-->MockInvokersSelector.route
          	  	  	  -->getNormalInvokers
          	-->ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension("random")
          	-->doInvoke
          	  -->FailoverClusterInvoker.doInvoke
          	  	-->select //4.进入负载均衡
          	  	  -->AbstractClusterInvoker.select
          	  	  	-->doselect  //这里,如果集群中只有一个服务,直接返回
          	  	  	  -->loadbalance.select
          	  	  	  	-->AbstractLoadBalance.select
          	  	  	  	  -->doSelect
          	  	  	  	  	-->RoundRobinLoadBalance.doSelect
          	  	  	  	  	  -->invokers.get(currentSequence % length)//取模轮循
          	  	-->Result result = invoker.invoke(invocation)
          	  	-------------扩展点----------------
          	  	  -->InvokerWrapper.invoke
          	  	  	-->ListenerInvokerWrapper.invoke
          	  	  	  -->ConsumerContextFilter.invoke
          	  	  	    -->ProtocolFilterWrapper.invoke
          	  	  	      -->MonitorFilter.invoke
          	  	  	      	-->ProtocolFilterWrapper.invoke
          	  	  	      	  -->FutureFilter.invoke
          	  	  	      	  	-->ListenerInvokerWrapper.invoke
          	  	  	      	  	  -->AbstractInvoker.invoke  
            					 //将附加消息(attachment)添加到invocation, 将带到服务端去
          	  	  	      	  	  ---------------扩展点---------------
          	  	  	      	  	    -->doInvoke(invocation)
    -------------***------------      -->DubboInvoker.doInvoke 
          	  	  	      	  	      //这里主线程会等待,直到被唤醒,而且有返回值(一般是这种)
          	  	  	      	  	      //为什么DubboInvoker是个protocol? 因为
          	  	  	      	  	      //registryDirectory.refreshInvoker.toInvokers:protocol.refer
            
           							   -------------------逻辑隔开线--------------------
          	  	  	      	  	      	-->ReferenceCountExchangeClient.request
          	  	  	      	  	      	  -->HeaderExchangeClient.request
    -----------------***------------------- -->HeaderExchangeChannel.request  //创建request(自带ID)
          	  	  	      	  	      	      -->AbstractPeer.send
          	  	  	      	  	      	      	-->AbstractClient.send
          	  	  	      	  	      	      	  -->NettyChannel.send
          	  	  	      	  	      	      	  	//里面就是netty客户端向服务端发送消息的逻辑
          	  	  	      	  	      	      	  	//channel.writeAndFlush(message)
          	  	  	      	  	      	      	  	// 这里会使用netty的worke线程池去调用
          	  	  	      	  	      	      	  	//	NettyClientHandler#write
    

    中间涉及到对数据的编码操作

    ExchangeCodec#encode-->encodeRequest-->DubboCodec #encodeRequestData(序列化请求参数)

    解码操作

    ExchangeCodec#decode-->decodeBody-->DubboCodec -->decodeBody-->DecodeableRpcInvocation.decode

    ------------------------来到服务端----------
    NettyClientHandler#write
    -->NettyServerHandler.channelRead 
    //(这里dubbo官网说的是NettyHandler#messageReceived,但是我的2.6.6版本并没有进入那个方法,而是这里写着的write,channelRead)
    //服务端接收消息
     -->AbstractPeer.received
       -->MultiMessageHandler.received
         -->HeartbeatHandler.received
           -->AllChannelHandler.received
             -->ChannelEventRunnable.run
               -->DecodeHandler.received
                 //中间插入解码操作(主要是针对运行时解码)
                 -->HeaderExchangeHandler.received
    ---****---     -->handleRequest(这个方法执行目标对象的目标方法)
                 //Object result = handler.reply(channel, msg); 
    			  //handle是DubboProtocol.requestHandler属性
    				-->DubboProtocol.ExchangeHandler.reply
    				  -->Invoker.invoke // 执行过滤连 省略过滤链步骤
    					-->InvokerWrapper.invoke
    					  -->DelegateProviderMetaDataInvoker.invoke
    ----****----			-->AbstractProxyInvoker.invoke
    						//new RpcResult(doInvoke(proxy, invocation.getMethodName(), 
    						//invocation.getParameterTypes(), invocation.getArguments()));
    						  -->wrapper.invokeMethod(proxy, methodName, 
                                                    parameterTypes, arguments);
      					//JavassistProxyFactory.getInvoker.AbstractProxyInvoker.doInvoke
      						-->getData (目标方法)
    -----------------逻辑分割线-------------------
                   -->AbstractPeer.send
                     -->NettyChannel.send
                       //channel.writeAndFlush(message) 返回消息到客户端
                       -->NettyServerHandler.write
                       ------------------服务端执行完毕----------------------
                         -->NettyClientHandler.channelRead(客户端接收消息)
                           -->AbstractPeer.received
                             -->MultiMessageHandler.received
                               -->HeartbeatHandler.received
                                 -->AllChannelHandler.received
                                   -->ChannelEventRunnable.run
                                     -->DecodeHandler.received
                                       -->HeaderExchangeHandler.received
                                         //中间插入解码操作(对返回的信息)
                                         -->handleResponse
                                           -->DefaultFuture.received
                                             -->DefaultFuture.doReceived
                                               //response = res 将返回的信息保存在DefaultFuture中
                                               //done.signal() 唤醒DubboInvoker.doInvoke中暂停的主线程
                                               
    

    DubboInvoker#doInvoke

    该方法在请求时通过get方法处于while(true)等待中

    当被唤醒而且有返回值后(通常使用的这一种)继续执行主线程

    return (Result) currentClient.request(inv, timeout).get();
      -->currentClient.request //这里就是执行的以上所有过程(请求与相应)
      -->DefaultFuture.get(timeout)
        -->returnFromResponse
          // retrun response.getResult()
    

    DubboInvoker

    public class DubboInvoker<T> extends AbstractInvoker<T> {
        
        private final ExchangeClient[] clients;
        
        protected Result doInvoke(final Invocation invocation) throws Throwable {
            RpcInvocation inv = (RpcInvocation) invocation;
            final String methodName = RpcUtils.getMethodName(invocation);
            // 设置 path 和 version 到 attachment 中
            inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
            inv.setAttachment(Constants.VERSION_KEY, version);
    
            ExchangeClient currentClient;
            if (clients.length == 1) {
                // 从 clients 数组中获取 ExchangeClient
                currentClient = clients[0];
            } else {
                currentClient = clients[index.getAndIncrement() % clients.length];
            }
            try {
                // 获取异步配置
                boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
                // isOneway 为 true,表示“单向”通信
                boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
                int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
    
                // 异步无返回值
                if (isOneway) {
                    boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                    // 发送请求
                    currentClient.send(inv, isSent);
                    // 设置上下文中的 future 字段为 null
                    RpcContext.getContext().setFuture(null);
                    // 返回一个空的 RpcResult
                    return new RpcResult();
                } 
    
                // 异步有返回值
                else if (isAsync) {
                    // 发送请求,并得到一个 ResponseFuture 实例
                    ResponseFuture future = currentClient.request(inv, timeout);
                    // 设置 future 到上下文中
                    RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                    // 暂时返回一个空结果
                    return new RpcResult();
                } 
    
                // 同步调用
                else {
                    RpcContext.getContext().setFuture(null);
                    // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待
                    return (Result) currentClient.request(inv, timeout).get();
                }
            } catch (TimeoutException e) {
                throw new RpcException(..., "Invoke remote method timeout....");
            } catch (RemotingException e) {
                throw new RpcException(..., "Failed to invoke remote method: ...");
            }
        }
        
        // 省略其他方法
    }
    

    问题: 一般情况下,服务消费方会并发调用多个服务,每个用户线程发送请求后,会调用不同 DefaultFuture 对象的 get 方法进行等待。 一段时间后,服务消费方的线程池会收到多个响应对象。这个时候要考虑一个问题,如何将每个响应对象传递给相应的 DefaultFuture 对象,且不出错??

    看到DubboInvoker.doInvoke中的同步有返回值的一段代码

    return (Result) currentClient.request(inv, timeout).get();
    

    get方法会等待被唤醒同时有返回结果

    看到HeaderExchangeChannel#request方法

    @Override
    public ResponseFuture request(Object request, int timeout) throws RemotingException {
        if (closed) {
            throw new RemotingException("is closed!");
        }
        // 创建一个request对象,默认赋值了一个ID
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = new DefaultFuture(channel, req, timeout);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
      	//直接返回DefaultFuture
        return future;
    }
    

    这里就直接返回了DefaultFuture ,对应是上面的currentClient.request, get方法就是调用的DefaultFuture .get

    channel.send后面的调用会为每次调用开启不同的线程

    在请求时会将请求参数序列化到服务端

    服务端接到请求后,会还原Request对象

    ExchangeCodec.ExchangeCodec 服务端解码

    // decode request.
    Request req = new Request(id);
    req.setVersion(Version.getProtocolVersion());
    req.setTwoWay((flag & FLAG_TWOWAY) != 0);
    if ((flag & FLAG_EVENT) != 0) {
        req.setEvent(Request.HEARTBEAT_EVENT);
    }
    try {
        ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
        Object data;
        if (req.isHeartbeat()) {
            data = decodeHeartbeatData(channel, in);
        } else if (req.isEvent()) {
            data = decodeEventData(channel, in);
        } else {
            data = decodeRequestData(channel, in);
        }
        req.setData(data);
    } catch (Throwable t) {
        // bad request
        req.setBroken(true);
        req.setData(t);
    }
    return req;
    

    在服务端调用目标方法完毕后会将请求返回的结果和id设置给Reponse对象

    HeaderExchangeHandler.handleRequest

    Response handleRequest(ExchangeChannel channel, Request req) throws RemotingException {
      	//给response设置id(该ID是请求时Request中的ID)
        Response res = new Response(req.getId(), req.getVersion());
        if (req.isBroken()) {
            Object data = req.getData();
    
            String msg;
            if (data == null) msg = null;
            else if (data instanceof Throwable) msg = StringUtils.toString((Throwable) data);
            else msg = data.toString();
            res.setErrorMessage("Fail to decode request due to: " + msg);
            res.setStatus(Response.BAD_REQUEST);
    
            return res;
        }
        // 请求方法的参数
        Object msg = req.getData();
        try {
            // result: 调用目标方法返回的结果
            Object result = handler.reply(channel, msg);
            res.setStatus(Response.OK);
            res.setResult(result);
        } catch (Throwable e) {
            res.setStatus(Response.SERVICE_ERROR);
            res.setErrorMessage(StringUtils.toString(e));
        }
        return res;
    }
    

    最后来到服务端调用完后回到客户端调用DefaultFuture.received

    public static void received(Channel channel, Response response) {
        try {
          	// FUTURES保存着每次调用后返回的DefaultFuture对象,key是生成Request生成时的ID
          	// 这里用response.getId()去获取,因为Request对应的Response有相同的ID
            DefaultFuture future = FUTURES.remove(response.getId());
            if (future != null) {
              	// 唤醒对应线程的DefaultFuture对象
                future.doReceived(response);
            } else {
                logger.warn("The timeout response finally returned at "
                        + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
                        + ", response " + response
                        + (channel == null ? "" : ", channel: " + channel.getLocalAddress()
                        + " -> " + channel.getRemoteAddress()));
            }
        } finally {
            CHANNELS.remove(response.getId());
        }
    }
    

    这里就唤醒了Response对应的DefaultFuture对象,一个请求的响应就完成了,过程颇为复杂。

    在dubbo官网对服务调用用非常详细的讲解!!

  • 相关阅读:
    bzoj 1232 [Usaco2008Nov]安慰奶牛cheer
    bzoj 1237 [SCOI2008]配对 贪心+dp
    缺8数
    缺8数
    Binary GCD algorithm
    Binary GCD algorithm
    HDU1576 A/B (解法二)【试探法】
    HDU1576 A/B (解法二)【试探法】
    I00002 打印九九乘法表
    I00002 打印九九乘法表
  • 原文地址:https://www.cnblogs.com/qiaozhuangshi/p/11007047.html
Copyright © 2011-2022 走看看