zoukankan      html  css  js  c++  java
  • ActiveMQ 事务和XA

    1. 客户端怎样显式地使用事务?

    producer 开启事务(代码片段):

    ActiveMQSession session = (ActiveMQSession)connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    Destination destination = session.createQueue("TEST.FOO");
    MessageProducer producer = session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    
    // 开启事务 
    // 发送 TransactionInfo 消息 BEGIN
    session.getTransactionContext().begin();
    
    for (int i = 0; i < 2; i++) {
        // Create a message
        String text = "zhang";
        TextMessage message = session.createTextMessage(text);
        producer.send(message);
    }
    // session.getTransactionContext().rollback();
    //提交事务
    // 发送 TransactionInfo 消息 COMMIT_ONE_PHASE
    session.getTransactionContext().commit();

    2. broker 处理事务的入口:

    TransportConnection.processBeginTransaction
    TransportConnection.processCommitTransactionOnePhase
    TransportConnection.processCommitTransactionTwoPhase

    broker 处理事务的逻辑在 TransactionBroker 类中。

    那么,具体在 Queue 中是怎样体现事务的呢?

    ActiveMQ 客户端默认不会开启事务,而如果客户端显式地开启了事务,则 Queue 中可能会存在多个事务,一个事务中必然会有一个消息列表,当客户端提交事务时,Queue 接收事务对应的消息列表,而如果客户端回滚事务,则 Queue 会删除这些消息。

    Queue 中的事务变量:

    // 键是Transaction,值是对应的消息列表
    final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>();
    private final LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();

    Queue 内部类 SendSync 封装了消息和同步操作:

    class SendSync extends Synchronization {
    
        class MessageContext {
            public Message message;
            public ConnectionContext context;
    
            public MessageContext(ConnectionContext context, Message message) {
                this.context = context;
                this.message = message;
            }
        }
    
        final Transaction transaction;
        // 这就是我要找的消息列表
        List<MessageContext> additions = new ArrayList<MessageContext>();
    
        public SendSync(Transaction transaction) {
            this.transaction = transaction;
        }
    
        public void add(ConnectionContext context, Message message) {
            additions.add(new MessageContext(context, message));
        }
    
        @Override
        public void beforeCommit() throws Exception {
            synchronized (orderIndexUpdates) {
                orderIndexUpdates.addLast(transaction);
            }
        }
    
        @Override
        public void afterCommit() throws Exception {
            ArrayList<SendSync> syncs = new ArrayList<SendSync>(200);
            sendLock.lockInterruptibly();
            try {
                synchronized (orderIndexUpdates) {
                    Transaction next = orderIndexUpdates.peek();
                    while( next!=null && next.isCommitted() ) {
                        syncs.add(sendSyncs.remove(orderIndexUpdates.removeFirst()));
                        next = orderIndexUpdates.peek();
                    }
                }
                for (SendSync sync : syncs) {
                    sync.processSend();
                }
            } finally {
                sendLock.unlock();
            }
            for (SendSync sync : syncs) {
                sync.processSent();
            }
        }
    
        // called with sendLock
        private void processSend() throws Exception {
    
            for (Iterator<MessageContext> iterator = additions.iterator(); iterator.hasNext(); ) {
                MessageContext messageContext = iterator.next();
                // It could take while before we receive the commit
                // op, by that time the message could have expired..
                if (broker.isExpired(messageContext.message)) {
                    broker.messageExpired(messageContext.context, messageContext.message, null);
                    destinationStatistics.getExpired().increment();
                    iterator.remove();
                    continue;
                }
                sendMessage(messageContext.message);
                messageContext.message.decrementReferenceCount();
            }
        }
    
        private void processSent() throws Exception {
            for (MessageContext messageContext : additions) {
                messageSent(messageContext.context, messageContext.message);
            }
        }
    
        @Override
        public void afterRollback() throws Exception {
            try {
                for (MessageContext messageContext : additions) {
                    messageContext.message.decrementReferenceCount();
                }
            } finally {
                sendSyncs.remove(transaction);
            }
        }
    }

    3. 那么 XA 事务又是什么呢?ActiveMQ 实现了分布式事务,当系统中存在多数据源的情况下,也许会需要使用 XA ,为了方便,只提供一个单数据源的例子:

    Xid xid = new MyXid(1, new byte[]{0x01}, new byte[]{0x02});
    session.getTransactionContext().start(xid, XAResource.TMSUCCESS);
    // 操作mq
    session.getTransactionContext().end(xid, XAResource.TMSUCCESS);
    int prepare = session.getTransactionContext().prepare(xid);
    System.out.println("prepare:" + prepare);
    // 根据prepare结果决定是否提交
    session.getTransactionContext().commit(xid, false);

    这些操作步骤,和 MySQL的 XA 是一样的,也是 start,end,prepare,commit,实现的都是javax transaction 那一套接口。

    public class MyXid implements Xid {
        private int formatId;
        private byte[] globalTid;
        private byte[] branchQ;
        
        public MyXid(int formatId, byte[] globalTid, byte[] branchQ) {
            this.formatId = formatId;
            this.globalTid = globalTid;
            this.branchQ = branchQ;
        }
        
        public byte[] getBranchQualifier() {
            return this.branchQ;
        }
    
        public int getFormatId() {
            return formatId;
        }
    
        public byte[] getGlobalTransactionId() {
            return this.globalTid;
        }
    }
  • 相关阅读:
    封装的图片预加载,数据加载到浏览器底部加载数据
    自己封装的弹出层插件
    在规定的时间内出现动画.html
    WEB前端资源集
    前端优化几项
    移动H5前端性能优化指南
    微信小程序IDE(微信web开发者工具)安装、破解手册--转载
    微信小程序开发—快速掌握组件及API的方法---转载
    STM32数据类型定义
    HDOJ 4802 GPA
  • 原文地址:https://www.cnblogs.com/allenwas3/p/9003418.html
Copyright © 2011-2022 走看看