在 ActiveMQ 中,broker 集中管理持久的、临时的 queue 和 topic。
public class BrokerFilter implements Broker { // 省略其他代码 protected final Broker next; }
最终的 Broker 链是这样的:
StatisticsBroker -> TransactionBroker -> CompositeDestinationBroker -> AdvisoryBroker -> ManagedRegionBroker
创建Broker 链:
// org.apache.activemq.broker.BrokerService protected Broker createBroker() throws Exception { // 创建 RegionBroker regionBroker = createRegionBroker(); // 添加拦截器 Broker broker = addInterceptors(regionBroker); // Add a filter that will stop access to the broker once stopped broker = new MutableBrokerFilter(broker) { Broker old; @Override public void stop() throws Exception { old = this.next.getAndSet(new ErrorBroker("Broker has been stopped: " + this) { // Just ignore additional stop actions. @Override public void stop() throws Exception { } }); old.stop(); } @Override public void start() throws Exception { if (forceStart && old != null) { this.next.set(old); } getNext().start(); } }; return broker; }
创建 RegionBroker:
protected Broker createRegionBroker(DestinationInterceptor destinationInterceptor) throws IOException { RegionBroker regionBroker; if (isUseJmx()) { try { regionBroker = new ManagedRegionBroker(this, getManagementContext(), getBrokerObjectName(), getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); } catch(MalformedObjectNameException me){ LOG.warn("Cannot create ManagedRegionBroker due " + me.getMessage(), me); throw new IOException(me); } } else { regionBroker = new RegionBroker(this, getTaskRunnerFactory(), getConsumerSystemUsage(), destinationFactory, destinationInterceptor,getScheduler(),getExecutor()); } destinationFactory.setRegionBroker(regionBroker); regionBroker.setKeepDurableSubsActive(keepDurableSubsActive); regionBroker.setBrokerName(getBrokerName()); regionBroker.getDestinationStatistics().setEnabled(enableStatistics); regionBroker.setAllowTempAutoCreationOnSend(isAllowTempAutoCreationOnSend()); if (brokerId != null) { regionBroker.setBrokerId(brokerId); } return regionBroker; }
为RegionBroker添加BrokerFilter:
protected Broker addInterceptors(Broker broker) throws Exception { if (isSchedulerSupport()) { SchedulerBroker sb = new SchedulerBroker(this, broker, getJobSchedulerStore()); if (isUseJmx()) { JobSchedulerViewMBean view = new JobSchedulerView(sb.getJobScheduler()); try { ObjectName objectName = BrokerMBeanSupport.createJobSchedulerServiceName(getBrokerObjectName()); AnnotatedMBean.registerMBean(getManagementContext(), view, objectName); this.adminView.setJMSJobScheduler(objectName); } catch (Throwable e) { throw IOExceptionSupport.create("JobScheduler could not be registered in JMX: " + e.getMessage(), e); } } broker = sb; } if (isUseJmx()) { HealthViewMBean statusView = new HealthView((ManagedRegionBroker)getRegionBroker()); try { ObjectName objectName = BrokerMBeanSupport.createHealthServiceName(getBrokerObjectName()); AnnotatedMBean.registerMBean(getManagementContext(), statusView, objectName); } catch (Throwable e) { throw IOExceptionSupport.create("Status MBean could not be registered in JMX: " + e.getMessage(), e); } } if (isAdvisorySupport()) { broker = new AdvisoryBroker(broker); } broker = new CompositeDestinationBroker(broker); broker = new TransactionBroker(broker, getPersistenceAdapter().createTransactionStore()); if (isPopulateJMSXUserID()) { UserIDBroker userIDBroker = new UserIDBroker(broker); userIDBroker.setUseAuthenticatePrincipal(isUseAuthenticatedPrincipalForJMSXUserID()); broker = userIDBroker; } if (isMonitorConnectionSplits()) { broker = new ConnectionSplitBroker(broker); } if (plugins != null) { for (int i = 0; i < plugins.length; i++) { BrokerPlugin plugin = plugins[i]; broker = plugin.installPlugin(broker); } } return broker; }
我们创建的topic和queue都记录在ManagedRegionBroker这个类中:
public class ManagedRegionBroker extends RegionBroker { private static final Logger LOG = LoggerFactory.getLogger(ManagedRegionBroker.class); private final ManagementContext managementContext; private final ObjectName brokerObjectName; private final Map<ObjectName, DestinationView> topics = new ConcurrentHashMap<ObjectName, DestinationView>(); private final Map<ObjectName, DestinationView> queues = new ConcurrentHashMap<ObjectName, DestinationView>(); private final Map<ObjectName, DestinationView> temporaryQueues = new ConcurrentHashMap<ObjectName, DestinationView>(); private final Map<ObjectName, DestinationView> temporaryTopics = new ConcurrentHashMap<ObjectName, DestinationView>(); private final Map<ObjectName, SubscriptionView> queueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); private final Map<ObjectName, SubscriptionView> topicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); private final Map<ObjectName, SubscriptionView> durableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); private final Map<ObjectName, SubscriptionView> inactiveDurableTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); private final Map<ObjectName, SubscriptionView> temporaryQueueSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); private final Map<ObjectName, SubscriptionView> temporaryTopicSubscribers = new ConcurrentHashMap<ObjectName, SubscriptionView>(); private final Map<ObjectName, ProducerView> queueProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); private final Map<ObjectName, ProducerView> topicProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); private final Map<ObjectName, ProducerView> temporaryQueueProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); private final Map<ObjectName, ProducerView> temporaryTopicProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); private final Map<ObjectName, ProducerView> dynamicDestinationProducers = new ConcurrentHashMap<ObjectName, ProducerView>(); private final Map<SubscriptionKey, ObjectName> subscriptionKeys = new ConcurrentHashMap<SubscriptionKey, ObjectName>(); private final Map<Subscription, ObjectName> subscriptionMap = new ConcurrentHashMap<Subscription, ObjectName>(); private final Set<ObjectName> registeredMBeans = new CopyOnWriteArraySet<ObjectName>(); /* This is the first broker in the broker interceptor chain. */ private Broker contextBroker; }