dubbo的consumer&provider的线程池有什么区别?
dubbo的consumer线程池默认是cached,虽然ThreadPool的@SPI标注是fixed,但是在初始化client时,会覆盖consumer的url的threadlocal配置为cached。且该初始化的线程池为全局共享,用于异步调用处理future通知时使用。
具体代码如下所示:
AbstractClient:
private void initExecutor(URL url) { url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME); url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL); executor = executorRepository.createExecutorIfAbsent(url); }
AbstractInvoker:
protected ExecutorService getCallbackExecutor(URL url, Invocation inv) { ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url); if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) { return new ThreadlessExecutor(sharedExecutor); } else { return sharedExecutor; } }
dubbo的provider线程池默认是fixed,也是所有服务提供者全局共享的线程池。在层层包装的ChannelHandler中,是通过该线程池来发起具体的执行。
具体代码如下所示:
AbstractServer:
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); localAddress = getUrl().toInetSocketAddress(); String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost()); int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort()); if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) { bindIp = ANYHOST_VALUE; } bindAddress = new InetSocketAddress(bindIp, bindPort); this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS); this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT); try { doOpen(); if (logger.isInfoEnabled()) { logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress()); } } catch (Throwable t) { throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t); } executor = executorRepository.createExecutorIfAbsent(url); }
AllChannelHandler:
@Override public void received(Channel channel, Object message) throws RemotingException { ExecutorService executor = getPreferredExecutorService(message); try { executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message)); } catch (Throwable t) { if(message instanceof Request && t instanceof RejectedExecutionException){ sendFeedback(channel, (Request) message, t); return; } throw new ExecutionException(message, channel, getClass() + " error when process received event .", t); } }
默认的线程数量是200。