zoukankan      html  css  js  c++  java
  • ActiveMQ(2)---ActiveMQ原理分析之消息发送

    持久化消息和非持久化消息的发送策略

    消息同步发送和异步发送

    ActiveMQ支持同步、异步两种发送模式将消息发送到broker上。
    同步发送过程中,发送者发送一条消息会阻塞直到broker反馈一个确认消息,表示消息已经被broker处理。这个机
    制提供了消息的安全性保障,但是由于是阻塞的操作,会影响到客户端消息发送的性能


    异步发送的过程中,发送者不需要等待broker提供反馈,所以性能相对较高。但是可能会出现消息丢失的情况。所
    以使用异步发送的前提是在某些情况下允许出现数据丢失的情况。


    默认情况下,非持久化消息是异步发送的,持久化消息并且是在非事务模式下是同步发送的。
    但是在开启事务的情况下,消息都是异步发送。由于异步发送的效率会比同步发送性能更高。所以在发送持久化消
    息的时候,尽量去开启事务会话。
    除了持久化消息和非持久化消息的同步和异步特性以外,我们还可以通过以下几种方式来设置异步发送

    1.ConnectionFactory connectionFactory=new ActiveMQConnectionFactory("tcp://192.168.11.153:61616?
    jms.useAsyncSend=true");
    2.((ActiveMQConnectionFactory) connectionFactory).setUseAsyncSend(true);
    3.((ActiveMQConnection)connection).setUseAsyncSend(true);
    

    消息的发送原理分析图解

    消息发送的流程图

    ProducerWindowSize的含义

    producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的
    确认,才能继续发送。
    代码在:ActiveMQSession的1957行


    主要用来约束在异步发送时producer端允许积压的(尚未ACK)的消息的大小,且只对异步发送有意义。每次发送消
    息之后,都将会导致memoryUsage大小增加(+message.size),当broker返回producerAck时,memoryUsage尺
    寸减少(producerAck.size,此size表示先前发送消息的大小)。


    可以通过如下2种方式设置:
    Ø 在brokerUrl中设置: "tcp://localhost:61616?jms.producerWindowSize=1048576",这种设置将会对所有的
    producer生效。
    Ø 在destinationUri中设置: "test-queue?producer.windowSize=1048576",此参数只会对使用此Destination实例
    的producer失效,将会覆盖brokerUrl中的producerWindowSize值。
    注意:此值越大,意味着消耗Client端的内存就越大。

    消息发送的源码分析

    以producer.send为入口

    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
    			AsyncCallback onComplete) throws JMSException {
    		checkClosed(); // 检查session的状态,如果session以关闭则抛异常
    		if (destination == null) {
    			if (info.getDestination() == null) {
    				throw new UnsupportedOperationException("A destination must be specified.");
    			}
    			throw new InvalidDestinationException("Don't understand null destinations");
    		}
    		ActiveMQDestination dest;
    		if (destination.equals(info.getDestination())) {// 检查destination的类型,如果符合要求,就转变为
    			ActiveMQDestination dest = (ActiveMQDestination) destination;
    		} else if (info.getDestination() == null) {
    			dest = ActiveMQDestination.transform(destination);
    		} else {
    			throw new UnsupportedOperationException(
    					"This producer can only send messages to: " + this.info.getDestination().getPhysicalName());
    		}
    		if (dest == null) {
    			throw new JMSException("No destination specified");
    		}
    		if (transformer != null) {
    			Message transformedMessage = transformer.producerTransform(session, this, message);
    			if (transformedMessage != null) {
    				message = transformedMessage;
    			}
    		}
    		if (producerWindow != null) {// 如果发送窗口大小不为空,则判断发送窗口的大小决定是否阻塞
    			try {
    				producerWindow.waitForSpace();
    			} catch (InterruptedException e) {
    				throw new JMSException("Send aborted due to thread interrupt.");
    			}
    		}
    		// 发送消息到broker的topic
    		this.session.send(this, dest, message, deliveryMode, priority, timeToLive, producerWindow, sendTimeout,
    				onComplete);
    		stats.onMessage();
    	}
    

      

      ActiveMQSession的send方法

    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, Message
    message, int deliveryMode, int priority, long timeToLive,
    MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete)
    throws JMSException {
    checkClosed();
    if (destination.isTemporary() && connection.isDeleted(destination)) {
    throw new InvalidDestinationException("Cannot publish to a deleted Destination: " +
    destination);
    }
    synchronized (sendMutex) { //互斥锁,如果一个session的多个producer发送消息到这里,会保证消息发送
    的有序性
    // tell the Broker we are about to start a new transaction
    doStartTransaction();//告诉broker开始一个新事务,只有事务型会话中才会开启
    TransactionId txid = transactionContext.getTransactionId();//从事务上下文中获取事务id
    long sequenceNumber = producer.getMessageSequence();
    //Set the "JMS" header fields on the original message, see 1.1 spec section 3.4.11
    message.setJMSDeliveryMode(deliveryMode); //在JMS协议头中设置是否持久化标识
    long expiration = 0L;//计算消息过期时间
    if (!producer.getDisableMessageTimestamp()) {
    long timeStamp = System.currentTimeMillis();
    message.setJMSTimestamp(timeStamp);
    if (timeToLive > 0) {
    expiration = timeToLive + timeStamp;
    }
    }
    message.setJMSExpiration(expiration);//设置消息过期时间
    message.setJMSPriority(priority);//设置消息的优先级
    message.setJMSRedelivered(false);//设置消息为非重发
    // transform to our own message format here
    //将不通的消息格式统一转化为ActiveMQMessage
    ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message,
    connection);
    msg.setDestination(destination);//设置目的地
    //生成并设置消息id
    msg.setMessageId(new MessageId(producer.getProducerInfo().getProducerId(),
    sequenceNumber));
    // Set the message id.
    if (msg != message) {//如果消息是经过转化的,则更新原来的消息id和目的地
    message.setJMSMessageID(msg.getMessageId().toString());
    // Make sure the JMS destination is set on the foreign messages too.
    message.setJMSDestination(destination);
    }
    //clear the brokerPath in case we are re-sending this message
    msg.setBrokerPath(null);
    msg.setTransactionId(txid);
    if (connection.isCopyMessageOnSend()) {
    msg = (ActiveMQMessage)msg.copy();
    }
    msg.setConnection(connection);
    msg.onSend();//把消息属性和消息体都设置为只读,防止被修改
    msg.setProducerId(msg.getMessageId().getProducerId());
    if (LOG.isTraceEnabled()) {
    LOG.trace(getSessionId() + " sending message: " + msg);
    }
    ActiveMQConnection. doAsyncSendPacket
    这个地方问题来了,this.transport是什么东西?在哪里实例化的?按照以前看源码的惯例来看,它肯定不是一个
    单纯的对象。
    按照以往我们看源码的经验来看,一定是在创建连接的过程中初始化的。所以我们定位到代码
    transport的实例化过程
    从connection=connectionFactory.createConnection();这行代码作为入口,一直跟踪到
    ActiveMQConnectionFactory. createActiveMQConnection这个方法中。代码如下
    //如果onComplete没有设置,且发送超时时间小于0,且消息不需要反馈,且连接器不是同步发送模式,且消息非持久
    化或者连接器是异步发送模式
    //或者存在事务id的情况下,走异步发送,否则走同步发送
    if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() &&
    !connection.isAlwaysSyncSend() && (!msg.isPersistent() || connection.isUseAsyncSend() || txid !=
    null)) {
    this.connection.asyncSendPacket(msg);
    if (producerWindow != null) {
    // Since we defer lots of the marshaling till we hit the
    // wire, this might not
    // provide and accurate size. We may change over to doing
    // more aggressive marshaling,
    // to get more accurate sizes.. this is more important once
    // users start using producer window
    // flow control.
    int size = msg.getSize(); //异步发送的情况下,需要设置producerWindow的大小
    producerWindow.increaseUsage(size);
    }
    } else {
    if (sendTimeout > 0 && onComplete==null) {
    this.connection.syncSendPacket(msg,sendTimeout); //带超时时间的同步发送
    }else {
    this.connection.syncSendPacket(msg, onComplete); //带回调的同步发送
    }
    }
    }
    }

    ActiveMQConnection. doAsyncSendPacket

    private void doAsyncSendPacket(Command command) throws JMSException {
    try {
    this.transport.oneway(command);
    } catch (IOException e) {
    throw JMSExceptionSupport.create(e);
    }
    }

    这个地方问题来了,this.transport是什么东西?在哪里实例化的?按照以前看源码的惯例来看,它肯定不是一个单纯的对象。

    按照以往我们看源码的经验来看,一定是在创建连接的过程中初始化的。所以我们定位到代码

    transport的实例化过程

    从connection=connectionFactory.createConnection();这行代码作为入口,一直跟踪到ActiveMQConnectionFactory. createActiveMQConnection这个方法中。代码如下

    protected ActiveMQConnection createActiveMQConnection(String userName, String password) throws
    JMSException {
    if (brokerURL == null) {
    throw new ConfigurationException("brokerURL not set.");
    }
    ActiveMQConnection connection = null;
    try {
    Transport transport = createTransport();
    connection = createActiveMQConnection(transport, factoryStats);
    connection.setUserName(userName);
    connection.setPassword(password);
    //省略后面的代码
    }

    createTransport

    调用ActiveMQConnectionFactory.createTransport方法,去创建一个transport对象。
    1. 构建一个URI
    2. 根据URL去创建一个连接TransportFactory.connect
    Ø 默认使用的是tcp的协议

      

    protected Transport createTransport() throws JMSException {
    		try {
    			URI connectBrokerUL = brokerURL;
    			String scheme = brokerURL.getScheme();
    			if (scheme == null) {
    				throw new IOException("Transport not scheme specified: [" + brokerURL + "]");
    			}
    			if (scheme.equals("auto")) {
    				connectBrokerUL = new URI(brokerURL.toString().replace("auto", "tcp"));
    			} else if (scheme.equals("auto+ssl")) {
    				connectBrokerUL = new URI(brokerURL.toString().replace("auto+ssl", "ssl"));
    			} else if (scheme.equals("auto+nio")) {
    				connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio", "nio"));
    			} else if (scheme.equals("auto+nio+ssl")) {
    				connectBrokerUL = new URI(brokerURL.toString().replace("auto+nio+ssl", "nio+ssl"));
    			}
    			return TransportFactory.connect(connectBrokerUL);
    		} catch (Exception e) {
    			throw JMSExceptionSupport.create("Could not create Transport. Reason: " + e, e);
    		}
    	}
    

      

    TransportFactory. findTransportFactory

    1. 从TRANSPORT_FACTORYS这个Map集合中,根据scheme去获得一个Transp

    2. 如果Map集合中不存在,则通过TRANSPORT_FACTORY_FINDER去找一个并且

    Ø 这个地方又有点类似于我们之前所学过的SPI的思想吧?他会从METAINF/services/org/apache/activemq/transport/ 这个路径下,根据URI组装的sche实例化,所以根据tcp为key去对应的路径下可以找到TcpTransportFactory

    public static TransportFactory findTransportFactory(URI location) throws IOException {
    		String scheme = location.getScheme();
    		if (scheme == null) {
    			throw new IOException("Transport not scheme specified: [" + location + "]");
    		}
    		TransportFactory tf = TRANSPORT_FACTORYS.get(scheme);
    		if (tf == null) {
    			// Try to load if from a META-INF property.
    			try {
    				tf = (TransportFactory) TRANSPORT_FACTORY_FINDER.newInstance(scheme);
    				TRANSPORT_FACTORYS.put(scheme, tf);
    			} catch (Throwable e) {
    				throw IOExceptionSupport.create("Transport scheme NOT recognized: [" + scheme + "]", e);
    			}
    		}
    		return tf;
    	}
    

      

     调用TransportFactory.doConnect去构建一个连接

    public Transport doConnect(URI location) throws Exception {
    try {
    Map<String, String> options = new HashMap<String, String>
    (URISupport.parseParameters(location));
    if( !options.containsKey("wireFormat.host") ) {
    options.put("wireFormat.host", location.getHost());
    }
    WireFormat wf = createWireFormat(options);
    Transport transport = createTransport(location, wf); //创建一个Transport,创建一个socket连
    接 -> 终于找到真相了
    Transport rc = configure(transport, wf, options);//配置configure,这个里面是对Transport做
    链路包装
    //remove auto
    IntrospectionSupport.extractProperties(options, "auto.");
    if (!options.isEmpty()) {
    throw new IllegalArgumentException("Invalid connect parameters: " + options);
    }
    configure
    到目前为止,这个transport实际上就是一个调用链了,他的链结构为
    ResponseCorrelator(MutexTransport(WireFormatNegotiator(IactivityMonitor(TcpTransport()))
    每一层包装表示什么意思呢?
    ResponseCorrelator 用于实现异步请求。
    MutexTransport 实现写锁,表示同一时间只允许发送一个请求
    WireFormatNegotiator 实现了客户端连接broker的时候先发送数据解析相关的协议信息,比如解析版本号,是否
    使用缓存等
    InactivityMonitor 用于实现连接成功成功后的心跳检查机制,客户端每10s发送一次心跳信息。服务端每30s读取
    一次心跳信息。
    同步发送和异步发送的区别
    在ResponseCorrelator的request方法中,需要通过response.getResult去获得broker的反馈,否则会阻塞
    持久化消息和非持久化消息的存储原理
    return rc;
    } catch (URISyntaxException e) {
    throw IOExceptionSupport.create(e);
    }
    }
    

      configure

    public Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
    //组装一个复合的transport,这里会包装两层,一个是IactivityMonitor.另一个是WireFormatNegotiator
    transport = compositeConfigure(transport, wf, options);
    transport = new MutexTransport(transport); //再做一层包装,MutexTransport
    transport = new ResponseCorrelator(transport); //包装ResponseCorrelator
    return transport;
    }

    到目前为止,这个transport实际上就是一个调用链了,他的链结构为
    ResponseCorrelator(MutexTransport(WireFormatNegotiator(IactivityMonitor(TcpTransport()))
    每一层包装表示什么意思呢?
    ResponseCorrelator 用于实现异步请求。
    MutexTransport 实现写锁,表示同一时间只允许发送一个请求
    WireFormatNegotiator 实现了客户端连接broker的时候先发送数据解析相关的协议信息,比如解析版本号,是否使用缓存等

    InactivityMonitor 用于实现连接成功成功后的心跳检查机制,客户端每10s发送一次心跳信息。服务端每30s读取一次心跳信息

    同步发送和异步发送的区别

    public Object request(Object command, int timeout) throws IOException {
    FutureResponse response = asyncRequest(command, null);
    return response.getResult(timeout); // 从future方法阻塞等待返回
    }

    在ResponseCorrelator的request方法中,需要通过response.getResult去获得broker的反馈,否则会阻塞

     

  • 相关阅读:
    HDU
    HDU
    HDU
    HDU
    HDU
    HDU
    HDU
    HDU
    HDU
    HDU
  • 原文地址:https://www.cnblogs.com/flgb/p/10667368.html
Copyright © 2011-2022 走看看