zoukankan      html  css  js  c++  java
  • 结合源码分析 bubble 使用注意事项

    使用dubbo时候要尽量了解源码,不然会很容易入坑。

    一、服务消费端ReferenceConfig需要自行缓存

    ReferenceConfig实例是个很重的实例,每个ReferenceConfig实例里面都维护了与服务注册中心的一个长链,并且维护了与所有服务提供者的的长链。假设有一个服务注册中心和N个服务提供者,那么每个ReferenceConfig实例里面维护了N+1个长链,如果频繁的生成ReferenceConfig实例,可能会造成性能问题,甚至产生内存或者连接泄露的风险。特别是使用dubbo api编程时候容易忽略这个问题。

    为了解决这个问题,之前都是自行缓存,但是自从dubbo2.4.0版本后,dubbo 提供了简单的工具类 ReferenceConfigCache 用于缓存ReferenceConfig 实例。使用如下:

    /创建服务消费实例
    ReferenceConfig<XxxService> reference = new ReferenceConfig<XxxService>();
    reference.setInterface(XxxService.class);
    reference.setVersion("1.0.0");
    ......
    //获取dubbo提供的缓存
    ReferenceConfigCache cache = ReferenceConfigCache.getCache();
    // cache.get方法中会缓存 reference对象,并且调用reference.get方法启动ReferenceConfig,并返回经过代理后的服务接口的对象
    XxxService xxxService = cache.get(reference);
    
    // 使用xxxService对象
    xxxService.sayHello();

    需要注意的是 Cache内持有ReferenceConfig对象的引用,不要在外部再调用ReferenceConfig的destroy方法了,这会导致Cache内的ReferenceConfig失效!

    如果要销毁 Cache 中的 ReferenceConfig ,将销毁 ReferenceConfig 并释放对应的资源,具体使用下面方法来销毁

    ReferenceConfigCache cache = ReferenceConfigCache.getCache();
    cache.destroy(reference);

    另外以服务 Group、接口、版本为缓存的 Key,ReferenceConfig实例为对应的value。如果你需要使用自定义的key,可以在创建cache时候调用ReferenceConfigCache cache = ReferenceConfigCache.getCache(keyGenerator );方法传递自定义的keyGenerator。

    二、 并发控制

    2.1 服务消费方并发控制 在服务消费方法进行并发控制需要设置actives参数,如下:

    <dubbo:reference id="userService" interface="com.test.UserServiceBo"
            group="dubbo" version="1.0.0" timeout="3000" actives="10"/>

    设置com.test.UserServiceBo接口中所有方法,每个方法最多同时并发请求10个请求。

    也可以使用下面方法设置接口中的单个方法的并发请求个数,如下:

     

    <dubbo:reference id="userService" interface="com.test.UserServiceBo"
            group="dubbo" version="1.0.0" timeout="3000">
                    <dubbo:method name="sayHello" actives="10" />
    </dubbo:reference>

    如上设置sayHello方法的并发请求数量最大为10,如果客户端请求该方法并发超过了10则客户端会被阻塞,等客户端并发请求数量少于10的时候,该请求才会被发送到服务提供方服务器。在dubbo中客户端并发控制是使用ActiveLimitFilter过滤器来控制的,代码如下:

    public class ActiveLimitFilter implements Filter {
    
        public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
            URL url = invoker.getUrl();
            String methodName = invocation.getMethodName();
            //获取设置的acvites的值,默认为0
            int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0);
            //获取当前方法目前并发请求数量
            RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName());
            if (max > 0) {//说明设置了actives变量
                long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0);
                long start = System.currentTimeMillis();
                long remain = timeout;
                int active = count.getActive();
                //如果该方法并发请求数量大于设置值,则挂起当前线程。
                if (active >= max) {
                    synchronized (count) {
                        while ((active = count.getActive()) >= max) {
                            try {
                                count.wait(remain);
                            } catch (InterruptedException e) {
                            }
                            //如果等待时间超时,则抛出异常
                            long elapsed = System.currentTimeMillis() - start;
                            remain = timeout - elapsed;
                            if (remain <= 0) {
                                throw new RpcException("Waiting concurrent invoke timeout in client-side for service:  "
                                        + invoker.getInterface().getName() + ", method: "
                                        + invocation.getMethodName() + ", elapsed: " + elapsed
                                        + ", timeout: " + timeout + ". concurrent invokes: " + active
                                        + ". max concurrent invoke limit: " + max);
                            }
                        }
                    }
                }
            }
            //没有限流时候,正常调用
            try {
                long begin = System.currentTimeMillis();
                RpcStatus.beginCount(url, methodName);
                try {
                    Result result = invoker.invoke(invocation);
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true);
                    return result;
                } catch (RuntimeException t) {
                    RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false);
                    throw t;
                }
            } finally {
                if (max > 0) {
                    synchronized (count) {
                        count.notify();
                    }
                }
            }
        }
    
    }

    可知客户端并发控制,是如果当并发量达到指定值后,当前客户端请求线程会被挂起,如果在等待超时期间并发请求量少了,那么阻塞的线程会被激活,然后发送请求到服务提供方,如果等待超时了,则直接抛出异常,这时候服务根本都没有发送到服务提供方服务器。

    三、 改进的广播策略

    前面我们讲解集群容错时候谈到有一个广播策略,该策略主要用于对所有服务提供者进行广播消息,那么有个问题需要思考,广播是是说你在客户端调用接口一次,内部就是轮询调用所有服务提供者的机器的服务,那么你调用一次该接口,返回值是什么那?比如内部轮询了10台机器,每个机器应该都有一个返回值,那么你调用的这一次返回值是10个返回值的组成?其实不是,返回的是轮询调用的最后一个机器结果,那么如果我们想把所有的机器返回的结果聚合起来如何做的?

     

     

  • 相关阅读:
    EZ 2018 1 21 2018noip第五次膜你赛
    POJ 1068&&2632&&1573&&2993&&2996
    POJ 3278&&2049&&3083
    POJ 1328&&2109&&2586
    POJ 2965&&1753
    EZ 2018 01 14 2018noip第四次膜你赛
    LCA的一些算法
    Image Processing and Analysis_15_Image Registration: A Method for Registration of 3-D shapes——1992
    Image Processing and Analysis_15_Image Registration:Image matching as a diffusion process: An analogy with Maxwell's demons——1998
    Signal Processing and Pattern Recognition in Vision_15_RANSAC:Random Sample Consensus——1981
  • 原文地址:https://www.cnblogs.com/yizhiamumu/p/9019558.html
Copyright © 2011-2022 走看看