zoukankan      html  css  js  c++  java
  • ActiveMQ broker解析

    在 ActiveMQ 中,broker 集中管理持久的、临时的 queue 和 topic。

    public class BrokerFilter implements Broker {
        // 省略其他代码
        protected final Broker next;
    }

    最终的 Broker 链是这样的:

    StatisticsBroker -> TransactionBroker -> CompositeDestinationBroker -> AdvisoryBroker -> ManagedRegionBroker

    创建Broker 链:

    // org.apache.activemq.broker.BrokerService
    protected Broker createBroker() throws Exception {
        // 创建 RegionBroker
        regionBroker = createRegionBroker();
        // 添加拦截器
        Broker broker = addInterceptors(regionBroker);
        // Add a filter that will stop access to the broker once stopped
        broker = new MutableBrokerFilter(broker) {
            Broker old;
    
            @Override
            public void stop() throws Exception {
                old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) {
                    // Just ignore additional stop actions.
                    @Override
                    public void stop() throws Exception {
                    }
                });
                old.stop();
            }
    
            @Override
            public void start() throws Exception {
                if (forceStart && old != null) {
                    this.next.set(old);
                }
                getNext().start();
            }
        };
        return broker;
    }

    创建 RegionBroker:

    protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException {
        RegionBroker regionBroker;
        if (isUseJmx()) {
            try {
                regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(),
                    getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor());
            } catch(MalformedObjectNameException me){
                LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me);
                throw new IOException(me);
            }
        } else {
            regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory,
                    destinationInterceptor,getScheduler(),getExecutor());
        }
        destinationFactory.setRegionBroker(regionBroker);
        regionBroker.setKeepDurableSubsActive(keepDurableSubsActive);
        regionBroker.setBrokerName(getBrokerName());
        regionBroker.getDestinationStatistics().setEnabled(enableStatistics);
        regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend());
        if (brokerId != null) {
            regionBroker.setBrokerId(brokerId);
        }
        return regionBroker;
    }

    为RegionBroker添加BrokerFilter:

    protected Broker addInterceptors(Broker broker) throws Exception {
        if (isSchedulerSupport()) {
            SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore());
            if (isUseJmx()) {
                JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler());
                try {
                    ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName());
                    AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
                    this.adminView.setJMSJobScheduler(objectName);
                } catch (Throwable e) {
                    throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: "
                            + e.getMessage(), e);
                }
            }
            broker = sb;
        }
        if (isUseJmx()) {
            HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker());
            try {
                ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName());
                AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName);
            } catch (Throwable e) {
                throw IOExceptionSupport.create("Status MBean could not be registered in JMX: "
                        + e.getMessage(), e);
            }
        }
        if (isAdvisorySupport()) {
            broker = new AdvisoryBroker(broker);
        }
        broker = new CompositeDestinationBroker(broker);
        broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore());
        if (isPopulateJMSXUserID()) {
            UserIDBroker userIDBroker = new UserIDBroker(broker);
            userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID());
            broker = userIDBroker;
        }
        if (isMonitorConnectionSplits()) {
            broker = new ConnectionSplitBroker(broker);
        }
        if (plugins != null) {
            for (int i = 0; i < plugins.length; i++) {
                BrokerPlugin plugin = plugins[i];
                broker = plugin.installPlugin(broker);
            }
        }
        return broker;
    }

     我们创建的topic和queue都记录在ManagedRegionBroker这个类中:

    public class ManagedRegionBroker extends RegionBroker {
        private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class);
        private final ManagementContext managementContext;
        private final ObjectName brokerObjectName;
        private final Map<ObjectName, DestinationView> topics 
            = new ConcurrentHashMap<ObjectName, DestinationView>();
        private final Map<ObjectName, DestinationView> queues 
            = new ConcurrentHashMap<ObjectName, DestinationView>();
        private final Map<ObjectName, DestinationView> temporaryQueues 
            = new ConcurrentHashMap<ObjectName, DestinationView>();
        private final Map<ObjectName, DestinationView> temporaryTopics 
            = new ConcurrentHashMap<ObjectName, DestinationView>();
        private final Map<ObjectName, SubscriptionView> queueSubscribers 
            = new ConcurrentHashMap<ObjectName, SubscriptionView>();
        private final Map<ObjectName, SubscriptionView> topicSubscribers 
            = new ConcurrentHashMap<ObjectName, SubscriptionView>();
        private final Map<ObjectName, SubscriptionView> durableTopicSubscribers 
            = new ConcurrentHashMap<ObjectName, SubscriptionView>();
        private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers 
            = new ConcurrentHashMap<ObjectName, SubscriptionView>();
        private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers 
            = new ConcurrentHashMap<ObjectName, SubscriptionView>();
        private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers 
            = new ConcurrentHashMap<ObjectName, SubscriptionView>();
        private final Map<ObjectName, ProducerView> queueProducers 
            = new ConcurrentHashMap<ObjectName, ProducerView>();
        private final Map<ObjectName, ProducerView> topicProducers 
            = new ConcurrentHashMap<ObjectName, ProducerView>();
        private final Map<ObjectName, ProducerView> temporaryQueueProducers 
            = new ConcurrentHashMap<ObjectName, ProducerView>();
        private final Map<ObjectName, ProducerView> temporaryTopicProducers 
            = new ConcurrentHashMap<ObjectName, ProducerView>();
        private final Map<ObjectName, ProducerView> dynamicDestinationProducers 
            = new ConcurrentHashMap<ObjectName, ProducerView>();
        private final Map<SubscriptionKey, ObjectName> subscriptionKeys 
            = new ConcurrentHashMap<SubscriptionKey, ObjectName>();
        private final Map<Subscription, ObjectName> subscriptionMap 
            = new ConcurrentHashMap<Subscription, ObjectName>();
        private final Set<ObjectName> registeredMBeans 
            = new CopyOnWriteArraySet<ObjectName>();
        /* This is the first broker in the broker interceptor chain. */
        private Broker contextBroker;
    }
  • 相关阅读:
    mfc启动画面
    个人冲刺第十天
    个人冲刺第九天
    个人冲刺第八天
    个人冲刺第七天
    个人冲刺第六天
    新一周冲刺计划2
    新一周冲刺
    创意1
    团队绩效与目标
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8745327.html
Copyright © 2011-2022 走看看