zoukankan      html  css  js  c++  java
  • consumer的DubboClientHandler线程池

    1. 创建线程池

    创建线程池的调用栈如下:

    SimpleDataStore把线程池存放在map中(由WrappedChannelHandler以componentKey和端口号为键写入,再由AbstractClient取出并移除)。

    /**
     * Netty-based client (excerpt): only the constructor relevant to thread-pool
     * creation is shown.
     */
    public class NettyClient extends AbstractClient {
       public NettyClient(final URL url, final ChannelHandler handler) throws RemotingException{
            // Wrap the handler through the inherited static wrapChannelHandler(...) before the
            // superclass constructor runs; per this walkthrough, that wrapping is what
            // creates the DubboClientHandler thread pool.
            super(url, wrapChannelHandler(url, handler));
       }
    }
    
    /**
     * Base class for clients (excerpt). The constructor picks up the consumer-side
     * executor from the DataStore; {@link #wrapChannelHandler} prepares the URL that
     * names and configures that executor's threads.
     */
    public abstract class AbstractClient extends AbstractEndpoint implements Client {

        /** Thread name applied to the client handler pool. */
        protected static final String CLIENT_THREAD_POOL_NAME = "DubboClientHandler";

        /** The DubboClientHandler thread pool, looked up from the DataStore in the constructor. */
        protected volatile ExecutorService executor;

        /**
         * @param url     client URL; its port is used as the DataStore key
         * @param handler channel handler (not used in the part of the constructor shown here)
         * @throws RemotingException declared by the full constructor; not thrown by the code shown
         */
        public AbstractClient(URL url, ChannelHandler handler) throws RemotingException {
            // Other code unrelated to the thread pool is omitted from this excerpt.

            // Resolve the store and the key once, then fetch the pool and drop its entry.
            // NOTE(review): assumes getDefaultExtension() returns the same instance on
            // every call, so a single lookup is equivalent to the previous two.
            final DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class)
                    .getDefaultExtension();
            final String portKey = Integer.toString(url.getPort());
            executor = (ExecutorService) dataStore.get(Constants.CONSUMER_SIDE, portKey);
            dataStore.remove(Constants.CONSUMER_SIDE, portKey);
        }

        /**
         * Decorates {@code handler} using a URL that carries the client thread name and,
         * unless the URL already specifies one, the default client thread-pool type.
         *
         * @param url     the original client URL
         * @param handler the handler to wrap
         * @return the handler returned by {@code ChannelHandlers.wrap}
         */
        protected static ChannelHandler wrapChannelHandler(URL url, ChannelHandler handler){
            // Name the pool's threads
            final URL namedUrl = ExecutorUtil.setThreadName(url, CLIENT_THREAD_POOL_NAME);
            // Only fill in the thread-pool type when the caller has not chosen one
            final URL pooledUrl = namedUrl.addParameterIfAbsent(
                    Constants.THREADPOOL_KEY, Constants.DEFAULT_CLIENT_THREADPOOL);
            return ChannelHandlers.wrap(handler, pooledUrl);
        }
    }
    
    /**
     * Channel-handler decorator (excerpt). Its constructor builds the executor from the
     * URL and publishes it in the DataStore under a side-dependent key and the URL's port.
     */
    public class WrappedChannelHandler implements ChannelHandlerDelegate {

        /** Executor obtained from the adaptive ThreadPool extension in the constructor. */
        protected final ExecutorService executor;

        // NOTE(review): the declarations of the handler and url fields assigned below
        // are not part of this excerpt.
        public WrappedChannelHandler(ChannelHandler handler, URL url) {
            this.handler = handler;
            this.url = url;

            // This is where the thread pool is created
            executor = (ExecutorService) ExtensionLoader
                    .getExtensionLoader(ThreadPool.class)
                    .getAdaptiveExtension()
                    .getExecutor(url);

            // Consumer-side URLs register under CONSUMER_SIDE; everything else uses the
            // generic executor-service component key.
            final boolean consumerSide =
                    Constants.CONSUMER_SIDE.equalsIgnoreCase(url.getParameter(Constants.SIDE_KEY));
            final String storeKey = consumerSide
                    ? Constants.CONSUMER_SIDE
                    : Constants.EXECUTOR_SERVICE_COMPONENT_KEY;

            ExtensionLoader.getExtensionLoader(DataStore.class)
                    .getDefaultExtension()
                    .put(storeKey, Integer.toString(url.getPort()), executor);
        }
    }

     // The statement from the WrappedChannelHandler constructor that creates the pool:
     // the adaptive ThreadPool extension is asked for an executor for this url.
     executor = (ExecutorService) ExtensionLoader.getExtensionLoader(ThreadPool.class).getAdaptiveExtension().getExecutor(url);

    ThreadPool$Adpative代码如下:

    package com.alibaba.dubbo.common.threadpool;
    import com.alibaba.dubbo.common.extension.ExtensionLoader;
    /**
     * Adaptive ThreadPool as listed in this walkthrough: resolves the ThreadPool
     * extension named by the URL's "threadpool" parameter and delegates to it.
     */
    public class ThreadPool$Adpative implements com.alibaba.dubbo.common.threadpool.ThreadPool {

        /**
         * @param arg0 the URL carrying the thread-pool settings; must not be null
         * @return the executor produced by the selected extension
         * @throws IllegalArgumentException if {@code arg0} is null
         * @throws IllegalStateException    if no extension name can be read from the URL
         */
        public java.util.concurrent.Executor getExecutor(com.alibaba.dubbo.common.URL arg0) {
            if (arg0 == null) {
                throw new IllegalArgumentException("url == null");
            }

            // Per the original note, the consumer URL carries threadpool=cached by default;
            // "fixed" is only the fallback used when the parameter is absent.
            final String poolName = arg0.getParameter("threadpool", "fixed");
            if (poolName == null) {
                final String detail =
                        "Fail to get extension(com.alibaba.dubbo.common.threadpool.ThreadPool) name from url("
                                + arg0.toString() + ") use keys([threadpool])";
                throw new IllegalStateException(detail);
            }

            final com.alibaba.dubbo.common.threadpool.ThreadPool delegate =
                    (com.alibaba.dubbo.common.threadpool.ThreadPool) ExtensionLoader
                            .getExtensionLoader(com.alibaba.dubbo.common.threadpool.ThreadPool.class)
                            .getExtension(poolName);
            return delegate.getExecutor(arg0);
        }
    }

    CachedThreadPool类:

    /**
     * Elastic thread pool. Per the original note, idle threads are reclaimed after one
     * minute and later requests create threads again.
     */
    public class CachedThreadPool implements ThreadPool {

        /**
         * Builds a {@link ThreadPoolExecutor} from the URL's thread name, core size,
         * maximum size, queue capacity and keep-alive (milliseconds) parameters.
         *
         * @param url source of the pool settings; also passed to the rejection handler
         * @return the newly created executor
         */
        public Executor getExecutor(URL url) {
            final String threadName = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
            final int coreSize = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS);
            final int maxSize = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE);
            final int queueCapacity = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
            final int keepAliveMillis = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE);

            // 0 -> direct hand-off, negative -> unbounded queue, positive -> bounded queue
            final java.util.concurrent.BlockingQueue<Runnable> workQueue;
            if (queueCapacity == 0) {
                workQueue = new SynchronousQueue<Runnable>();
            } else if (queueCapacity < 0) {
                workQueue = new LinkedBlockingQueue<Runnable>();
            } else {
                workQueue = new LinkedBlockingQueue<Runnable>(queueCapacity);
            }

            return new ThreadPoolExecutor(
                    coreSize,
                    maxSize,
                    keepAliveMillis,
                    TimeUnit.MILLISECONDS,
                    workQueue,
                    new NamedThreadFactory(threadName, true),
                    new AbortPolicyWithReport(threadName, url));
        }
    }

    2. 往线程池投送任务

    线程池创建之后,是谁在什么时候往里面扔任务呢?

    /**
     * Handler (excerpt) that hands every received message to an executor instead of
     * processing it on the calling thread.
     */
    public class AllChannelHandler extends WrappedChannelHandler {

        /**
         * Submits a RECEIVED event for {@code message} to the executor.
         *
         * @param channel the channel the message arrived on
         * @param message the received message
         * @throws RemotingException an ExecutionException wrapping any Throwable raised
         *                           while creating or submitting the task
         */
        public void received(Channel channel, Object message) throws RemotingException {
            final ExecutorService pool = getExecutorService();
            try {
                // Hand the task to the thread pool
                final Runnable task =
                        new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message);
                pool.execute(task);
            } catch (Throwable t) {
                throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
            }
        }

        /**
         * Returns the executor held by the parent WrappedChannelHandler, or
         * SHARED_EXECUTOR when that executor is null or already shut down.
         */
        private ExecutorService getExecutorService() {
            // executor is declared in the parent class WrappedChannelHandler
            final ExecutorService own = executor;
            final boolean usable = own != null && !own.isShutdown();
            return usable ? own : SHARED_EXECUTOR;
        }
    }

    也就是说,nio线程收到消息后把任务投递到DubboClientHandler线程池,DubboClientHandler线程再唤醒正在等待响应的consumer调用线程。

     3. 线程池执行任务

  • 相关阅读:
    poj 3616 Milking Time
    poj 3176 Cow Bowling
    poj 2229 Sumsets
    poj 2385 Apple Catching
    poj 3280 Cheapest Palindrome
    hdu 1530 Maximum Clique
    hdu 1102 Constructing Roads
    codeforces 592B The Monster and the Squirrel
    CDOJ 1221 Ancient Go
    hdu 1151 Air Raid(二分图最小路径覆盖)
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8193705.html
Copyright © 2011-2022 走看看