zoukankan      html  css  js  c++  java
  • ActiveMQ发送消息流程

    1,发送消息入口

      // Build the JMS message for this session through the bean's message creator.
      Message message = messageBean.getMessageCreator().createMessage(session);
      // Hand the message to the producer; this call is the start of the send path traced below.
      producer.send(message);

    2,调用ActiveMQMessageProducerSupport的send方法。

    该类实现了MessageProducer接口

     /**
      * Sends {@code message} using this producer's own destination and its
      * default delivery mode, priority and time-to-live.
      *
      * @param message the message to send
      * @throws JMSException if the delegated send fails
      */
     public void send(Message message) throws JMSException {
            this.send(this.getDestination(), message, this.defaultDeliveryMode, this.defaultPriority, this.defaultTimeToLive);
        }
        
        /**
         * Sends {@code message} to {@code destination} through the owning session.
         *
         * <p>The destination is validated against the one this producer was created
         * with ({@code info.getDestination()}): a producer bound to a destination may
         * only send to that destination, while an unbound producer accepts any
         * destination and converts it with {@code ActiveMQDestination.transform}.
         * An optional {@code transformer} may replace the message, and when a
         * {@code producerWindow} is configured the call blocks in
         * {@code waitForSpace()} before the message is handed to the session.
         *
         * @param destination  where to send the message; must not be {@code null}
         * @param message      the message to send
         * @param deliveryMode JMS delivery mode passed through to the session
         * @param priority     JMS priority passed through to the session
         * @param timeToLive   time-to-live passed through to the session
         * @param onComplete   completion callback passed through to the session; may be {@code null}
         * @throws UnsupportedOperationException if {@code destination} is {@code null} and the
         *         producer has no destination of its own, or if {@code destination} differs from
         *         the destination this producer is bound to
         * @throws InvalidDestinationException if {@code destination} is {@code null} but the
         *         producer is bound to a destination
         * @throws JMSException if no destination could be resolved, or if the thread is
         *         interrupted while waiting for producer-window space (the interrupt status
         *         is restored and the {@link InterruptedException} is attached as the cause)
         */
        public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, AsyncCallback onComplete) throws JMSException {
            checkClosed();
            if (destination == null) {
                if (info.getDestination() == null) {
                    throw new UnsupportedOperationException("A destination must be specified.");
                }
                throw new InvalidDestinationException("Don't understand null destinations");
            }

            ActiveMQDestination dest;
            if (destination.equals(info.getDestination())) {
                dest = (ActiveMQDestination)destination;
            } else if (info.getDestination() == null) {
                dest = ActiveMQDestination.transform(destination);
            } else {
                throw new UnsupportedOperationException("This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
            }
            if (dest == null) {
                throw new JMSException("No destination specified");
            }

            if (transformer != null) {
                Message transformedMessage = transformer.producerTransform(session, this, message);
                // A null result means "no transformation"; keep the caller's message.
                if (transformedMessage != null) {
                    message = transformedMessage;
                }
            }

            if (producerWindow != null) {
                try {
                    producerWindow.waitForSpace();
                } catch (InterruptedException e) {
                    // Catching InterruptedException clears the thread's interrupt status;
                    // restore it so callers further up the stack can still observe the interrupt.
                    Thread.currentThread().interrupt();
                    JMSException aborted = new JMSException("Send aborted due to thread interrupt.");
                    // Keep the original exception reachable through both the standard
                    // cause chain and the JMS linked-exception accessor.
                    aborted.initCause(e);
                    aborted.setLinkedException(e);
                    throw aborted;
                }
            }

            this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout, onComplete);

            stats.onMessage();
        }

    3,调用ActiveMQSession的send方法进行消息发送

    /**
     * Stamps the JMS headers on {@code message}, converts it to an
     * {@link ActiveMQMessage}, assigns its message id and hands it to the
     * connection, either asynchronously or synchronously.
     *
     * <p>All of the work after the initial checks runs while holding
     * {@code sendMutex}.
     *
     * @param producer       the producer issuing the send; supplies the sequence number,
     *                       the producer id and the "disable timestamp" flag
     * @param destination    the destination to publish to
     * @param message        the caller's message; its JMS header fields are overwritten here
     * @param deliveryMode   value written to the JMSDeliveryMode header
     * @param priority       value written to the JMSPriority header
     * @param timeToLive     when positive (and timestamps are enabled) added to the send
     *                       timestamp to form the JMSExpiration header; otherwise expiration is 0
     * @param producerWindow if non-null, its usage is increased by the message size on the
     *                       asynchronous path
     * @param sendTimeout    when positive and no callback is given, used as the timeout of the
     *                       synchronous send
     * @param onComplete     optional callback passed to the synchronous send
     * @throws InvalidDestinationException if {@code destination} is temporary and the connection
     *         reports it as deleted
     * @throws JMSException if any of the delegated calls fails
     */
    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
                            MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
    
            checkClosed();
            if (destination.isTemporary() && connection.isDeleted(destination)) {
                throw new InvalidDestinationException("Cannot publish to a deleted Destination: " + destination);
            }
            synchronized (sendMutex) {
                // tell the Broker we are about to start a new transaction
                doStartTransaction();
                TransactionId txid = transactionContext.getTransactionId();
                long sequenceNumber = producer.getMessageSequence();
    
                //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
                message.setJMSDeliveryMode(deliveryMode);
                // Expiration stays 0 unless timestamps are enabled AND a positive TTL was given.
                long expiration = 0L;
                if (!producer.getDisableMessageTimestamp()) {
                    long timeStamp = System.currentTimeMillis();
                    message.setJMSTimestamp(timeStamp);
                    if (timeToLive > 0) {
                        expiration = timeToLive + timeStamp;
                    }
                }
                message.setJMSExpiration(expiration);
                message.setJMSPriority(priority);
                message.setJMSRedelivered(false);
    
                // transform to our own message format here
                ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message, connection);
    
                // Set the message id.
                // Both branches build the same MessageId (producer id + sequence number); when the
                // transformation returned a different object, the id is also written back to the
                // caller's message.
                if (msg == message) {
                    msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
                } else {
                    msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(), sequenceNumber));
                    message.setJMSMessageID(msg.getMessageId().toString());
                }
                //clear the brokerPath in case we are re-sending this message
                msg.setBrokerPath(null);
                // destination format is provider specific so only set on transformed message
                msg.setJMSDestination(destination);
    
                msg.setTransactionId(txid);
                // When enabled on the connection, send a copy rather than the instance built above.
                if (connection.isCopyMessageOnSend()) {
                    msg = (ActiveMQMessage)msg.copy();
                }
                msg.setConnection(connection);
                msg.onSend();
                msg.setProducerId(msg.getMessageId().getProducerId());
                if (LOG.isTraceEnabled()) {
                    LOG.trace(getSessionId() + " sending message: " + msg);
                }
                // Asynchronous send is used only when there is no callback, no send timeout, no
                // response is required, the connection does not force sync sends, and the message
                // is non-persistent, or async send is enabled, or a transaction id is present.
                if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
                    this.connection.asyncSendPacket(msg);
                    if (producerWindow != null) {
                        // Since we defer lots of the marshaling till we hit the
                        // wire, this might not
                        // provide an accurate size. We may change over to doing
                        // more aggressive marshaling,
                        // to get more accurate sizes.. this is more important once
                        // users start using producer window
                        // flow control.
                        int size = msg.getSize();
                        producerWindow.increaseUsage(size);
                    }
                } else {
                    // Synchronous send: the timeout variant is used only when no callback was
                    // supplied; otherwise the callback variant is used (the callback may be null).
                    if (sendTimeout > 0 && onComplete==null) {
                        this.connection.syncSendPacket(msg,sendTimeout);
                    }else {
                        this.connection.syncSendPacket(msg, onComplete);
                    }
                }
    
            }
        }


    4,ActiveMQConnection发送消息

        /**
         * Sends {@code command} through the transport and waits for the response.
         *
         * <p>If the response carries an exception, a {@link JMSException} is rethrown
         * as-is; any other exception is converted with
         * {@code JMSExceptionSupport.create} and thrown. If that conversion itself
         * fails, the failure is logged and the exception response is returned to the
         * caller. A {@link SecurityException} answering a {@code ConnectionInfo}
         * command additionally disposes the transport.
         *
         * @param command the command to send
         * @return the response received from the transport
         * @throws ConnectionClosedException if this connection is already closed
         * @throws JMSException if the response carries an exception or the transport
         *         fails with an {@link IOException}
         */
        public Response syncSendPacket(Command command) throws JMSException {
            if (isClosed()) {
                throw new ConnectionClosedException();
            }

            try {
                final Response reply = (Response) this.transport.request(command);
                if (!reply.isException()) {
                    return reply;
                }

                final ExceptionResponse failure = (ExceptionResponse) reply;
                if (failure.getException() instanceof JMSException) {
                    throw (JMSException) failure.getException();
                }

                if (isClosed() || closing.get()) {
                    LOG.debug("Received an exception but connection is closing");
                }

                JMSException converted = null;
                try {
                    converted = JMSExceptionSupport.create(failure.getException());
                } catch (Throwable e) {
                    LOG.error("Caught an exception trying to create a JMSException for " + failure.getException(), e);
                }

                // Security failures on the connection handshake tear down the transport.
                if (failure.getException() instanceof SecurityException && command instanceof ConnectionInfo) {
                    final Transport current = this.transport;
                    if (current != null) {
                        ServiceSupport.dispose(current);
                    }
                }

                if (converted != null) {
                    throw converted;
                }
                return reply;
            } catch (IOException e) {
                throw JMSExceptionSupport.create(e);
            }
        }
        
        // NOTE(review): this definition has the same signature as the syncSendPacket(Command)
        // listed directly before it, so the two cannot coexist in one class. The session code
        // earlier in this walkthrough calls syncSendPacket(msg, sendTimeout) and
        // syncSendPacket(msg, onComplete); presumably one of those two-argument overloads was
        // meant to be shown here instead - TODO confirm against the ActiveMQConnection source.
        /**
         * Sends {@code command} through the transport and waits for the response.
         *
         * @param command the command to send
         * @return the response received from the transport
         * @throws ConnectionClosedException if this connection is already closed
         * @throws JMSException if the response carries an exception or the transport
         *         fails with an {@link IOException}
         */
        public Response syncSendPacket(Command command) throws JMSException {
            if (isClosed()) {
                throw new ConnectionClosedException();
            } else {
    
                try {
                    Response response = (Response)this.transport.request(command);
                    if (response.isException()) {
                        ExceptionResponse er = (ExceptionResponse)response;
                        // A JMSException from the other side is rethrown unchanged.
                        if (er.getException() instanceof JMSException) {
                            throw (JMSException)er.getException();
                        } else {
                            if (isClosed()||closing.get()) {
                                LOG.debug("Received an exception but connection is closing");
                            }
                            // Stays null if the conversion below fails; in that case the
                            // exception response is returned instead of thrown.
                            JMSException jmsEx = null;
                            try {
                                jmsEx = JMSExceptionSupport.create(er.getException());
                            } catch(Throwable e) {
                                LOG.error("Caught an exception trying to create a JMSException for " +er.getException(),e);
                            }
                            //dispose of transport for security exceptions
                            if (er.getException() instanceof SecurityException && command instanceof ConnectionInfo){
                                Transport t = this.transport;
                                if (null != t){
                                    ServiceSupport.dispose(t);
                                }
                            }
                            if (jmsEx !=null) {
                                throw jmsEx;
                            }
                        }
                    }
                    return response;
                } catch (IOException e) {
                    throw JMSExceptionSupport.create(e);
                }
            }
        }


    5,ResponseCorrelator发送request

        /**
         * Issues {@code command} as a request without a callback and returns the
         * result obtained from the resulting {@link FutureResponse}.
         *
         * @param command the command to send
         * @return the value of {@code FutureResponse.getResult()}
         * @throws IOException if sending the request or obtaining the result fails
         */
        public Object request(Object command) throws IOException {
            return asyncRequest(command, null).getResult();
        }
            /**
             * Assigns the next command id to the command, marks it as requiring a
             * response, registers a {@link FutureResponse} for it in {@code requestMap}
             * and forwards the command to the next transport.
             *
             * <p>If a transport error was recorded earlier ({@code this.error}), the
             * future is not registered; it is completed with an
             * {@link ExceptionResponse} wrapping that error and the error is rethrown.
             *
             * @param o                the command to send; must be a {@link Command}
             * @param responseCallback callback handed to the created future; may be {@code null}
             * @return the future that will hold the response
             * @throws IOException the previously recorded transport error, or a failure
             *         raised while forwarding the command
             */
            public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
            Command command = (Command) o;
            command.setCommandId(sequenceGenerator.getNextSequenceId());
            command.setResponseRequired(true);
            FutureResponse future = new FutureResponse(responseCallback);
            IOException priorError;
            // Read the error flag and register the future under the same lock.
            synchronized (requestMap) {
                priorError = this.error;
                if (priorError == null) {
                    // Integer.valueOf replaces the deprecated boxing constructor.
                    requestMap.put(Integer.valueOf(command.getCommandId()), future);
                }
            }

            if (priorError != null) {
                future.set(new ExceptionResponse(priorError));
                throw priorError;
            }

            next.oneway(command);
            return future;
        }


    6,调用MutexTransport来发送消息

        /**
         * Forwards {@code command} to the next transport while holding
         * {@code writeLock}.
         *
         * @param command the command to send
         * @throws IOException if the next transport fails to send the command
         */
        @Override
        public void oneway(Object command) throws IOException {
            writeLock.lock();
            try {
                next.oneway(command);
            } finally {
                // Released in finally so the lock is freed even when the send throws.
                writeLock.unlock();
            }
        }
        


    7,调用AbstractInactivityMonitor完成消息发送准备

        /**
         * Sends {@code o} through {@link #doOnewaySend(Object)} while holding the
         * read side of {@code sendLock}, and maintains the {@code inSend} and
         * {@code commandSent} flags around the send.
         *
         * @param o the command to send
         * @throws IOException if the send fails
         */
        public void oneway(Object o) throws IOException {
            // To prevent the inactivity monitor from sending a message while we
            // are performing a send we take a read lock.  The inactivity monitor
            // sends its Heart-beat commands under a write lock.  This means that
            // the MutexTransport is still responsible for synchronizing sends
            this.sendLock.readLock().lock();
            inSend.set(true);
            try {
                doOnewaySend(o);
            } finally {
                // Runs on success and on failure: commandSent is set to true and inSend
                // is cleared before the read lock is released.
                commandSent.set(true);
                inSend.set(false);
                this.sendLock.readLock().unlock();
            }
        }
        
            /**
             * Forwards {@code command} to the next transport unless this monitor has
             * already been marked as failed.
             *
             * <p>A command whose class is exactly {@link WireFormatInfo} is first passed
             * to {@code processOutboundWireFormatInfo} while synchronized on this object.
             *
             * @param command the command to send
             * @throws InactivityIOException if the {@code failed} flag is set
             * @throws IOException if the next transport fails to send the command
             */
            private void doOnewaySend(Object command) throws IOException {
            if (failed.get()) {
                throw new InactivityIOException("Cannot send, channel has already failed: " + next.getRemoteAddress());
            }

            final boolean isWireFormatInfo = WireFormatInfo.class == command.getClass();
            if (isWireFormatInfo) {
                synchronized (this) {
                    processOutboundWireFormatInfo((WireFormatInfo) command);
                }
            }

            next.oneway(command);
        }



    8,调用TcpTransport、OpenWireFormat、DataOutputStream,最终完成通过TCP发送消息

       /**
        * Marshals {@code command} with {@code wireFormat} into {@code dataOut}
        * and flushes the stream.
        *
        * @param command the command to write
        * @throws IOException if {@code checkStarted()}, marshalling or flushing fails
        */
       public void oneway(Object command) throws IOException {
            checkStarted();
            wireFormat.marshal(command, dataOut);
            // Flush after every command so it is pushed out of dataOut's buffer immediately.
            dataOut.flush();
        }
         /**
          * Writes {@code o} to {@code dataOut} in this wire format.
          *
          * <p>A {@code null} object is written as the {@code NULL_TYPE} marker byte.
          * Any other object is cast to {@link DataStructure} and written by the
          * marshaller registered for its type byte, using tight or loose encoding
          * depending on {@code tightEncodingEnabled}. Unless
          * {@code sizePrefixDisabled} is set, every frame is preceded by its size
          * as an int.
          *
          * @param o       the object to marshal, or {@code null}
          * @param dataOut the output to write to
          * @throws IOException if no marshaller is registered for the object's type
          *         or writing fails
          */
         public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {

            if (cacheEnabled) {
                runMarshallCacheEvictionSweep();
            }

            // Null payload: the frame is just the one-byte type marker, so its size is 1.
            if (o == null) {
                if (!sizePrefixDisabled) {
                    dataOut.writeInt(1);
                }
                dataOut.writeByte(NULL_TYPE);
                return;
            }

            final DataStructure payload = (DataStructure) o;
            final byte typeCode = payload.getDataStructureType();
            // Mask to use the signed byte as an unsigned array index.
            final DataStreamMarshaller marshaller = (DataStreamMarshaller) dataMarshallers[typeCode & 0xFF];
            if (marshaller == null) {
                throw new IOException("Unknown data type: " + typeCode);
            }

            if (tightEncodingEnabled) {
                // First pass computes the size and fills the boolean stream;
                // second pass writes the data.
                final BooleanStream flags = new BooleanStream();
                int frameSize = 1;
                frameSize += marshaller.tightMarshal1(this, payload, flags);
                frameSize += flags.marshalledSize();

                if (!sizePrefixDisabled) {
                    dataOut.writeInt(frameSize);
                }

                dataOut.writeByte(typeCode);
                flags.marshal(dataOut);
                marshaller.tightMarshal2(this, payload, dataOut, flags);
                return;
            }

            // Loose encoding: with a size prefix the frame is staged in bytesOut so
            // that its length is known before anything reaches dataOut.
            DataOutput sink = dataOut;
            if (!sizePrefixDisabled) {
                bytesOut.restart();
                sink = bytesOut;
            }

            sink.writeByte(typeCode);
            marshaller.looseMarshal(this, payload, sink);

            if (!sizePrefixDisabled) {
                final ByteSequence staged = bytesOut.toByteSequence();
                dataOut.writeInt(staged.getLength());
                dataOut.write(staged.getData(), staged.getOffset(), staged.getLength());
            }
        }
            /**
             * Writes {@code v} to {@code out} as four bytes, most significant byte
             * first, then adds 4 to the written-byte count via {@code incCount}.
             *
             * @param v the int to write
             * @throws IOException if writing to {@code out} fails
             */
            public final void writeInt(int v) throws IOException {
            // Emit bits 31..24, 23..16, 15..8, 7..0 in that order.
            for (int shift = 24; shift >= 0; shift -= 8) {
                out.write((v >>> shift) & 0xFF);
            }
            incCount(4);
        }




    附图


    备注:MessageId是通过ProducerId+sequenceNumber来生成的,sequenceNumber通过原子变量的加1完成



  • 相关阅读:
    selenium+python处理Alert弹窗
    HTML在网页上不能显示图片问题
    制作python程序windows安装包(飞机大战源码)
    python读取ini文件
    python杀死Windows后台程序
    python3中文乱码解决方法
    算法04
    Windows10
    Windows10 快捷键
    文件夹选项-安装功能-window服务
  • 原文地址:https://www.cnblogs.com/mthoutai/p/6774920.html
Copyright © 2011-2022 走看看