zoukankan      html  css  js  c++  java
  • dubbo学习(一)线程池

    dubbo的consumer&provider的线程池有什么区别?

    dubbo的consumer线程池默认是cached,虽然ThreadPool的@SPI标注是fixed,但是在初始化client时,会覆盖consumer的url的threadlocal配置为cached。且该初始化的线程池为全局共享,用于异步调用处理future通知时使用。

    具体代码如下所示:

    AbstractClient:

        private void initExecutor(URL url) {
            url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
            url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
            executor = executorRepository.createExecutorIfAbsent(url);
        }
    

    AbstractInvoker:

        protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
            ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
            if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
                return new ThreadlessExecutor(sharedExecutor);
            } else {
                return sharedExecutor;
            }
        }

    dubbo的provider线程池默认是fixed,也是所有服务提供者全局共享的线程池。在层层包装的ChannelHandler中,是通过该线程池来发起具体的执行。

    具体代码如下所示:

    AbstractServer:

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
            localAddress = getUrl().toInetSocketAddress();
    
            String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
            int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
            if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
                bindIp = ANYHOST_VALUE;
            }
            bindAddress = new InetSocketAddress(bindIp, bindPort);
            this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
            this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
            try {
                doOpen();
                if (logger.isInfoEnabled()) {
                    logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
                }
            } catch (Throwable t) {
                throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
            }
            executor = executorRepository.createExecutorIfAbsent(url);
        }

    AllChannelHandler:

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService executor = getPreferredExecutorService(message);
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                if(message instanceof Request && t instanceof RejectedExecutionException){
                    sendFeedback(channel, (Request) message, t);
                    return;
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }

    默认的线程数量是200。

  • 相关阅读:
    mysql in like GROUP_CONCAT
    StringBuilder的常用方法
    mysql 中unionall 使用
    mysql中,数据库字段为时间戳转时间的处理方法
    一个数组储存多个对象
    Java中的substring()用法
    java思想篇1
    任务调配管理
    字符窜数组去重及各种常规用法
    自定义属性的设值
  • 原文地址:https://www.cnblogs.com/asfeixue/p/13795104.html
Copyright © 2011-2022 走看看