zoukankan      html  css  js  c++  java
  • dubbo学习(一)线程池

    dubbo的consumer&provider的线程池有什么区别?

    dubbo的consumer线程池默认是cached,虽然ThreadPool的@SPI标注是fixed,但是在初始化client时,会覆盖consumer的url的threadlocal配置为cached。且该初始化的线程池为全局共享,用于异步调用处理future通知时使用。

    具体代码如下所示:

    AbstractClient:

        private void initExecutor(URL url) {
            url = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
            url = url.addParameterIfAbsent(THREADPOOL_KEY, DEFAULT_CLIENT_THREADPOOL);
            executor = executorRepository.createExecutorIfAbsent(url);
        }
    

    AbstractInvoker:

        protected ExecutorService getCallbackExecutor(URL url, Invocation inv) {
            ExecutorService sharedExecutor = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension().getExecutor(url);
            if (InvokeMode.SYNC == RpcUtils.getInvokeMode(getUrl(), inv)) {
                return new ThreadlessExecutor(sharedExecutor);
            } else {
                return sharedExecutor;
            }
        }

    dubbo的provider线程池默认是fixed,也是所有服务提供者全局共享的线程池。在层层包装的ChannelHandler中,是通过该线程池来发起具体的执行。

    具体代码如下所示:

    AbstractServer:

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
            super(url, handler);
            localAddress = getUrl().toInetSocketAddress();
    
            String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
            int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
            if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
                bindIp = ANYHOST_VALUE;
            }
            bindAddress = new InetSocketAddress(bindIp, bindPort);
            this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
            this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
            try {
                doOpen();
                if (logger.isInfoEnabled()) {
                    logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
                }
            } catch (Throwable t) {
                throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
            }
            executor = executorRepository.createExecutorIfAbsent(url);
        }

    AllChannelHandler:

        @Override
        public void received(Channel channel, Object message) throws RemotingException {
            ExecutorService executor = getPreferredExecutorService(message);
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                if(message instanceof Request && t instanceof RejectedExecutionException){
                    sendFeedback(channel, (Request) message, t);
                    return;
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }

    默认的线程数量是200。

  • 相关阅读:
    Linux内核学习笔记(6)-- 进程优先级详解(prio、static_prio、normal_prio、rt_priority)
    Cuteftp连接虚拟机Centos7
    Linux内核学习笔记(5)-- 进程调度概述
    Linux内核学习笔记(4)-- wait、waitpid、wait3 和 wait4
    Linux内核学习笔记(3)-- 进程的创建和终结
    Linux内核学习笔记(2)-- 父进程和子进程及它们的访问方法
    Linux内核学习笔记(1)-- 进程管理概述
    汇编基础--标识符、标号、伪指令和指令
    JAVA学习笔记--正则表达式
    JAVA学习笔记--字符串概述
  • 原文地址:https://www.cnblogs.com/asfeixue/p/13795104.html
Copyright © 2011-2022 走看看