指定方法异步调用
前面我们讲解了通过设置ReferenceConfig的setAsync()方法来让整个接口里的所有方法变为异步调用,那么如何指定某些方法为异步调用呢?下面讲解下如何正确地设置默写方法为异步调用。
假如你只需要设置接口里的方法sayHello为异步调用,那么可以使用下面方式:
final List<MethodConfig> asyncMethodList = new ArrayList<MethodConfig>(); MethodConfig methodConfig = new MethodConfig(); methodConfig.setAsync(true); methodConfig.setName("sayHello"); asyncMethodList.add(methodConfig);
然后调用ReferenceConfig的setMethods(asyncMethodList)即可。另外如果异步调用的方法没有返回值,则可以再调用methodConfig.setReturn(false); ,以便减少Dubbo内部Future对象的创建和管理成本。
关闭启动检查
正常情况下,在服务消费端启动时会检查依赖的服务是否存在,如果不存在则会抛出 throw new IllegalStateException("Failed to check the status of the service" + interfaceName + ".No provider available for the service ")异常阻止Spring初始化完成,以便上线前能及早发现问题。
可以通过调用ReferenceConfig.setCheck(false)关闭检查,设置check为true有助于及时发现问题,那么什么时候需要设置false呢?
比如测试时,有些无关的服务启动不了,导致整个应用都启动不了,这时候你可以把那些无关服务的check设置为false。再比如出现了循环依赖,必须有一方先启动,比如你给服务使用方提供了一个SDK,SDK里面使用Dubbo API远程消费服务器提供方的服务,如果你在服务提供方的服务器里面也引入这个SDK,在服务提供方启动时候就会抛出 No Provider available for the service异常,具体原因是服务提供方启动时候会初始化SDK,而SDK里面初始化时候需要检查服务提供方是否存在,而服务提供方的服务还没提供出来。
另外需要注意的是check设置为false,总是会返回调用,当服务提供方恢复服务时,能自动连上。
如何设置均衡策略
由于Dubbo提供的一致性Hash负载均衡策略,可以允许你自定义虚拟节点个数和指定某些方法需要使用一致性Hash策略,下面具体讲下如何设置:
// 虚拟节点设置为512 Map<String,String> parameters = new HashMap<String,String>(); parameters.put("hash.nodes","512"); ReferenceConfig<T> reference = new ReferenceConfig<T>(); // 设置负载均衡为一致性Hash reference.setLoadbalance(consistenthash); // 设置参数 reference.setParameters(parameters);
如下代码设置接口的sayHello方法为一致性Hash负载均衡策略,设置saySomething方法为随机负载均衡策略:
ReferenceConfig reference = new ReferenceConfig(); final List<MethodConfig> methodList = new ArrayList<MethodConfig>(); // 设置sayHello方法为一致性Hash负载均衡策略 MethodConfig methodConfig = new MethodConfig(); methodConfig.setName("sayHello"); methodConfig.setLoadbalance("consistenthash"); // 虚拟节点设置为512 Map<String,String> parameters = new HashMap<String,String>(); parameters.put("hash.nodes","512"); methodConfig.setParameters(parameters); methodList.add(methodConfig); // 设置saySomething方法为随机负载均衡策略 methodConfig = new MethodConfig(); methodConfig.setName("saySomething"); methodConfig.setLoadbalance("random"); methodList.add(methodConfig); reference.setMethods(methodList);
另外,默认情况下一致性hash使用第一个参数值计算hash值,如果你需要自定义可以通过以下设置:
Map<String,String> parameters = new HashMap<String,String>(); parameters.put("hash.nodes","512"); parameters.put("hash.arguments","0,1"); // 使用第一个和第二个参数值计算hash值 methodConfig.setParameters(parameters);
注意"0,1"是一个字符串,里面使用英文","分隔。
服务消费端ReferenceConfig需要自行缓存
ReferenceConfig实例是个很重的实例,每个ReferenceConfig实例里都维护了与服务注册中心的一个长链,并且维护了与所有服务提供者的长链。假设有一个服务注册中心和N个服务提供者,那么每个ReferenceConfig实例里面维护了N+1个长链,如果频繁地生成ReferenceConfig实例,可能会造成性能问题,甚至产生内存或者连接泄露的风险。特别是使用Dubbo API编程时候容易忽略这个问题。
为了解决这个问题,之前都是自行缓存,但自从发布Dubbo 2.4.0版本后,Dubbo提供了简单的工具类ReferenceConfigCache用于缓存ReferenceConfig实例。使用如下:
// 创建服务消费实例 ReferenceConfig<XxxService> reference = new ReferenceConfig<XxxService>(); reference.setInterface(XxxService.class); reference.setVersion("1.0.0"); ...... // 获取Dubbo提供的缓存 ReferenceConfigCache cache = ReferenceConfigCache.getCache(); // cache.get方法中会缓存reference对象,并且调用reference.get方法启动ReferenceConfig,并返回经过代理后的服务接口的对象 XxxService xxxService = cache.get(reference); // 使用xxxService对象 xxxService.sayHello();
需要注意的是Cache内持有ReferenceConfig对象的引用,不要在外部再调用ReferenceConfig的destroy方法了,这会导致Cache内的ReferenceConfig失效!
如果要销毁Cache中的ReferenceConfig,将销毁ReferenceConfig并释放对应的资源,具体使用下面方法来销毁:
ReferenceConfigCache cache = ReferenceConfigCache.getCache(); cache.destroy(reference);
另外在Dubbo中唯一确定一个服务是通过 接口+分组+版本,所以默认情况下cache内是通过服务Group/接口/版本三个属性来标示一个ReferenceConfig实例的。即以服务Group/接口/版本为缓存的key,ReferenceConfig实例为对应的value。如果你需要使用自定义的key,可以在创建cache时候调用 ReferenceConfigCache cache = ReferenceConfigCache.getCache(keyGenerator); 方法传递自定义的keyGenerator。
并发控制
服务消费方并发控制
在服务消费方进行并发控制,需要设置actives参数,如下:
<dubbo:reference id="userService" interface="com.test.UserServiceBo" group="dubbo" version="1.0.0" timeout="3000" actives="10" />
设置com.test.UserServiceBo接口中的所有方法,每个方法最多同时并发请求10个请求。
也可以使用下面方法设置接口中单个方法的并发请求个数,如下:
<dubbo:reference id="userService" interface="com.test.UserServiceBo" group="dubbo" version="1.0.0" timeout="3000"> <dubbo:method name="sayHello" actives="10" /> </dubbo:reference>
如上设置sayHello方法的并发请求数量最大为10,如果客户端请求该方法并发超过了10则客户端会被阻塞,等客户端并发请求数量少于10的时候,该请求才会被发送到服务提供方服务器。在Dubbo中客户端并发控制使用ActiveLimitFilter过滤器来控制,代码如下:
public class ActiveLimitFilter implements Filter{ public Result invoke(Invoker<?> invoker , Invocation invocation) throws RpcException{ URL url = invoker.getUrl(); String methodName = invocation.getMethodName(); // 获取设置的active值,默认为0 int max = invoker.getUrl().getMethodParameter(methodName , Constants.ACTIVES_KEY , 0); RpcStatus count = RpcStatus.getStatus(invoker.getUrl() , invocation.getMethodName()); if(max > 0){ long timeout = invoker.getUrl().getMethodParameter(invocation.getMethodName() , Constants.TIMEOUT_KEY , 0); long start = System.currentTimeMillis(); long remain = timeout; int active = count.getActive(); // 如果该方法并发请求数量大于设置值,则挂起当前线程 if(active >= max){ sychronized(count){ while((active = count.getActive()) >= max){ try{ count.wait(remain); }catch(InterruptedException e){ } // 如果等待时间超时,则抛出异常 long elapsed = System.currentTimeMillis() - start; remain = timeout - elapsed; if(remain <= 0){ throw new RpcException("Waiting concurrent invoke timeout in client-side for service: " + invoker.getInterface().getName() + " , method: " + invocation.getMethodName() + ",elapsed: " + elapsed + ",timeout: " + timeout + ". concurrent invokes: " + active + ". max concurrent invoke limit:" + max); } } } } } // 没有限流的时候,正常调用 try{ long begin = System.currentTimeMillis(); RpcStatus.beginCount(url , methodName); try{ Result result = invoker.invoke(invocation); RpcStatus.endCount(url , methodName , System.currentTimeMillis() - begin , true); return result; }catch(RuntimeException t){ RpcStatus.endCount(url , methodName , System.currentTimeMillis() - begin , false); throw t; } }finally{ if(max > 0){ synchronized(count){ count.notify(); } } } } }
由上可知,在客户端并发控制中,如果当并发量达到指定值后,当前客户端请求线程会被挂起,如果在等待超时期间并发请求量少了,那么阻塞的线程会被激活,然后发送请求到服务提供方,如果等待超时了,则直接抛出异常,这时服务根本都没有发送到服务提供方服务器。
服务提供方并发控制
在服务提供方进行并发控制需要设置executes参数,如下:
<dubbo:service interface="com.test.UserServiceBo" ref="userService" group="dubbo" version="1.0.0" timeout="3000" executes="10" />
设置com.test.UserServiceBo 接口中所有方法,每个方法最多同时并发处理10个请求,这里并发是指同时在处理10个请求。
也可以使用下面方法设置接口中单个方法的并发处理个数,如下:
<dubbo:service interface="com.test.UserServiceBo" ref="userService" group="dubbo" version="1.0.0" timeout="3000"> <dubbo:method name="sayHello" executes="10" /> </dubbo:service>
如上设置sayHello方法的并发处理数量为10 。
需要注意的是,服务提供方设置并发数量后,如果同时请求数量大于了设置的executes的值,则会抛出异常,而不是像服务端设置actives时候,会等待。服务提供方并发控制是使用ExecuteLimitFilter过滤器实现的,ExecuteLimitFilter代码如下:
public class ExecuteLimitFilter implements Filter{ public Result invoke(Invoker<?> invoker , Invocation invocation) throws RpcException{ URL url = invoker.getUrl(); // 默认不设置executes时候,其值为0 int max = url.getMethodParameter(methodName , Constants.EXECUTES_KEY , 0); if(max > 0){ // max>0说明设置了executes值 RpcStatus count = RpcStatus.getStatus(url , invocation.getMethodName()); // 可知如果并发处理数量大于设置的值,会抛出异常 executesLimit = count.getSemaphore(max); if(executesLimit != null && !(acquireResult = executesLimit.tryAcquire())){ throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " + url + ",cause: The service using threads greater than <dubbo:service execute="" + max + ""> limited."); } } ... try{ // 没有限流的时候,激活filter链 Result result = invoker.invoke(invocation); return result; }catch(){ ... }finally{ ... } } }
所以当使用executes参数时候要注意,当并发量过大时侯,多余的请求会失败。
改进的广播策略
前面我们讲解集群容错时谈到广播策略,该策略主要用于对所有服务提供者广播消息,那么有个问题需要思考,广播是说你在客户端调用接口一次,内部就是轮询调用所有服务提供者的机器的服务,那么你调用一次该接口,返回值是什么呢?比如内部轮询了10台机器,每个机器应该都有一个返回值,那么你调用的这一次返回值是10个返回值的组成?其实不是,返回的轮询调用的最后一个机器结果,我们可以看下BroadcastClusterInvoker的主干代码:
public class BroadcastClusterInvoker<T> extends AbstractClusterInvoker<T>{ private static final Logger logger = LoggerFactory.getLogger(BroadcastClusterInvoker.class); public BroadcastClusterInvoker(Directory<T> directory){ super(directory); } @SuppressWarnings({"unchecked","rawtypes"}) public Result doInvoke(final Invocation invocation , List<Invoker<T>> invokers , LoadBalance loadbalance){ ... // 使用循环,轮询每个机器进行调用,其中result为最后一个机器的结果 for(Invoker<T> invoker : invokers){ try{ result = invoker.invoke(invocation); }catch(RpcException e){ exception = e; logger.warn(e.getMessage() , e); }catch(Throwable e){ exception = new RpcException(e.getMessage(),e); logger.warn(e.getMessage() , e); } } if(exception != null){ throw exception; } return result; } }
如上代码,可知使用循环轮询调用每个机器,其中result为调用最后一个机器的结果。
如果我想获取所有服务提供者的结果,该怎么办呢?其实我们可以自定义一个SPI扩展,并且规定我们的服务接口的返回结果为一个map,代码如下:
public Result doInvokePara(final Invocation invocation , List<Invoker<T>> invokers , LoadBalance loadbalance) throws RpcException{ // 用来保存所有服务提供者返回的结果 Map allResult = new ConcurrentHashMap<String , Result>(); // 保存异步调用返回的Future对象 List<Future<Result>> futureList = new ArrayList<Future<Result>>(); // 所有服务提供者的机器个数 int machineNum = invokers.size(); for(Invoker<T> invoker : invokers){ try{ // 异步调用服务提供者 Future<Result> future = paramCallPool.submit(new Callable<Result>(){ @Override public Result call() throws Exception{ try{ // 具体调用服务提供者 Result result = invoker.invoke(invocation); // 服务提供者 ip:port String url = invoker.getUrl().getAddress(); // 保存结果到map,key为服务提供者的地址,value为返回结果 allResult.put(url , result.getResult()); return result; }catch(RpcException e){ logger.warn(e.getMessage(),e); }catch(Throwable e){ logger.warn(e.getMessage(),e); } return null; } }); futureList.add(future); }catch(Exception e){ logger.warn(e.getMessage() , e); } } // 等所有调用完成 for(Future<Result> future : futureList){ try{ future.get(); }catch(InterruptedException | ExecutionException e){ e. printStackTrace(); } } // 假设服务接口返回中类型为这个 ACCSResult<Map> resultTemp = new ActionResult<Map>(true,null,null,null); // 自定义返回结果 Map finalResult = new HashMap<String,Result>(); finalResult.put("machineNum",machineNum); finalResult.put("result",result); resultTemp.setData(finalResult); // 重新设置返回值 Result result = new RpcResult(resultTemp); return result; }