zoukankan      html  css  js  c++  java
  • ActiveMQ producer同步/异步发送消息

    http://activemq.apache.org/async-sends.html

    producer发送消息有同步和异步两种模式,可以通过代码配置:

    ((ActiveMQConnection)connection).setUseAsyncSend(true);

    producer默认是异步发送消息。在没有开启事务的情况下,producer发送持久化消息是同步的,调用send会阻塞直到broker把消息保存到磁盘并返回确认。

    消息设置为持久:

    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);

    消息设置为非持久:

    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

    producer发送消息的调用栈如下:

    // ActiveMQSession
    protected void send(ActiveMQMessageProducer producer, ActiveMQDestination destination, 
                        Message message, int deliveryMode, int priority, long timeToLive,
                        MemoryUsage producerWindow, int sendTimeout, AsyncCallback onComplete) throws JMSException {
        // 省略其他代码
        // 消息的持久类型和和连接模式是或的:所以只要connection配置为异步,就走异步发送
        if (onComplete==null && sendTimeout <= 0 && !msg.isResponseRequired() && !connection.isAlwaysSyncSend() 
                && (!msg.isPersistent() || connection.isUseAsyncSend() || txid != null)) {
            this.connection.asyncSendPacket(msg);
            if (producerWindow != null) {
                int size = msg.getSize();
                producerWindow.increaseUsage(size);
            }
        } else { // 同步发送
            if (sendTimeout > 0 && onComplete==null) {
                this.connection.syncSendPacket(msg,sendTimeout);
            }else {
                this.connection.syncSendPacket(msg, onComplete);
            }
        }
    }

    producer发送同步消息的调用栈:

    // org.apache.activemq.transport.ResponseCorrelator
    public Object request(Object command) throws IOException {
        FutureResponse response = asyncRequest(command, null);
        return response.getResult();
    }
    
    public FutureResponse asyncRequest(Object o, ResponseCallback responseCallback) throws IOException {
        Command command = (Command) o;
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        // 需要回复
        command.setResponseRequired(true);
        FutureResponse future = new FutureResponse(responseCallback);
        IOException priorError = null;
        synchronized (requestMap) {
            priorError = this.error;
            if (priorError == null) {
                requestMap.put(new Integer(command.getCommandId()), future);
            }
        }
    
        if (priorError != null) {
            future.set(new ExceptionResponse(priorError));
            throw priorError;
        }
    
        next.oneway(command);
        return future;
    }

    producer发送异步消息的调用栈:

    //org.apache.activemq.transport.ResponseCorrelator
    public void oneway(Object o) throws IOException {
        Command command = (Command)o;
        command.setCommandId(sequenceGenerator.getNextSequenceId());
        // 不需要回复
        command.setResponseRequired(false);
        next.oneway(command);
    }

    在不考虑事务的情况下:

    producer发送持久化消息是同步发送,发送是阻塞的,直到收到确认。同步发送肯定是有流量控制的。

    producer默认是异步发送,异步发送不会等待broker的确认, 所以就需要考虑流量控制了:

    ActiveMQConnectionFactory.setProducerWindowSize(int producerWindowSize)

    ProducerWindowSize的含义:producer每发送一个消息,统计一下发送的字节数,当字节数达到ProducerWindowSize值时,需要等待broker的确认,才能继续发送。

  • 相关阅读:
    [硬件]_ELVE_VS2015下opencv3.3的配置问题
    [Linux]_ELVE_ssh登录远程阿里服务器
    [python]_ELVE_pip2和pip3如何共存
    U盘无法打开提示格式化?如何进行恢复
    [转]pycharm的一些快捷键
    文件上传Django
    ansible编译安装--操作系统环境Redhat6.4
    django的models字段介绍
    paramiko模块
    mysql安装等操作
  • 原文地址:https://www.cnblogs.com/allenwas3/p/8600638.html
Copyright © 2011-2022 走看看