1. How does a client use transactions explicitly?
The producer begins a transaction (code snippet):
// connection is assumed to be an already started ActiveMQ connection
ActiveMQSession session = (ActiveMQSession) connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("TEST.FOO");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

// Begin the transaction:
// sends a TransactionInfo command of type BEGIN to the broker
session.getTransactionContext().begin();

for (int i = 0; i < 2; i++) {
    // Create a message
    String text = "zhang";
    TextMessage message = session.createTextMessage(text);
    producer.send(message);
}

// session.getTransactionContext().rollback();

// Commit the transaction:
// sends a TransactionInfo command of type COMMIT_ONE_PHASE to the broker
session.getTransactionContext().commit();
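2. The broker's entry point for transaction handling:
The broker's transaction-handling logic lives in the TransactionBroker class.
So how exactly do transactions show up inside a Queue?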
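An ActiveMQ client does not start a transaction by default. Once clients begin transactions explicitly, a Queue may have several transactions in flight at the same time, and each transaction carries its own list of pending messages. When a client commits its transaction, the Queue accepts the message list that belongs to that transaction; when a client rolls back, the Queue discards those messages.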
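The commit and rollback behaviour described above can be illustrated with a toy model. This is a minimal sketch, not ActiveMQ code: the class TxQueueSketch, its fields, and the use of plain strings for transaction ids and messages are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of a queue that buffers transactional sends (hypothetical, not ActiveMQ code).
class TxQueueSketch {
    // Messages that have actually been accepted by the queue.
    final List<String> delivered = new ArrayList<>();
    // Pending messages per transaction id.
    final Map<String, List<String>> pending = new HashMap<>();

    void send(String txId, String message) {
        if (txId == null) {
            // No transaction: the message is accepted immediately.
            delivered.add(message);
        } else {
            // Inside a transaction: hold the message until commit or rollback.
            pending.computeIfAbsent(txId, k -> new ArrayList<>()).add(message);
        }
    }

    void commit(String txId) {
        // Accept every message that was buffered for this transaction.
        List<String> messages = pending.remove(txId);
        if (messages != null) {
            delivered.addAll(messages);
        }
    }

    void rollback(String txId) {
        // Drop the buffered messages without accepting them.
        pending.remove(txId);
    }

    public static void main(String[] args) {
        TxQueueSketch queue = new TxQueueSketch();
        queue.send("tx1", "a");
        queue.send("tx2", "b");
        queue.send(null, "c");
        queue.commit("tx1");
        queue.rollback("tx2");
        System.out.println(queue.delivered); // prints [c, a]
    }
}
```

In this sketch the message from tx2 never reaches the accepted list, while the message from tx1 becomes visible only at commit time.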
Transaction-related fields in Queue:
// Key: the Transaction; value: the SendSync that holds that transaction's message list
final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>();
private final LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();
The Queue inner class SendSync wraps the messages and the synchronization callbacks:
class SendSync extends Synchronization {

    class MessageContext {
        public Message message;
        public ConnectionContext context;

        public MessageContext(ConnectionContext context, Message message) {
            this.context = context;
            this.message = message;
        }
    }

    final Transaction transaction;
    // This is the message list we were looking for
    List<MessageContext> additions = new ArrayList<MessageContext>();

    public SendSync(Transaction transaction) {
        this.transaction = transaction;
    }

    public void add(ConnectionContext context, Message message) {
        additions.add(new MessageContext(context, message));
    }

    @Override
    public void beforeCommit() throws Exception {
        synchronized (orderIndexUpdates) {
            orderIndexUpdates.addLast(transaction);
        }
    }

    @Override
    public void afterCommit() throws Exception {
        ArrayList<SendSync> syncs = new ArrayList<SendSync>(200);
        sendLock.lockInterruptibly();
        try {
            synchronized (orderIndexUpdates) {
                Transaction next = orderIndexUpdates.peek();
                while (next != null && next.isCommitted()) {
                    syncs.add(sendSyncs.remove(orderIndexUpdates.removeFirst()));
                    next = orderIndexUpdates.peek();
                }
            }
            for (SendSync sync : syncs) {
                sync.processSend();
            }
        } finally {
            sendLock.unlock();
        }
        for (SendSync sync : syncs) {
            sync.processSent();
        }
    }

    // called with sendLock
    private void processSend() throws Exception {
        for (Iterator<MessageContext> iterator = additions.iterator(); iterator.hasNext(); ) {
            MessageContext messageContext = iterator.next();
            // It could take a while before we receive the commit
            // op, by that time the message could have expired..
            if (broker.isExpired(messageContext.message)) {
                broker.messageExpired(messageContext.context, messageContext.message, null);
                destinationStatistics.getExpired().increment();
                iterator.remove();
                continue;
            }
            sendMessage(messageContext.message);
            messageContext.message.decrementReferenceCount();
        }
    }

    private void processSent() throws Exception {
        for (MessageContext messageContext : additions) {
            messageSent(messageContext.context, messageContext.message);
        }
    }

    @Override
    public void afterRollback() throws Exception {
        try {
            for (MessageContext messageContext : additions) {
                messageContext.message.decrementReferenceCount();
            }
        } finally {
            sendSyncs.remove(transaction);
        }
    }
}
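One detail of SendSync that is easy to miss is the ordering logic: beforeCommit appends the transaction to orderIndexUpdates, and afterCommit only drains transactions from the head of that list while the head is committed. The following is a simplified sketch of that idea, assuming a hypothetical Tx class with a committed flag that afterCommit sets itself (in the real code the flag belongs to Transaction and is set elsewhere); locking, expiry and statistics are left out.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of ordered commit processing (hypothetical, not ActiveMQ code).
class OrderedCommitSketch {
    static class Tx {
        final String id;
        boolean committed;
        Tx(String id) { this.id = id; }
    }

    final Deque<Tx> order = new ArrayDeque<>();             // plays the role of orderIndexUpdates
    final Map<Tx, List<String>> pending = new HashMap<>();  // plays the role of sendSyncs
    final List<String> delivered = new ArrayList<>();

    void add(Tx tx, String message) {
        pending.computeIfAbsent(tx, k -> new ArrayList<>()).add(message);
    }

    void beforeCommit(Tx tx) {
        // Record the order in which transactions entered the commit phase.
        order.addLast(tx);
    }

    void afterCommit(Tx tx) {
        tx.committed = true; // assumption: the sketch marks the commit here
        // Drain from the head only; a later transaction waits for earlier ones.
        while (!order.isEmpty() && order.peekFirst().committed) {
            List<String> messages = pending.remove(order.removeFirst());
            if (messages != null) {
                delivered.addAll(messages);
            }
        }
    }

    public static void main(String[] args) {
        OrderedCommitSketch queue = new OrderedCommitSketch();
        Tx t1 = new Tx("t1");
        Tx t2 = new Tx("t2");
        queue.add(t1, "a1");
        queue.add(t2, "b1");
        queue.beforeCommit(t1);
        queue.beforeCommit(t2);
        queue.afterCommit(t2);
        System.out.println(queue.delivered); // prints []
        queue.afterCommit(t1);
        System.out.println(queue.delivered); // prints [a1, b1]
    }
}
```

Here t2 finishes committing first, but its message is held back until t1, which entered the commit phase earlier, has committed as well.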
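3. So what about XA transactions? ActiveMQ implements distributed transactions. When a system involves multiple data sources, XA may be needed. For simplicity, only a single-data-source example is given here: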
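Xid xid = new MyXid(1, new byte[]{0x01}, new byte[]{0x02});

// Associate the session with the XA transaction branch.
// TMNOFLAGS is the flag for starting a new branch; TMSUCCESS belongs to end().
session.getTransactionContext().start(xid, XAResource.TMNOFLAGS);

// ... work with the MQ here ...

session.getTransactionContext().end(xid, XAResource.TMSUCCESS);

int prepare = session.getTransactionContext().prepare(xid);
System.out.println("prepare:" + prepare);

// Decide whether to commit based on the prepare result
if (prepare == XAResource.XA_OK) {
    // onePhase = false: this is the second phase of a two-phase commit
    session.getTransactionContext().commit(xid, false);
}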
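These steps are the same as with MySQL's XA support: start, end, prepare, commit. On the Java side, both are driven through the same javax.transaction.xa interfaces (XAResource and Xid).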
import javax.transaction.xa.Xid;

// Minimal Xid implementation used by the XA example above
public class MyXid implements Xid {
    private int formatId;
    private byte[] globalTid;
    private byte[] branchQ;

    public MyXid(int formatId, byte[] globalTid, byte[] branchQ) {
        this.formatId = formatId;
        this.globalTid = globalTid;
        this.branchQ = branchQ;
    }

    public byte[] getBranchQualifier() {
        return this.branchQ;
    }

    public int getFormatId() {
        return formatId;
    }

    public byte[] getGlobalTransactionId() {
        return this.globalTid;
    }
}
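To show why the start, end, prepare, commit sequence matters once more than one data source is involved, here is a rough sketch of how a coordinator might drive two participants. Everything in it is hypothetical: Participant stands in for a real XAResource, TwoPhaseSketch.run stands in for a transaction manager, and no ActiveMQ or javax.transaction.xa API is called.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy two-phase commit coordinator (hypothetical, for illustration only).
class TwoPhaseSketch {
    // Stand-in for an XA-capable resource such as a message broker or a database.
    static class Participant {
        final boolean voteOk;
        final List<String> log = new ArrayList<>();

        Participant(boolean voteOk) { this.voteOk = voteOk; }

        void start() { log.add("start"); }
        void end() { log.add("end"); }
        boolean prepare() { log.add("prepare"); return voteOk; }
        void commit() { log.add("commit"); }
        void rollback() { log.add("rollback"); }
    }

    // Returns true if the global transaction committed, false if it rolled back.
    static boolean run(List<Participant> participants) {
        for (Participant p : participants) {
            p.start();
            // ... the actual work on this resource would happen here ...
            p.end();
        }
        // Phase 1: collect votes; stop asking once one participant refuses.
        boolean allOk = true;
        for (Participant p : participants) {
            if (!p.prepare()) {
                allOk = false;
                break;
            }
        }
        // Phase 2: commit everywhere, or roll back everywhere.
        for (Participant p : participants) {
            if (allOk) {
                p.commit();
            } else {
                p.rollback();
            }
        }
        return allOk;
    }

    public static void main(String[] args) {
        Participant mq = new Participant(true);
        Participant db = new Participant(false);
        boolean committed = run(Arrays.asList(mq, db));
        System.out.println(committed); // prints false
        System.out.println(mq.log);    // prints [start, end, prepare, rollback]
    }
}
```

Because the second participant votes against committing, the first one is rolled back too, even though its own prepare succeeded. That all-or-nothing outcome is what the prepare step buys in a multi-resource setup.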