zoukankan      html  css  js  c++  java
  • 3. 源码分析---SOFARPC客户端服务调用

    我们首先看看BoltClientProxyInvoker的关系图

    所以当我们用BoltClientProxyInvoker#invoke的时候实际上是调用了父类的invoke方法
    ClientProxyInvoker#invoke

        @Override
        public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
            SofaResponse response = null;
            Throwable throwable = null;
            try {
                RpcInternalContext.pushContext();
                RpcInternalContext context = RpcInternalContext.getContext();
                context.setProviderSide(false);
                // 包装request请求
                decorateRequest(request);
                try {
                    // 产生开始调用事件
                    if (EventBus.isEnable(ClientStartInvokeEvent.class)) {
                        EventBus.post(new ClientStartInvokeEvent(request));
                    }
                    // 得到结果
                    response = cluster.invoke(request);
                } catch (SofaRpcException e) {
                    throwable = e;
                    throw e;
                } finally {
                    // 产生调用结束事件
                    if (!request.isAsync()) {
                        if (EventBus.isEnable(ClientEndInvokeEvent.class)) {
                            EventBus.post(new ClientEndInvokeEvent(request, response, throwable));
                        }
                    }
                }
                // 包装响应
                decorateResponse(response);
                return response;
            } finally {
                RpcInternalContext.removeContext();
                RpcInternalContext.popContext();
            }
        }
    

    这个方法主要做了几件事:

    1. 包装request请求,设置必要的参数
    2. 调用FailOverCluster的invoke方法,将reques请求发送出去,并得到response相应
    3. 包装response响应

    我们在调用FailOverCluster的时候实际上是调用的父类AbstractCluster的invoker方法,FailOverCluster关系图如下:

    所以我们进入到AbstractCluster的invoker方法中:

        @Override
        public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
            SofaResponse response = null;
            try {
                // 做一些初始化检查,例如未连接可以连接
                checkClusterState();
                // 开始调用
                countOfInvoke.incrementAndGet(); // 计数+1         
                response = doInvoke(request);
                return response;
            } catch (SofaRpcException e) {
                // 客户端收到异常(客户端自己的异常)
                throw e;
            } finally {
                countOfInvoke.decrementAndGet(); // 计数-1
            }
        }
    

    checkClusterState方法主要是用来校验是否已销毁了,或是调用了init方法进行初始化了。
    然后会在调用之前记一下数。
    然后我们进入到doInvoke方法中:

        public SofaResponse doInvoke(SofaRequest request) throws SofaRpcException {
            String methodName = request.getMethodName();
            int retries = consumerConfig.getMethodRetries(methodName);
            int time = 0;
            SofaRpcException throwable = null;// 异常日志
            List<ProviderInfo> invokedProviderInfos = new ArrayList<ProviderInfo>(retries + 1);
            do {
                //负载均衡
                ProviderInfo providerInfo = select(request, invokedProviderInfos);
                try {
                    //调用过滤器链
                    SofaResponse response = filterChain(providerInfo, request);
                    if (response != null) {
                        if (throwable != null) {
                            if (LOGGER.isWarnEnabled(consumerConfig.getAppName())) {
                                LOGGER.warnWithApp(consumerConfig.getAppName(),
                                    LogCodes.getLog(LogCodes.WARN_SUCCESS_BY_RETRY,
                                        throwable.getClass() + ":" + throwable.getMessage(),
                                        invokedProviderInfos));
                            }
                        }
                        return response;
                    } else {
                        throwable = new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
                            "Failed to call " + request.getInterfaceName() + "." + methodName
                                + " on remote server " + providerInfo + ", return null");
                        time++;
                    }
                } catch (SofaRpcException e) { // 服务端异常+ 超时异常 才发起rpc异常重试
                    if (e.getErrorType() == RpcErrorType.SERVER_BUSY
                        || e.getErrorType() == RpcErrorType.CLIENT_TIMEOUT) {
                        throwable = e;
                        time++;
                    } else {
                        throw e;
                    }
                } catch (Exception e) { // 其它异常不重试
                    throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR,
                        "Failed to call " + request.getInterfaceName() + "." + request.getMethodName()
                            + " on remote server: " + providerInfo + ", cause by unknown exception: "
                            + e.getClass().getName() + ", message is: " + e.getMessage(), e);
                } finally {
                    if (RpcInternalContext.isAttachmentEnable()) {
                        RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_INVOKE_TIMES,
                            time + 1); // 重试次数
                    }
                }
                invokedProviderInfos.add(providerInfo);
            } while (time <= retries);
    
            throw throwable;
        }
    

    这个方法里面主要做了这这件事:

    1. 如果失败的话就循环调用
    2. 负载均衡,选取provider
    3. 通过过滤器链调用服务端,并返回结果
    4. 异常处理

    接着我们进入到filterChain方法中,根据过滤器链最后会跳到ConsumerInvoker中的invoke方法

        @Override
        public SofaResponse invoke(SofaRequest sofaRequest) throws SofaRpcException {
            // 设置下服务器应用
            ProviderInfo providerInfo = RpcInternalContext.getContext().getProviderInfo();
            String appName = providerInfo.getStaticAttr(ProviderInfoAttrs.ATTR_APP_NAME);
            if (StringUtils.isNotEmpty(appName)) {
                sofaRequest.setTargetAppName(appName);
            }
    
            // 目前只是通过client发送给服务端
            return consumerBootstrap.getCluster().sendMsg(providerInfo, sofaRequest);
        }
    

    consumerBootstrap.getCluster()会返回FailOverCluster实例,然后调用父类AbstractCluster的sendMsg方法

        public SofaResponse sendMsg(ProviderInfo providerInfo, SofaRequest request) throws SofaRpcException {
            ClientTransport clientTransport = connectionHolder.getAvailableClientTransport(providerInfo);
            if (clientTransport != null && clientTransport.isAvailable()) {
                return doSendMsg(providerInfo, clientTransport, request);
            } else {
                throw unavailableProviderException(request.getTargetServiceUniqueName(), providerInfo.getOriginUrl());
            }
        }
        
        
        protected SofaResponse doSendMsg(ProviderInfo providerInfo, ClientTransport transport,
                                         SofaRequest request) throws SofaRpcException {
            RpcInternalContext context = RpcInternalContext.getContext();
            // 添加调用的服务端远程地址
            RpcInternalContext.getContext().setRemoteAddress(providerInfo.getHost(), providerInfo.getPort());
            try {
                checkProviderVersion(providerInfo, request); // 根据服务端版本特殊处理
                String invokeType = request.getInvokeType();
                int timeout = resolveTimeout(request, consumerConfig, providerInfo);
    
                SofaResponse response = null;
                // 同步调用
                if (RpcConstants.INVOKER_TYPE_SYNC.equals(invokeType)) {
                    long start = RpcRuntimeContext.now();
                    try {
                        response = transport.syncSend(request, timeout);
                    } finally {
                        if (RpcInternalContext.isAttachmentEnable()) {
                            long elapsed = RpcRuntimeContext.now() - start;
                            context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
                        }
                    }
                }
                // 单向调用
                else if (RpcConstants.INVOKER_TYPE_ONEWAY.equals(invokeType)) {
                    long start = RpcRuntimeContext.now();
                    try {
                        transport.oneWaySend(request, timeout);
                        response = buildEmptyResponse(request);
                    } finally {
                        if (RpcInternalContext.isAttachmentEnable()) {
                            long elapsed = RpcRuntimeContext.now() - start;
                            context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_ELAPSE, elapsed);
                        }
                    }
                }
                // Callback调用
                else if (RpcConstants.INVOKER_TYPE_CALLBACK.equals(invokeType)) {
                    // 调用级别回调监听器
                    SofaResponseCallback sofaResponseCallback = request.getSofaResponseCallback();
                    if (sofaResponseCallback == null) {
                        SofaResponseCallback methodResponseCallback = consumerConfig
                            .getMethodOnreturn(request.getMethodName());
                        if (methodResponseCallback != null) { // 方法的Callback
                            request.setSofaResponseCallback(methodResponseCallback);
                        }
                    }
                    // 记录发送开始时间
                    context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
                    // 开始调用
                    transport.asyncSend(request, timeout);
                    response = buildEmptyResponse(request);
                }
                // Future调用
                else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
                    // 记录发送开始时间
                    context.setAttachment(RpcConstants.INTERNAL_KEY_CLIENT_SEND_TIME, RpcRuntimeContext.now());
                    // 开始调用
                    ResponseFuture future = transport.asyncSend(request, timeout);
                    // 放入线程上下文
                    RpcInternalContext.getContext().setFuture(future);
                    response = buildEmptyResponse(request);
                } else {
                    throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType);
                }
                return response;
            } catch (SofaRpcException e) {
                throw e;
            } catch (Throwable e) { // 客户端其它异常
                throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, e);
            }
        }
    

    sendMsg方法最后会调用到doSendMsg。
    soSendMsg里面主要做了如下几件事:

    1. 如果是同步调用,则直接返回封装好的参数
    2. 如果是单向调用,则调用buildEmptyResponse方法,返回一个空的response
    3. 如果是callback调用asyncSend,RPC在获取到服务端的结果后会自动执行该回调实现。
    4. 服务端返回响应结果被 RPC 缓存,当客户端需要响应结果的时候需要主动获取结果,获取结果的过程阻塞线程。
  • 相关阅读:
    RocketMQ集群部署(一)
    Apache Curator之InterProcessMutex抢购案例(三)
    Apache Curator之InterProcessMutex源码分析(四)
    Apache Curator之分布式锁原理(二)
    webapi框架搭建-安全机制(二)-身份验证
    webapi框架搭建-安全机制(一)
    asp.net webapi http请求生命周期
    webapi框架搭建-数据访问ef code first
    webapi框架搭建-webapi异常处理
    webapi框架搭建-日志管理log4net
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/11261303.html
Copyright © 2011-2022 走看看