  • How ActiveMQ handles different types of messages

    In ActiveMQ, every message (command) extends the org.apache.activemq.command.BaseCommand class.

    The call stack when the broker handles a message is as follows:

    [figure: broker call stack]

    The TransportConnection class implements the CommandVisitor interface and contains the logic for processing each kind of message.

    public class TransportConnection implements Connection, Task, CommandVisitor {
        @Override
        public Response service(Command command) {
            ...
            // command is the message; ProducerInfo is used as the example here
            response = command.visit(this);
            ...
        }
    
        @Override
        public Response processAddProducer(ProducerInfo info) throws Exception {
            SessionId sessionId = info.getProducerId().getParentId();
            ConnectionId connectionId = sessionId.getParentId();
            TransportConnectionState cs = lookupConnectionState(connectionId);
            if (cs == null) {
                throw new IllegalStateException("Cannot add a producer to a connection that had not been registered: "
                        + connectionId);
            }
            SessionState ss = cs.getSessionState(sessionId);
            if (ss == null) {
                throw new IllegalStateException("Cannot add a producer to a session that had not been registered: "
                        + sessionId);
            }
            // Avoid replaying dup commands
            if (!ss.getProducerIds().contains(info.getProducerId())) {
                ActiveMQDestination destination = info.getDestination();
                if (destination != null && !AdvisorySupport.isAdvisoryTopic(destination)) {
                    if (getProducerCount(connectionId) >= connector.getMaximumProducersAllowedPerConnection()){
                        throw new IllegalStateException("Can't add producer on connection " + connectionId + ": at maximum limit: " + connector.getMaximumProducersAllowedPerConnection());
                    }
                }
                broker.addProducer(cs.getContext(), info);
                try {
                    ss.addProducer(info);
                } catch (IllegalStateException e) {
                    broker.removeProducer(cs.getContext(), info);
                }
    
            }
            return null;
        }
    
    }
    
    // org.apache.activemq.command.ProducerInfo
    public Response visit(CommandVisitor visitor) throws Exception {
        return visitor.processAddProducer(this);
    }
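
    The dispatch above is the visitor pattern (double dispatch): service() calls command.visit(this), and each command type calls back the one process method that matches it. Below is a minimal, self-contained sketch of that mechanism. All names in it (VisitorDispatchSketch, Visitor, Command, AddProducer, AddConsumer, Connection) are hypothetical stand-ins rather than ActiveMQ classes, and the methods return a String instead of a Response to keep the example short.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for ActiveMQ's Command / CommandVisitor types.
class VisitorDispatchSketch {

    /** Plays the role of CommandVisitor: one process method per command type. */
    interface Visitor {
        String processAddProducer(AddProducer info);
        String processAddConsumer(AddConsumer info);
    }

    /** Plays the role of Command: each command picks the visitor method that handles it. */
    interface Command {
        String visit(Visitor visitor);
    }

    /** Plays the role of ProducerInfo. */
    static final class AddProducer implements Command {
        final String producerId;

        AddProducer(String producerId) {
            this.producerId = producerId;
        }

        @Override
        public String visit(Visitor visitor) {
            // Second dispatch: the concrete command type selects the method.
            return visitor.processAddProducer(this);
        }
    }

    /** A second command type, to show that no instanceof checks are needed. */
    static final class AddConsumer implements Command {
        final String consumerId;

        AddConsumer(String consumerId) {
            this.consumerId = consumerId;
        }

        @Override
        public String visit(Visitor visitor) {
            return visitor.processAddConsumer(this);
        }
    }

    /** Plays the role of TransportConnection. */
    static final class Connection implements Visitor {
        final List<String> producers = new ArrayList<>();
        final List<String> consumers = new ArrayList<>();

        /** First dispatch: hand the connection to the command as its visitor. */
        String service(Command command) {
            return command.visit(this);
        }

        @Override
        public String processAddProducer(AddProducer info) {
            // Skip duplicates, loosely mirroring the "avoid replaying dup commands" check.
            if (!producers.contains(info.producerId)) {
                producers.add(info.producerId);
            }
            return "producer added: " + info.producerId;
        }

        @Override
        public String processAddConsumer(AddConsumer info) {
            if (!consumers.contains(info.consumerId)) {
                consumers.add(info.consumerId);
            }
            return "consumer added: " + info.consumerId;
        }
    }

    public static void main(String[] args) {
        Connection connection = new Connection();
        System.out.println(connection.service(new AddProducer("p-1")));
        System.out.println(connection.service(new AddConsumer("c-1")));
    }
}
```

    In this sketch, supporting a new command type means adding one class with its own visit() and one process method on the visitor; service() itself never changes and never inspects the command's type.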
  • Original article: https://www.cnblogs.com/allenwas3/p/8910576.html