正常情况下:consumer 消费完消息后,会发送"标准确认"给 broker,这个确认对象以 MessageAck 类表征:
// 省略其他代码。类中定义了各种确认的类型 public class MessageAck extends BaseCommand { public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_ACK; public static final byte DELIVERED_ACK_TYPE = 0; public static final byte STANDARD_ACK_TYPE = 2; public static final byte POSION_ACK_TYPE = 1; public static final byte REDELIVERED_ACK_TYPE = 3; public static final byte INDIVIDUAL_ACK_TYPE = 4; public static final byte UNMATCHED_ACK_TYPE = 5; public static final byte EXPIRED_ACK_TYPE = 6; protected byte ackType; // messageCount 表示确认的消息的数量,即 consumer 可以对消息进行批量确认 public MessageAck(Message message, byte ackType, int messageCount) { this.ackType = ackType; this.destination = message.getDestination(); this.lastMessageId = message.getMessageId(); this.messageCount = messageCount; } }
但是当 consumer 处理消息失败时,会怎样呢?例如:发生了除数为 0,抛出异常
consumer.setMessageListener(new MessageListener() { public void onMessage(Message message) { logger.info("Received: " + message); float f = 1 / 0; } });
consumer 会进行重新投递,重新把消息给 listener 处理。具体流程是:consumer 消费消息失败,抛出异常,回滚,然后重新投递。
// void org.apache.activemq.ActiveMQMessageConsumer.rollback() throws JMSException if (messageListener.get() != null) { session.redispatch(this, unconsumedMessages); }
下面的代码设置 RedeliveryPolicy:
RedeliveryPolicy queuePolicy = new RedeliveryPolicy(); queuePolicy.setInitialRedeliveryDelay(0); queuePolicy.setRedeliveryDelay(1000); queuePolicy.setUseExponentialBackOff(false); queuePolicy.setMaximumRedeliveries(1); RedeliveryPolicyMap map = connection.getRedeliveryPolicyMap(); map.put(new ActiveMQQueue(">"), queuePolicy);
这个重新投递策略是 consumer 端的(consumer 重新投递给自己的消息 listener),而不是 broker 重新投递给 consumer 的,理解这一点特别重要。超过了最大投递次数后,consumer 会发送给 broker 一个 POSION_ACK_TYPE 类型的 MessageAck 响应,正常情况是 STANDARD_ACK_TYPE 类型的。
consumer发送正常消息确认的调用栈:
主要逻辑在:
// void org.apache.activemq.ActiveMQMessageConsumer private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException { if (unconsumedMessages.isClosed()) { return; } if (messageExpired) { // 过期消息 acknowledge(md, MessageAck.EXPIRED_ACK_TYPE); stats.getExpiredMessageCount().increment(); } else { stats.onMessage(); if (session.getTransacted()) { // Do nothing. } else if (isAutoAcknowledgeEach()) { // 设置为 Session.AUTO_ACKNOWLEDGE if (deliveryingAcknowledgements.compareAndSet(false, true)) { synchronized (deliveredMessages) { if (!deliveredMessages.isEmpty()) { if (optimizeAcknowledge) { ackCounter++; // AMQ-3956 evaluate both expired and normal msgs as // otherwise consumer may get stalled if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65) || (optimizeAcknowledgeTimeOut > 0 && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack != null) { deliveredMessages.clear(); ackCounter = 0; session.sendAck(ack); optimizeAckTimestamp = System.currentTimeMillis(); } // AMQ-3956 - as further optimization send // ack for expired msgs when there are any. // This resets the deliveredCounter to 0 so that // we won't sent standard acks with every msg just // because the deliveredCounter just below // 0.5 * prefetch as used in ackLater() if (pendingAck != null && deliveredCounter > 0) { session.sendAck(pendingAck); pendingAck = null; deliveredCounter = 0; } } } else { MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE); if (ack!=null) { deliveredMessages.clear(); session.sendAck(ack); } } } } deliveryingAcknowledgements.set(false); } } else if (isAutoAcknowledgeBatch()) { ackLater(md, MessageAck.STANDARD_ACK_TYPE); } else if (session.isClientAcknowledge()||session.isIndividualAcknowledge()) { boolean messageUnackedByConsumer = false; synchronized (deliveredMessages) { messageUnackedByConsumer = deliveredMessages.contains(md); } if (messageUnackedByConsumer) { ackLater(md, MessageAck.DELIVERED_ACK_TYPE); } } else { throw new IllegalStateException("Invalid session state."); } } }
consumer 发送有毒消息确认的调用栈:
broker 接收消息确认的调用栈:
那么,在什么情况下,broker 会重新发送消息给 cosumer 呢?答案是:
broker 把一个消息推送给 consumer 后,但是还没收到任何确认,如果这时消费者断开连接,broker 会把这个消息加入到重新投递队列中,推送给其他的消费者。