  • ActiveMQ's thread pool

    ActiveMQ's thread pool is essentially a ThreadPoolExecutor, but its task model has some characteristics of its own. Let's start with an example:

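    import java.util.concurrent.locks.LockSupport;

    import org.apache.activemq.thread.Task;
    import org.apache.activemq.thread.TaskRunner;
    import org.apache.activemq.thread.TaskRunnerFactory;

    public class TaskRunnerDemo {
        public static void main(String[] args) throws InterruptedException {
            // TaskRunnerFactory is responsible for creating the thread pool
            TaskRunnerFactory factory = new TaskRunnerFactory();
            factory.init();
            // Create a PooledTaskRunner
            TaskRunner taskRunner = factory.createTaskRunner(new Task() {
                // The return value of iterate matters: true means continue, false means stop
                public boolean iterate() {
                    System.out.println("hello zhang");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return true;
                }
            }, "zhang");

            // Ends up calling the thread pool's execute(runnable)
            taskRunner.wakeup();

            // Park the main thread so the program does not exit right away
            LockSupport.park();
        }
    }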

    The Task interface is where the actual business logic lives. factory.createTaskRunner merely creates a named PooledTaskRunner.

    A PooledTaskRunner wraps the thread pool (executor) and the task (runnable). The task is only actually executed when PooledTaskRunner.wakeup() is called, which in turn calls executor.execute(runnable).
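As a rough illustration of the model described above, the following self-contained sketch mimics the wakeup-then-iterate cycle on top of a plain ExecutorService. The names TaskRunnerSketch, SimpleTaskRunner and runDemo are hypothetical, and the scheduling logic is a simplification for illustration only, not ActiveMQ's actual PooledTaskRunner code (for instance, it ignores shutdown handling).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class TaskRunnerSketch {

    /** Same shape as the Task in the example above: return true to run again. */
    interface Task {
        boolean iterate();
    }

    /** Hypothetical, simplified stand-in for PooledTaskRunner. */
    static final class SimpleTaskRunner {
        private final Executor executor;
        private final Task task;
        private final Runnable runnable = this::runTask;
        private boolean queued;     // a run has been requested
        private boolean iterating;  // task.iterate() is currently in progress

        SimpleTaskRunner(Executor executor, Task task) {
            this.executor = executor;
            this.task = task;
        }

        /** Requests a run; the task only reaches the pool through this call. */
        synchronized void wakeup() {
            if (queued) {
                return; // a run is already requested, nothing to do
            }
            queued = true;
            if (!iterating) {
                executor.execute(runnable);
            }
        }

        private void runTask() {
            synchronized (this) {
                queued = false;
                iterating = true;
            }
            boolean again = false;
            try {
                again = task.iterate();
            } finally {
                synchronized (this) {
                    iterating = false;
                    if (again) {
                        queued = true; // the task asked to be run again
                    }
                    if (queued) {
                        executor.execute(runnable);
                    }
                }
            }
        }
    }

    /** Runs a task that asks to be rescheduled twice, then stops. */
    static int runDemo() throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        AtomicInteger calls = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);
        SimpleTaskRunner runner = new SimpleTaskRunner(pool, () -> {
            if (calls.incrementAndGet() < 3) {
                return true;  // continue: schedule another iteration
            }
            done.countDown();
            return false;     // stop until the next wakeup()
        });
        runner.wakeup();      // nothing runs before this call
        done.await(5, TimeUnit.SECONDS);
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return calls.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("iterate() calls: " + runDemo());
    }
}
```

In this sketch a single wakeup() is enough to drive three iterations, because the task returns true twice before returning false. Each iteration is submitted to the pool as a separate unit of work, so one task does not occupy a pool thread indefinitely.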

    Take the Queue class as an example: it implements the Task interface and has its own taskRunner:

    // org.apache.activemq.broker.region.Queue
    public void initialize() throws Exception {
        // other code omitted
        // create the queue's taskRunner
        this.taskRunner = taskFactory.createTaskRunner(this, "Queue:" + destination.getPhysicalName());
    }
    

    The Queue's iterate method:

    //org.apache.activemq.broker.region.Queue
    @Override
    public boolean iterate() {
        MDC.put("activemq.destination", getName());
        boolean pageInMoreMessages = false;
        synchronized (iteratingMutex) {
    
            // If optimize dispatch is on or this is a slave this method could be called recursively
            // we set this state value to short-circuit wakeup in those cases to avoid that as it
            // could lead to errors.
            iterationRunning = true;
    
            // do early to allow dispatch of these waiting messages
            synchronized (messagesWaitingForSpace) {
                Iterator<Runnable> it = messagesWaitingForSpace.values().iterator();
                while (it.hasNext()) {
                    if (!memoryUsage.isFull()) {
                        Runnable op = it.next();
                        it.remove();
                        op.run();
                    } else {
                        registerCallbackForNotFullNotification();
                        break;
                    }
                }
            }
    
            if (firstConsumer) {
                firstConsumer = false;
                try {
                    if (consumersBeforeDispatchStarts > 0) {
                        int timeout = 1000; // wait one second by default if
                                            // consumer count isn't reached
                        if (timeBeforeDispatchStarts > 0) {
                            timeout = timeBeforeDispatchStarts;
                        }
                        if (consumersBeforeStartsLatch.await(timeout, TimeUnit.MILLISECONDS)) {
                            LOG.debug("{} consumers subscribed. Starting dispatch.", consumers.size());
                        } else {
                            LOG.debug("{} ms elapsed and {} consumers subscribed. Starting dispatch.", timeout, consumers.size());
                        }
                    }
                    if (timeBeforeDispatchStarts > 0 && consumersBeforeDispatchStarts <= 0) {
                        iteratingMutex.wait(timeBeforeDispatchStarts);
                        LOG.debug("{} ms elapsed. Starting dispatch.", timeBeforeDispatchStarts);
                    }
                } catch (Exception e) {
                    LOG.error(e.toString());
                }
            }
    
            messagesLock.readLock().lock();
            try{
                pageInMoreMessages |= !messages.isEmpty();
            } finally {
                messagesLock.readLock().unlock();
            }
    
            pagedInPendingDispatchLock.readLock().lock();
            try {
                pageInMoreMessages |= !pagedInPendingDispatch.isEmpty();
            } finally {
                pagedInPendingDispatchLock.readLock().unlock();
            }
    
            // Perhaps we should page always into the pagedInPendingDispatch
            // list if
            // !messages.isEmpty(), and then if
            // !pagedInPendingDispatch.isEmpty()
            // then we do a dispatch.
            boolean hasBrowsers = browserDispatches.size() > 0;
    
            if (pageInMoreMessages || hasBrowsers || !redeliveredWaitingDispatch.isEmpty()) {
                try {
                    // dispatch messages
                    pageInMessages(hasBrowsers);
                } catch (Throwable e) {
                    LOG.error("Failed to page in more queue messages ", e);
                }
            }

            if (hasBrowsers) {
                ArrayList<MessageReference> alreadyDispatchedMessages = null;
                pagedInMessagesLock.readLock().lock();
                try {
                    alreadyDispatchedMessages = new ArrayList<MessageReference>(pagedInMessages.values());
                } finally {
                    pagedInMessagesLock.readLock().unlock();
                }

                Iterator<BrowserDispatch> browsers = browserDispatches.iterator();
                while (browsers.hasNext()) {
                    BrowserDispatch browserDispatch = browsers.next();
                    try {
                        MessageEvaluationContext msgContext = new NonCachedMessageEvaluationContext();
                        msgContext.setDestination(destination);

                        QueueBrowserSubscription browser = browserDispatch.getBrowser();

                        LOG.debug("dispatch to browser: {}, already dispatched/paged count: {}", browser, alreadyDispatchedMessages.size());
                        boolean added = false;
                        for (MessageReference node : alreadyDispatchedMessages) {
                            if (!((QueueMessageReference)node).isAcked() && !browser.isDuplicate(node.getMessageId()) && !browser.atMax()) {
                                msgContext.setMessageReference(node);
                                if (browser.matches(node, msgContext)) {
                                    browser.add(node);
                                    added = true;
                                }
                            }
                        }
                        // are we done browsing? no new messages paged
                        if (!added || browser.atMax()) {
                            browser.decrementQueueRef();
                            browserDispatches.remove(browserDispatch);
                        }
                    } catch (Exception e) {
                        LOG.warn("exception on dispatch to browser: {}", browserDispatch.getBrowser(), e);
                    }
                }
            }

            if (pendingWakeups.get() > 0) {
                pendingWakeups.decrementAndGet();
            }
            MDC.remove("activemq.destination");
            iterationRunning = false;

            return pendingWakeups.get() > 0;
        }
    }
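The last few statements of iterate decide whether the task runs again: one pending wakeup is consumed per call, and the method returns true while any remain. The sketch below isolates just that counter logic. PendingWakeupsSketch, requestWakeup and drain are hypothetical names introduced for illustration, and the assumption that each wakeup request increments pendingWakeups is not shown in the excerpt above.

```java
import java.util.concurrent.atomic.AtomicInteger;

class PendingWakeupsSketch {
    private final AtomicInteger pendingWakeups = new AtomicInteger();
    private int iterations;

    /** Hypothetical: assume every wakeup request bumps the counter. */
    void requestWakeup() {
        pendingWakeups.incrementAndGet();
    }

    /** Mirrors only the tail of the iterate method shown above. */
    boolean iterate() {
        iterations++;
        if (pendingWakeups.get() > 0) {
            pendingWakeups.decrementAndGet();
        }
        // true means "run me again": some wakeups are still outstanding
        return pendingWakeups.get() > 0;
    }

    /** Registers the given number of wakeups, then iterates until iterate() returns false. */
    static int drain(int wakeups) {
        PendingWakeupsSketch queue = new PendingWakeupsSketch();
        for (int i = 0; i < wakeups; i++) {
            queue.requestWakeup();
        }
        while (queue.iterate()) {
            // keep iterating, as a task runner would reschedule the task
        }
        return queue.iterations;
    }

    public static void main(String[] args) {
        System.out.println("iterations for 3 wakeups: " + drain(3));
    }
}
```

Under these assumptions, three outstanding wakeups lead to three iterations, after which the task stays idle until it is woken up again.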

    How the queue dispatches messages:

    protected void pageInMessages(boolean force) throws Exception {
        doDispatch(doPageInForDispatch(force, true));
    }
  • Original article: https://www.cnblogs.com/allenwas3/p/8760442.html