ActiveMQ 中的消息在内存中时,以链表形式保存,以 PendingList 表示,每一个消息是 PendingNode。
PendingList 主要有2种实现:OrderedPendingList 和 PrioritizedPendingList
OrderedPendingList 就是一个双向链表,多了一个保存消息的 map:
public class OrderedPendingList implements PendingList { //省略其他代码 private PendingNode root = null; private PendingNode tail = null; private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>(); }
而 PrioritizedPendingList,是维持了一个 OrderedPendingList 数组,消息根据优先级存放在对应的 OrderedPendingList 中。
public class PrioritizedPendingList implements PendingList { //省略其他代码 private static final Integer MAX_PRIORITY = 10; private final OrderedPendingList[] lists = new OrderedPendingList[MAX_PRIORITY]; private final Map<MessageId, PendingNode> map = new HashMap<MessageId, PendingNode>(); public PrioritizedPendingList() { for (int i = 0; i < MAX_PRIORITY; i++) { this.lists[i] = new OrderedPendingList(); } } }
message cursor 的作用是遍历消息。
常用的 message cursor 分为 VMPendingMessageCursor 和 FilePendingMessageCursor,VMPendingMessageCursor 主要是对 VMPendingMessageCursor 做一个封装:
public class VMPendingMessageCursor extends AbstractPendingMessageCursor { //省略其他代码 private final PendingList list; private Iterator<MessageReference> iter; public VMPendingMessageCursor(boolean prioritizedMessages) { super(prioritizedMessages); if (this.prioritizedMessages) { this.list= new PrioritizedPendingList(); }else { this.list = new OrderedPendingList(); } } }
而 FilePendingMessageCursor 就复杂的多,它会把消息刷到临时文件中:
// 省略其他代码 public class FilePendingMessageCursor extends AbstractPendingMessageCursor implements UsageListener { protected Broker broker; // PListStoreImpl 临时文件 private final PListStore store; // 内存中的消息 private PendingList memoryList; // PListImpl private PList diskList; }
持久化消息使用的 message cursor 是 QueueStorePrefetch:
// 省略其他代码 class QueueStorePrefetch extends AbstractStoreCursor { // 类型为:KahaDBTransactionStore$1 private final MessageStore store; private final Broker broker; }