使用dubbo时候要尽量了解源码,不然会很容易入坑。
一、服务消费端ReferenceConfig需要自行缓存
ReferenceConfig实例是个很重的实例,每个ReferenceConfig实例里面都维护了与服务注册中心的一个长链,并且维护了与所有服务提供者的的长链。假设有一个服务注册中心和N个服务提供者,那么每个ReferenceConfig实例里面维护了N+1个长链,如果频繁的生成ReferenceConfig实例,可能会造成性能问题,甚至产生内存或者连接泄露的风险。特别是使用dubbo api编程时候容易忽略这个问题。
为了解决这个问题,之前都是自行缓存,但是自从dubbo2.4.0版本后,dubbo 提供了简单的工具类 ReferenceConfigCache 用于缓存ReferenceConfig 实例。使用如下:
/创建服务消费实例 ReferenceConfig<XxxService> reference = new ReferenceConfig<XxxService>(); reference.setInterface(XxxService.class); reference.setVersion("1.0.0"); ...... //获取dubbo提供的缓存 ReferenceConfigCache cache = ReferenceConfigCache.getCache(); // cache.get方法中会缓存 reference对象,并且调用reference.get方法启动ReferenceConfig,并返回经过代理后的服务接口的对象 XxxService xxxService = cache.get(reference); // 使用xxxService对象 xxxService.sayHello();
需要注意的是 Cache内持有ReferenceConfig对象的引用,不要在外部再调用ReferenceConfig的destroy方法了,这会导致Cache内的ReferenceConfig失效!
如果要销毁 Cache 中的 ReferenceConfig ,将销毁 ReferenceConfig 并释放对应的资源,具体使用下面方法来销毁
ReferenceConfigCache cache = ReferenceConfigCache.getCache();
cache.destroy(reference);
另外以服务 Group、接口、版本为缓存的 Key,ReferenceConfig实例为对应的value。如果你需要使用自定义的key,可以在创建cache时候调用ReferenceConfigCache cache = ReferenceConfigCache.getCache(keyGenerator );方法传递自定义的keyGenerator。
二、 并发控制
2.1 服务消费方并发控制 在服务消费方法进行并发控制需要设置actives参数,如下:
<dubbo:reference id="userService" interface="com.test.UserServiceBo" group="dubbo" version="1.0.0" timeout="3000" actives="10"/>
设置com.test.UserServiceBo接口中所有方法,每个方法最多同时并发请求10个请求。
也可以使用下面方法设置接口中的单个方法的并发请求个数,如下:
<dubbo:reference id="userService" interface="com.test.UserServiceBo" group="dubbo" version="1.0.0" timeout="3000"> <dubbo:method name="sayHello" actives="10" /> </dubbo:reference>
如上设置sayHello方法的并发请求数量最大为10,如果客户端请求该方法并发超过了10则客户端会被阻塞,等客户端并发请求数量少于10的时候,该请求才会被发送到服务提供方服务器。在dubbo中客户端并发控制是使用ActiveLimitFilter过滤器来控制的,代码如下:
public class ActiveLimitFilter implements Filter { public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException { URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); //获取设置的acvites的值,默认为0 int max = invoker.getUrl().getMethodParameter(methodName, Constants.ACTIVES_KEY, 0); //获取当前方法目前并发请求数量 RpcStatus count = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()); if (max > 0) {//说明设置了actives变量 long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName(), Constants.TIMEOUT_KEY, 0); long start = System.currentTimeMillis(); long remain = timeout; int active = count.getActive(); //如果该方法并发请求数量大于设置值,则挂起当前线程。 if (active >= max) { synchronized (count) { while ((active = count.getActive()) >= max) { try { count.wait(remain); } catch (InterruptedException e) { } //如果等待时间超时,则抛出异常 long elapsed = System.currentTimeMillis() - start; remain = timeout - elapsed; if (remain <= 0) { throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + ", method: " + invocation.getMethodName() + ", elapsed: " + elapsed + ", timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit: " + max); } } } } } //没有限流时候,正常调用 try { long begin = System.currentTimeMillis(); RpcStatus.beginCount(url, methodName); try { Result result = invoker.invoke(invocation); RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, true); return result; } catch (RuntimeException t) { RpcStatus.endCount(url, methodName, System.currentTimeMillis() - begin, false); throw t; } } finally { if (max > 0) { synchronized (count) { count.notify(); } } } } }
可知客户端并发控制,是如果当并发量达到指定值后,当前客户端请求线程会被挂起,如果在等待超时期间并发请求量少了,那么阻塞的线程会被激活,然后发送请求到服务提供方,如果等待超时了,则直接抛出异常,这时候服务根本都没有发送到服务提供方服务器。
三、 改进的广播策略
前面我们讲解集群容错时候谈到有一个广播策略,该策略主要用于对所有服务提供者进行广播消息,那么有个问题需要思考,广播是是说你在客户端调用接口一次,内部就是轮询调用所有服务提供者的机器的服务,那么你调用一次该接口,返回值是什么那?比如内部轮询了10台机器,每个机器应该都有一个返回值,那么你调用的这一次返回值是10个返回值的组成?其实不是,返回的是轮询调用的最后一个机器结果,那么如果我们想把所有的机器返回的结果聚合起来如何做的?