zoukankan      html  css  js  c++  java
  • 11.源码分析---SOFARPC数据透传是实现的?

    SOFARPC源码解析系列:

    1. 源码分析---SOFARPC可扩展的机制SPI

    2. 源码分析---SOFARPC客户端服务引用

    3. 源码分析---SOFARPC客户端服务调用

    4. 源码分析---SOFARPC服务端暴露

    5.源码分析---SOFARPC调用服务

    6.源码分析---和dubbo相比SOFARPC是如何实现负载均衡的?

    7.源码分析---SOFARPC是如何实现连接管理与心跳?

    8.源码分析---从设计模式中看SOFARPC中的EventBus?

    9.源码分析---SOFARPC是如何实现故障剔除的?

    10.源码分析---SOFARPC内置链路追踪SOFATRACER是怎么做的?


    先把栗子放上,让大家方便测试用:
    Service端

    public static void main(String[] args) {
        ServerConfig serverConfig = new ServerConfig()
            .setProtocol("bolt") // 设置一个协议,默认bolt
            .setPort(12200) // 设置一个端口,默认12200
            .setDaemon(false); // 非守护线程
    
        ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName()) // 指定接口
            .setRef(new HelloServiceImpl()) // 指定实现
            .setServer(serverConfig); // 指定服务端
    
        providerConfig.export(); // 发布服务
    }
    
    public class HelloServiceImpl implements HelloService {
    
        private final static Logger LOGGER = LoggerFactory.getLogger(HelloServiceImpl.class);
    
        @Override
        public String sayHello(String string) {
            LOGGER.info("Server receive: " + string);
    
            // 获取请求透传数据并打印
            System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getRequestBaggage("req_bag"));
            // 设置响应透传数据到当前线程的上下文中
            RpcInvokeContext.getContext().putResponseBaggage("req_bag", "s2c");
    
            return "hello " + string + " !";
        }
    }
    
    

    client端

    public static void main(String[] args) {
        ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName()) // 指定接口
            .setProtocol("bolt") // 指定协议
            .setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址
            .setConnectTimeout(10 * 1000);
    
        RpcInvokeContext.getContext().putRequestBaggage("req_bag", "a2bbb");
    
        HelloService helloService = consumerConfig.refer();
    
        while (true) {
            System.out.println("service receive reqBag -> " + RpcInvokeContext.getContext().getResponseBaggage("req_bag"));
            try {
                LOGGER.info(helloService.sayHello("world"));
            } catch (Exception e) {
                e.printStackTrace();
            }
    
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
    
        }
    }
    
    

    通过上面的栗子我们可以看出整个流程应该是:

    1. 客户端把需要透传的数据放入到requestBaggage中,然后调用服务端
    2. 服务端在HelloServiceImpl中获取请求透传数据并打印,并把响应数据放入到responseBaggage中
    3. 客户端收到透传数据

    所以下面我们从客户端开始源码讲解。

    客户端数据透传给服务端

    首先客户端在引用之前要设置putRequestBaggage,然后在客户端引用的时候会调用ClientProxyInvoker#invoke方法。

    如下:
    ClientProxyInvoker#invoke

    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
       	....
            // 包装请求
            decorateRequest(request);
           ....
    }
    
    

    通过调用decorateRequest会调用到子类DefaultClientProxyInvoker的decorateRequest方法。

    DefaultClientProxyInvoker#decorateRequest

    protected void decorateRequest(SofaRequest request) {
        ....
        RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();
        RpcInternalContext internalContext = RpcInternalContext.getContext();
        if (invokeCtx != null) {
           ....
            // 如果用户指定了透传数据
            if (RpcInvokeContext.isBaggageEnable()) {
                // 需要透传
                BaggageResolver.carryWithRequest(invokeCtx, request);
                internalContext.setAttachment(HIDDEN_KEY_INVOKE_CONTEXT, invokeCtx);
            }
        }
        ....
    } 
    

    在decorateRequest方法里首先会校验有没有开启透传数据,如果开启了,那么就调用BaggageResolver#carryWithRequest,把要透传的数据放入到request里面

    BaggageResolver#carryWithRequest

    public static void carryWithRequest(RpcInvokeContext context, SofaRequest request) {
        if (context != null) {
    		  //获取所有的透传数据
            Map<String, String> requestBaggage = context.getAllRequestBaggage();
            if (CommonUtils.isNotEmpty(requestBaggage)) { // 需要透传
                request.addRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE, requestBaggage);
            }
        }
    }
    
    

    这个方法里面要做的就是获取所有的透传数据,然后放置到RequestProp里面,这样在发送请求的时候就会传送到服务端。

    服务端接受透传数据

    服务端的调用流程如下:

    BoltServerProcessor->FilterChain->ProviderExceptionFilter->FilterInvoker->RpcServiceContextFilter->FilterInvoker->ProviderBaggageFilter->FilterInvoker->ProviderTracerFilter->ProviderInvoker 
    

    所以从上面的调用链可以知道,在服务端引用的时候会经过ProviderBaggageFilter过滤器,我们下面看看这个过滤器做了什么事情:

    ProviderBaggageFilter#invoke

    public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws SofaRpcException {
        SofaResponse response = null;
        try {
            //从request中获取透传数据存入到requestBaggage中
            BaggageResolver.pickupFromRequest(RpcInvokeContext.peekContext(), request, true);
            response = invoker.invoke(request);
        } finally {
            if (response != null) {
                BaggageResolver.carryWithResponse(RpcInvokeContext.peekContext(), response);
            }
        }
        return response;
    }
    
    

    ProviderBaggageFilter会调用BaggageResolver#pickupFromRequest从request中获取数据

    BaggageResolver#pickupFromRequest

    public static void pickupFromRequest(RpcInvokeContext context, SofaRequest request, boolean init) {
        if (context == null && !init) {
            return;
        }
        // 解析请求 
        Map<String, String> requestBaggage = (Map<String, String>) request
            .getRequestProp(RemotingConstants.RPC_REQUEST_BAGGAGE);
        if (CommonUtils.isNotEmpty(requestBaggage)) {
            if (context == null) {
                context = RpcInvokeContext.getContext();
            }
            context.putAllRequestBaggage(requestBaggage);
        }
    }
    

    最后会在ProviderBaggageFilter invoke方法的finally里面调用BaggageResolver#carryWithResponse把响应透传数据回写到response里面。

    public static void carryWithResponse(RpcInvokeContext context, SofaResponse response) {
        if (context != null) {
            Map<String, String> responseBaggage = context.getAllResponseBaggage();
            if (CommonUtils.isNotEmpty(responseBaggage)) {
                String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
                for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
                    response.addResponseProp(prefix + entry.getKey(), entry.getValue());
                }
            }
        }
    }
    

    客户端收到响应透传数据

    最后客户端会在ClientProxyInvoker#invoke方法里调用decorateResponse获取response回写的数据。

    public SofaResponse invoke(SofaRequest request) throws SofaRpcException {
    		....
         // 包装响应
         decorateResponse(response);
    		....
    }
    
    

    decorateResponse是在子类DefaultClientProxyInvoker实现的:

    DefaultClientProxyInvoker#decorateResponse

    protected void decorateResponse(SofaResponse response) {
       ....
        //如果开启了透传
        if (RpcInvokeContext.isBaggageEnable()) {
            BaggageResolver.pickupFromResponse(invokeCtx, response, true);
        }
       ....
    }
    
    

    这个方法里面会调用BaggageResolver#pickupFromResponse

    public static void pickupFromResponse(RpcInvokeContext context, SofaResponse response, boolean init) {
        if (context == null && !init) {
            return;
        }
        Map<String, String> responseBaggage = response.getResponseProps();
        if (CommonUtils.isNotEmpty(responseBaggage)) {
            String prefix = RemotingConstants.RPC_RESPONSE_BAGGAGE + ".";
            for (Map.Entry<String, String> entry : responseBaggage.entrySet()) {
                if (entry.getKey().startsWith(prefix)) {
                    if (context == null) {
                        context = RpcInvokeContext.getContext();
                    }
                    //因为entry的key里面会包含rpc_resp_baggage,所以需要截取掉
                    context.putResponseBaggage(entry.getKey().substring(prefix.length()),
                        entry.getValue());
                }
            }
        }
    }
    
    

    这个方法里面response获取所有的透传数据,然后放入到ResponseBaggage中。

    到这里SOFARPC数据透传就分析完毕了

  • 相关阅读:
    共享纸巾更换主板代码分析 共享纸巾主板更换后的对接代码
    Python Django Ajax 传递列表数据
    Python Django migrate 报错解决办法
    Python 创建字典的多种方式
    Python 两个list合并成一个字典
    Python 正则 re.sub替换
    python Django Ajax基础
    Python Django 获取表单数据的三种方式
    python Django html 模板循环条件
    Python Django ORM 字段类型、参数、外键操作
  • 原文地址:https://www.cnblogs.com/luozhiyun/p/11388450.html
Copyright © 2011-2022 走看看