zoukankan      html  css  js  c++  java
  • 5.源码分析---SOFARPC调用服务

    我们这一次来接着上一篇文章《4. 源码分析---SOFARPC服务端暴露》讲一下服务暴露之后被客户端调用之后服务端是怎么返回数据的。

    示例我们还是和上篇文章一样使用一样的bolt协议来讲:

        public static void main(String[] args) {
            ServerConfig serverConfig = new ServerConfig()
                    .setProtocol("bolt") // 设置一个协议,默认bolt
                    .setPort(12200) // 设置一个端口,默认12200
                    .setDaemon(false); // 非守护线程
    
            ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
                .setInterfaceId(HelloService.class.getName()) // 指定接口
                .setRef(new HelloServiceImpl()) // 指定实现
                .setServer(serverConfig); // 指定服务端
    
            providerConfig.export(); // 发布服务
        }
    

    在Bolt协议下面,当服务端被调用的时候一个服务的流程如下所示:
    BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker

    BoltServerProcessor#handleRequest

    @Override
    public void handleRequest(BizContext bizCtx, AsyncContext asyncCtx, SofaRequest request) {
        // RPC内置上下文
        RpcInternalContext context = RpcInternalContext.getContext();
        context.setProviderSide(true);
    
        String appName = request.getTargetAppName();
        if (appName == null) {
            // 默认全局appName
            appName = (String) RpcRuntimeContext.get(RpcRuntimeContext.KEY_APPNAME);
        }
    
        // 是否链路异步化中
        boolean isAsyncChain = false;
        try { // 这个 try-finally 为了保证Context一定被清理
            processingCount.incrementAndGet(); // 统计值加1
    
            context.setRemoteAddress(bizCtx.getRemoteHost(), bizCtx.getRemotePort()); // 远程地址
            context.setAttachment(RpcConstants.HIDDEN_KEY_ASYNC_CONTEXT, asyncCtx); // 远程返回的通道
    
            if (RpcInternalContext.isAttachmentEnable()) {
                InvokeContext boltInvokeCtx = bizCtx.getInvokeContext();
                if (boltInvokeCtx != null) {
                    putToContextIfNotNull(boltInvokeCtx, InvokeContext.BOLT_PROCESS_WAIT_TIME,
                        context, RpcConstants.INTERNAL_KEY_PROCESS_WAIT_TIME); // rpc线程池等待时间 Long
                }
            }
            if (EventBus.isEnable(ServerReceiveEvent.class)) {
                EventBus.post(new ServerReceiveEvent(request));
            }
    
            // 开始处理
            SofaResponse response = null; // 响应,用于返回
            Throwable throwable = null; // 异常,用于记录
            ProviderConfig providerConfig = null;
            String serviceName = request.getTargetServiceUniqueName();
    
            try { // 这个try-catch 保证一定有Response
                invoke:
                {
                    if (!boltServer.isStarted()) { // 服务端已关闭
                        throwable = new SofaRpcException(RpcErrorType.SERVER_CLOSED, LogCodes.getLog(
                            LogCodes.WARN_PROVIDER_STOPPED, SystemInfo.getLocalHost() + ":" +
                                boltServer.serverConfig.getPort()));
                        response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());
                        break invoke;
                    }
                    if (bizCtx.isRequestTimeout()) { // 加上丢弃超时的请求的逻辑
                        throwable = clientTimeoutWhenReceiveRequest(appName, serviceName, bizCtx.getRemoteAddress());
                        break invoke;
                    }
                    // 查找服务
                    //在server.registerProcessor方法中设置 ProviderProxyInvoker
                    Invoker invoker = boltServer.findInvoker(serviceName);
                    if (invoker == null) {
                        throwable = cannotFoundService(appName, serviceName);
                        response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());
                        break invoke;
                    }
                    if (invoker instanceof ProviderProxyInvoker) {
                        providerConfig = ((ProviderProxyInvoker) invoker).getProviderConfig();
                        // 找到服务后,打印服务的appName
                        appName = providerConfig != null ? providerConfig.getAppName() : null;
                    }
                    // 查找方法
                    String methodName = request.getMethodName();
                    //在server.registerProcessor方法中设置
                    Method serviceMethod = ReflectCache.getOverloadMethodCache(serviceName, methodName,
                        request.getMethodArgSigs());
                    if (serviceMethod == null) {
                        throwable = cannotFoundServiceMethod(appName, methodName, serviceName);
                        response = MessageBuilder.buildSofaErrorResponse(throwable.getMessage());
                        break invoke;
                    } else {
                        request.setMethod(serviceMethod);
                    }
    
                    // 真正调用
                    response = doInvoke(serviceName, invoker, request);
    
                    if (bizCtx.isRequestTimeout()) { // 加上丢弃超时的响应的逻辑
                        throwable = clientTimeoutWhenSendResponse(appName, serviceName, bizCtx.getRemoteAddress());
                        break invoke;
                    }
                }
            } catch (Exception e) {
                // 服务端异常,不管是啥异常
                LOGGER.errorWithApp(appName, "Server Processor Error!", e);
                throwable = e;
                response = MessageBuilder.buildSofaErrorResponse(e.getMessage());
            }
    
            // Response不为空,代表需要返回给客户端
            if (response != null) {
                RpcInvokeContext invokeContext = RpcInvokeContext.peekContext();
                isAsyncChain = CommonUtils.isTrue(invokeContext != null ?
                    (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null);
                // 如果是服务端异步代理模式,特殊处理,因为该模式是在业务代码自主异步返回的
                if (!isAsyncChain) {
                    // 其它正常请求
                    try { // 这个try-catch 保证一定要记录tracer
                        asyncCtx.sendResponse(response);
                    } finally {
                        if (EventBus.isEnable(ServerSendEvent.class)) {
                            EventBus.post(new ServerSendEvent(request, response, throwable));
                        }
                    }
                }
            }
        } catch (Throwable e) {
            // 可能有返回时的异常
            if (LOGGER.isErrorEnabled(appName)) {
                LOGGER.errorWithApp(appName, e.getMessage(), e);
            }
        } finally {
            processingCount.decrementAndGet();
            if (!isAsyncChain) {
                if (EventBus.isEnable(ServerEndHandleEvent.class)) {
                    EventBus.post(new ServerEndHandleEvent());
                }
            }
            RpcInvokeContext.removeContext();
            RpcInternalContext.removeAllContext();
        }
    }
    

    这个方法主要做了如下几件事:

    1. 设置上下文参数
    2. 从缓存中得到服务暴露时设置的invoker
    3. 为request设置method参数
    4. 调用doInvoke返回response
    5. 将response返回给客户端

    BoltServerProcessor#doInvoke

    我们直接进入到doInvoke方法中,看是如何生成response对象的。

    private SofaResponse doInvoke(String serviceName, Invoker invoker, SofaRequest request) throws SofaRpcException {
        // 开始调用,先记下当前的ClassLoader
        ClassLoader rpcCl = Thread.currentThread().getContextClassLoader();
        try {
            // 切换线程的ClassLoader到 服务 自己的ClassLoader
            ClassLoader serviceCl = ReflectCache.getServiceClassLoader(serviceName);
            Thread.currentThread().setContextClassLoader(serviceCl);
            return invoker.invoke(request);
        } finally {
            Thread.currentThread().setContextClassLoader(rpcCl);
        }
    }
    

    这里主要是为了获取缓存里面加载被暴露服务的类加载器,这样可以防止不同的类加载器之间一个类被加载多次。

    然后调用过滤器链,最后进入到ProviderInvoker中

    ProviderInvoker#invoke

    @Override
    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
        SofaResponse sofaResponse = new SofaResponse();
        long startTime = RpcRuntimeContext.now();
        try {
            // 反射 真正调用业务代码
            Method method = request.getMethod();
            if (method == null) {
                throw new SofaRpcException(RpcErrorType.SERVER_FILTER, "Need decode method first!");
            }
            Object result = method.invoke(providerConfig.getRef(), request.getMethodArgs());
    
            sofaResponse.setAppResponse(result);
        } catch (IllegalArgumentException e) { // 非法参数,可能是实现类和接口类不对应)
            sofaResponse.setErrorMsg(e.getMessage());
        } catch (IllegalAccessException e) { // 如果此 Method 对象强制执行 Java 语言访问控制,并且底层方法是不可访问的
            sofaResponse.setErrorMsg(e.getMessage());
        } catch (InvocationTargetException e) { // 业务代码抛出异常
            cutCause(e.getCause());
            sofaResponse.setAppResponse(e.getCause());
        } finally {
            if (RpcInternalContext.isAttachmentEnable()) {
                long endTime = RpcRuntimeContext.now();
                RpcInternalContext.getContext().setAttachment(RpcConstants.INTERNAL_KEY_IMPL_ELAPSE,
                    endTime - startTime);
            }
        }
    
        return sofaResponse;
    }
    

    到最后我们发现,服务端会通过反射调用被暴露服务的方法,封装成Response类返回。

    我们再次回到BoltServerProcessor#handleRequest方法中

    ....//忽略其他内容
    // Response不为空,代表需要返回给客户端
    if (response != null) {
        RpcInvokeContext invokeContext = RpcInvokeContext.peekContext();
        isAsyncChain = CommonUtils.isTrue(invokeContext != null ?
            (Boolean) invokeContext.remove(RemotingConstants.INVOKE_CTX_IS_ASYNC_CHAIN) : null);
        // 如果是服务端异步代理模式,特殊处理,因为该模式是在业务代码自主异步返回的
        if (!isAsyncChain) {
            // 其它正常请求
            try { // 这个try-catch 保证一定要记录tracer
                asyncCtx.sendResponse(response);
            } finally {
                if (EventBus.isEnable(ServerSendEvent.class)) {
                    EventBus.post(new ServerSendEvent(request, response, throwable));
                }
            }
        }
    }
    ....//忽略其他内容
    

    最后我们的response实例会使用netty传给客户端。

  • 相关阅读:
    Using PL/SQL APIs as Web Services
    1.Cocos2dx 3.2中vector,ValueMap,Touch触摸时间的使用.iconv字符编解码
    我对贝叶斯分类器的理解
    VB.NET的前世今生
    《转》PyQt4 精彩实例分析* 实例2 标准对话框的使用
    网络流合集:bzoj1433,1934,1854 题解
    Android 下拉刷新上拉载入 多种应用场景 超级大放送(上)
    Nginx 笔记与总结(15)nginx 实现反向代理 ( nginx + apache 动静分离)
    深入浅出之数据分析四步曲
    深入浅出之数据分析四步曲
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/11299769.html
Copyright © 2011-2022 走看看