zoukankan      html  css  js  c++  java
  • 简易RPC框架-上下文

    上下文

    记的学英语的时候,总是不记的某个词是什么意思,然后就看不下去了,只能问周围的同学或者老师或者去查词典,他们的建议是通过上下文去推测这个词大概的意思,反正我那会上学时没理解,所以英文一直比较差。

    现在英语水平也没提高多少,尽管有点领会。

    后来慢慢理解了一些,因为有些词有很多种意思,放在某个场景下可能是一个意思,放在另外一个场景下又是其它的意思,这里不举例子了,上文有一定的相似度。

    RPC客户端上下文

    客户端由于是一个独立的环境,所以可以认为它处于一个属于自己的上下文中,与其它的端隔离。在上下文中可以指定一些公共的参数来提供给接口使用,比如:

    RPC版本号
    客户端请求ID
    各类自定义的公共参数

    RPC服务端上下文

    服务端同理,也处于一个属于自己的上下文中,与其它端隔离。

    RPC上下文的作用

    基本线程级别的访问,让客户端或者服务端能够像访问本地变量一样访问RPC框架级别的变量。比如我们想将客户端的一个请求ID传递给服务端,这个请求ID作用于所有接口,比如RPC的调用链追踪,有两种方式:

    接口中增加请求ID参数

    这个方案显然是不能接受的,因为需要改的接口过多。

    接口不改的情况下,在RPC框架中提供一个上下文,其中包含请求ID

    这个方案显然成本最小,比如这种样调用:RpcContext.getRequestId();

    • 上图中的Context是指RPC框架级变量的一个本地副本,为了简化调用复杂度。
    • 上图中的Invocation是RPC框架级的变量,它与上面提到的Context相互配合,做到调用端与框架本身的解耦合

    实现

    定义上下文对象

    在RpcContext对象中增加一个map类型的参数对象,可以存放任意扩展的参数。

    public class RpcContext {
    
        private Map<String,Object> contextParameters;
    
        public Map<String, Object> getContextParameters() {
            return contextParameters;
        }
    
        public void setContextParameters(Map<String, Object> contextParameters) {
            this.contextParameters = contextParameters;
        }
    
        private static final ThreadLocal<RpcContext> rpcContextThreadLocal=new ThreadLocal<RpcContext>(){
            @Override
            protected RpcContext initialValue() {
                RpcContext context= new RpcContext();
                context.setContextParameters(Maps.newHashMap());
                return context;
            }
        };
    
        public static RpcContext getContext() {
            return rpcContextThreadLocal.get();
        }
    
        public static void removeContext() {
            rpcContextThreadLocal.remove();
        }
    
        private RpcContext() {
        }
    }

    RPC请求对象中增加上下文参数

    RpcRequest增加如下字段,用于服务端调用。

    private Map<String,Object> contextParameters;

    RpcInvocation接口中增加上下文参数

    在后续新增加的过滤器使用。

    private Map<String,Object> contextParameters;

    客户端代理

    RpcProxy在组装RpcRequest对象时,从RpcContext中获取最新的参数传递给RpcReuest,从而传递给服务端。

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    
            RpcRequest request = new RpcRequest();
            //...
    
            request.setContextParameters(RpcContext.getContext().getContextParameters());
    
            //...
        }

    客户端上下文过滤器

    主要作用就是从本线线程变量中获取参数传递给RpcInvocation。

    注解上的order属性文章后面详细介绍

    @ActiveFilter(group = {ConstantConfig.CONSUMER},order = -1000)
    public class ClientContextFilter implements RpcFilter {
    
        private final static Logger logger = LoggerFactory.getLogger(ClientContextFilter.class);
    
        @Override
        public Object invoke(RpcInvoker invoker, RpcInvocation invocation) {
            Map<String,Object> contextParameters=invocation.getContextParameters();
            if(null==contextParameters){
                contextParameters= Maps.newHashMap();
            }
            Map<String,Object> contextParametersFromRpcContext= RpcContext.getContext().getContextParameters();
            if(null!=contextParametersFromRpcContext) {
                contextParameters.putAll(contextParametersFromRpcContext);
            }
            Object rpcResponse=invoker.invoke(invocation);
            logger.info("ClientContextFilter.invoke end");
            return rpcResponse;
        }
    }

    服务端上下文过滤器

    服务端上下文过滤器与客户端的作用相反,是从RpcInvocation中获取参数传递给本地线程变量RpcContext,后面在执行服务端方法时就可以方便的通过RpcContext获取指定变量。

    @ActiveFilter(group = {ConstantConfig.PROVIDER},order = -1000)
    public class ServerContextFilter implements RpcFilter {
    
        private final static Logger logger = LoggerFactory.getLogger(ServerContextFilter.class);
    
        @Override
        public Object invoke(RpcInvoker invoker, RpcInvocation invocation) {
            Map<String,Object> contextParameters=invocation.getContextParameters();
            RpcContext.getContext().setContextParameters(contextParameters);
            Object rpcResponse=invoker.invoke(invocation);
            logger.info("ServerContextFilter.invoke end");
            return rpcResponse;
        }
    }

    过滤器排序

    因为我们的RpcContext是个本地线程变量,而且Rpc服务端是多线程处理业务,所以需要在请求结束后及时的清理掉相关本地线程变量信息。这就需要清理上下文的过滤动作在最后执行,否则有会出现服务端方法还没有执行就被清空了参数。创建一个工具类,专门用来处理获取客户端以及服务端过滤器。

    过滤器注解增加排序属性

    增加order字段,升级排列。

    @Target({ElementType.TYPE})
    @Retention(RetentionPolicy.RUNTIME)
    @Component
    public @interface ActiveFilter {
        String[] group() default {};
        String[] value() default {};
        int order() default 999999999;
    }

    过滤器工具类

    创建ActiveFilterUtil,包含下面两个函数。

    过滤器排序函数

    获取特定注解的类,然后根据注解上的排序属性升序排序。

    private static List<Object> getActiveFilter(){
        List<Object> rpcFilterList= Lists.newArrayList();
        Map<String, Object> rpcFilterMapObject = ApplicationContextUtils.getApplicationContext().getBeansWithAnnotation(ActiveFilter.class);
        if (null!=rpcFilterMapObject) {
            rpcFilterList = Lists.newArrayList(rpcFilterMapObject.values());
            Collections.sort(rpcFilterList, new Comparator<Object>() {
                @Override
                public int compare(Object o1, Object o2) {
                    ActiveFilter activeFilterO1 = o1.getClass().getAnnotation(ActiveFilter.class);
                    ActiveFilter activeFilterO2 = o2.getClass().getAnnotation(ActiveFilter.class);
                    return activeFilterO1.order() > activeFilterO2.order() ? 1 : -1;
                }
            });
        }
        return rpcFilterList;
    }

    获取RPC过滤器列表

    提供给客户端以及服务端的一个协助方法,便于客户端以及服务构建过滤器职责链。

    public static Map<String,RpcFilter> getFilterMap(boolean isServer){
        List<Object> rpcFilterList=getActiveFilter();
        Map<String,RpcFilter> filterMap=new HashMap<>();
        for (Object filterBean : rpcFilterList) {
            Class<?>[] interfaces = filterBean.getClass().getInterfaces();
            ActiveFilter activeFilter=filterBean.getClass().getAnnotation(ActiveFilter.class);
            String includeFilterGroupName=!isServer?ConstantConfig.CONSUMER:ConstantConfig.PROVIDER;
            if(null!=activeFilter.group()&& Arrays.stream(activeFilter.group()).filter(p->p.contains(includeFilterGroupName)).count()==0){
                continue;
            }
            for(Class<?> clazz:interfaces) {
                if(clazz.isAssignableFrom(RpcFilter.class)){
                    filterMap.put(filterBean.getClass().getName(),(RpcFilter) filterBean);
                }
            }
        }
        return filterMap;
    }

    客户端以及服务端初始化

    获取过滤器map的逻辑改为调用上面ActiveFilterUtil.getFilterMap方法。

    使用示例

    客户端设置参数

    设置一个RPC版本号的参数。

    @RequestMapping("/{productId}")
        public Product getById(@PathVariable final long productId) throws UnknownHostException {
            RpcContext.getContext().addContextParameter("rpc-version","1.0");
            //...
        }

    服务端获取参数

    简单的在服务端打印出RPC版本号。

     logger.info("get context parameter from server,rpc-version={}",String.valueOf(RpcContext.getContext().getContextParameter("rpc-version")));

    输出日志如下:

    本文源码

    https://github.com/jiangmin168168/jim-framework

    文中代码是依赖上述项目的,如果有不明白的可下载源码

  • 相关阅读:
    面向对象———类
    二维数组简单使用
    数组——算法
    第6章 约束满足问题CSP
    第5章 对抗搜索
    第4章 超越经典的搜索
    140303 命令行选项 ccf
    150904 高速公路 ccf
    170304 地铁修建 ccf
    vector容器
  • 原文地址:https://www.cnblogs.com/ASPNET2008/p/7749242.html
Copyright © 2011-2022 走看看