生产者发送消息:producer ---------> broker
broker返回确认:broker ---------> producer
生产者发送同步消息,broker会返回Response;发送异步消息,broker不会返回确认;满足一定条件时,broker会返回ProducerAck:
final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0 && !context.isInRecoveryMode();
broker 分发消息:broker ---------> consumer
消费者返回确认: consumer ---------> broker
如果消息被正常处理掉,consumer返回 STANDARD_ACK_TYPE 的 MessageAck,如果消息没有被正常处理,且超过了客户端重新投递次数,consumer则返回 POSION_ACK_TYPE 的 MessageAck。在收到 MessageAck 后,broker 才会删除消息。
通常我们使用 ActiveMQ,会这样创建 Session,设置为自动确认:
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
假定存在队列 TEST.FOO,它有1个消费者consumer1,1 个生产者producer1,当producer1向队列发送1条消息,broker 把这条消息分发给consumer1,如果配置自动确认,consumer进程会自动发送确认,broker收到确认后会删除消息。
反之如果配置为CLIENT_ACKNOWLEDGE,则需要手动确认,即显式调用代码:
consumer.acknowledge();
如果consumer1收到消息后,并不调用acknowledge(),即不发送消息确认,broker 也一直会保存消息。
client 和 broker 之间所有消息都继承自 BaseCommand:例如 ActiveMQTextMessage,ConnectionInfo,KeepAliveInfo,BrokerInfo 等。