zoukankan      html  css  js  c++  java
  • 11.源码分析---SOFARPC数据透传是实现的?

    SOFARPC源码解析系列:

    1. 源码分析---SOFARPC可扩展的机制SPI

    2. 源码分析---SOFARPC客户端服务引用

    3. 源码分析---SOFARPC客户端服务调用

    4. 源码分析---SOFARPC服务端暴露

    5.源码分析---SOFARPC调用服务

    6.源码分析---和dubbo相比SOFARPC是如何实现负载均衡的?

    7.源码分析---SOFARPC是如何实现连接管理与心跳?

    8.源码分析---从设计模式中看SOFARPC中的EventBus?

    9.源码分析---SOFARPC是如何实现故障剔除的?

    10.源码分析---SOFARPC内置链路追踪SOFATRACER是怎么做的?


    先把栗子放上,让大家方便测试用:
    Service端

    public static void main(String[] args) {
        ServerConfig serverConfig = new ServerConfig()
            .setProtocol("bolt") // 设置一个协议,默认bolt
            .setPort(12200) // 设置一个端口,默认12200
            .setDaemon(false); // 非守护线程
    
        ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName()) // 指定接口
            .setRef(new HelloServiceImpl()) // 指定实现
            .setServer(serverConfig); // 指定服务端
    
        providerConfig.export(); // 发布服务
    }
    
    public class HelloServiceImpl implements HelloService {
    
        private final static Logger LOGGER = LoggerFactory.getLogger(HelloServiceImpl.class);
    
        @Override
        public String sayHello(String string) {
            LOGGER.info("Server receive: " + string);
    
            // 获取请求透传数据并打印
            System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getRequestBaggage("req_bag"));
            // 设置响应透传数据到当前线程的上下文中
            RpcInvokeContext.getContext().putResponseBaggage("req_bag", "s2c");
    
            return "hello " + string + " !";
        }
    }
    
    

    client端

    public static void main(String[] args) {
        ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName()) // 指定接口
            .setProtocol("bolt") // 指定协议
            .setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址
            .setConnectTimeout(10 * 1000);
    
        RpcInvokeContext.getContext().putRequestBaggage("req_bag", "a2bbb");
    
        HelloService helloService = consumerConfig.refer();
    
        while (true) {
            System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getResponseBaggage("req_bag"));
            try {
                LOGGER.info(helloService.sayHello("world"));
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    }
    
    

    通过上面的栗子我们可以看出整个流程应该是:

    1. 客户端把需要透传的数据放入到requestBaggage中,然后调用服务端
    2. 服务端在HelloServiceImpl中获取请求透传数据并打印,并把响应数据放入到responseBaggage中
    3. 客户端收到透传数据

    所以下面我们从客户端开始源码讲解。

    客户端数据透传给服务端

    首先客户端在引用之前要设置putRequestBaggage,然后在客户端引用的时候会调用ClientProxyInvoker#invoke方法。

    如下:
    ClientProxyInvoker#invoke

    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
       	....
            // 包装请求
            decorateRequest(request);
           ....
    }
    
    

    通过调用decorateRequest会调用到子类DefaultClientProxyInvoker的decorateRequest方法。

    DefaultClientProxyInvoker#decorateRequest

    protected void decorateRequest(SofaRequest request) {
        ....
        RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();
        RpcInternalContext internalContext = RpcInternalContext.getContext();
        if (invokeCtx != null) {
           ....
            // 如果用户指定了透传数据
            if (RpcInvokeContext.isBaggageEnable()) {
                // 需要透传
                BaggageResolver.carryWithRequest(invokeCtx, request);
                internalContext.setAttachment(HIDDEN_KEY_INVOKE_CONTEXT, invokeCtx);
            }
        }
        ....
    } 
    

    在decorateRequest方法里首先会校验有没有开启透传数据,如果开启了,那么就调用BaggageResolver#carryWithRequest,把要透传的数据放入到request里面

    BaggageResolver#carryWithRequest

    public static void carryWithRequest(RpcInvokeContext context, SofaRequest request) {
        if (context != null) {
    		  //获取所有的透传数据
            Map<String, String> requestBaggage = context.getAllRequestBaggage();
            if (CommonUtils.isNotEmpty(requestBaggage)) { // 需要透传
                request.addRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE, requestBaggage);
            }
        }
    }
    
    

    这个方法里面要做的就是获取所有的透传数据,然后放置到RequestProp里面,这样在发送请求的时候就会传送到服务端。

    服务端接受透传数据

    服务端的调用流程如下:

    BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker 
    

    所以从上面的调用链可以知道,在服务端引用的时候会经过ProviderBaggageFilter过滤器,我们下面看看这个过滤器做了什么事情:

    ProviderBaggageFilter#invoke

    public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
        SofaResponse response = null;
        try {
            //从request中获取透传数据存入到requestBaggage中
            BaggageResolver.pickupFromRequest(RpcInvokeContext.peekContext(), request, true);
            response = invoker.invoke(request);
        } finally {
            if (response != null) {
                BaggageResolver.carryWithResponse(RpcInvokeContext.peekContext(), response);
            }
        }
        return response;
    }
    
    

    ProviderBaggageFilter会调用BaggageResolver#pickupFromRequest从request中获取数据

    BaggageResolver#pickupFromRequest

    public static void pickupFromRequest(RpcInvokeContext context, SofaRequest request, boolean init) {
        if (context == null && !init) {
            return;
        }
        // 解析请求 
        Map<String, String> requestBaggage = (Map<String, String>) request
            .getRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE);
        if (CommonUtils.isNotEmpty(requestBaggage)) {
            if (context == null) {
                context = RpcInvokeContext.getContext();
            }
            context.putAllRequestBaggage(requestBaggage);
        }
    }
    

    最后会在ProviderBaggageFilter invoke方法的finally里面调用BaggageResolver#carryWithResponse把响应透传数据回写到response里面。

    public static void carryWithResponse(RpcInvokeContext context, SofaResponse response) {
        if (context != null) {
            Map<String, String> responseBaggage = context.getAllResponseBaggage();
            if (CommonUtils.isNotEmpty(responseBaggage)) {
                String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
                for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
                    response.addResponseProp(prefix + entry.getKey(), entry.getValue());
                }
            }
        }
    }
    

    客户端收到响应透传数据

    最后客户端会在ClientProxyInvoker#invoke方法里调用decorateResponse获取response回写的数据。

    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
    		....
         // 包装响应
         decorateResponse(response);
    		....
    }
    
    

    decorateResponse是在子类DefaultClientProxyInvoker实现的:

    DefaultClientProxyInvoker#decorateResponse

    protected void decorateResponse(SofaResponse response) {
       ....
        //如果开启了透传
        if (RpcInvokeContext.isBaggageEnable()) {
            BaggageResolver.pickupFromResponse(invokeCtx, response, true);
        }
       ....
    }
    
    

    这个方法里面会调用BaggageResolver#pickupFromResponse

    public static void pickupFromResponse(RpcInvokeContext context, SofaResponse response, boolean init) {
        if (context == null && !init) {
            return;
        }
        Map<String, String> responseBaggage = response.getResponseProps();
        if (CommonUtils.isNotEmpty(responseBaggage)) {
            String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
            for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
                if (entry.getKey().startsWith(prefix)) {
                    if (context == null) {
                        context = RpcInvokeContext.getContext();
                    }
                    //因为entry的key里面会包含rpc_resp_baggage,所以需要截取掉
                    context.putResponseBaggage(entry.getKey().substring(prefix.length()),
                        entry.getValue());
                }
            }
        }
    }
    
    

    这个方法里面response获取所有的透传数据,然后放入到ResponseBaggage中。

    到这里SOFARPC数据透传就分析完毕了

  • 相关阅读:
    [Python] Python基础字符串
    [android] 手机卫士绑定sim卡
    [Laravel] Laravel的基本数据库操作部分
    [android] 手机卫士手势滑动切换屏幕
    [android] 手机卫士界面切换动画
    [android] 手机卫士设置向导页面
    [javaEE] Servlet的手动配置
    [android] 手机卫士保存密码时进行md5加密
    [android] 手机卫士自定义对话框布局
    [Laravel] Laravel的基本使用
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/11388450.html
Copyright © 2011-2022 走看看