zoukankan      html  css  js  c++  java
  • apollo 消息分发源代码分析

    1,MessageDispatch:消息分发信息

        // Type code of this data structure; its value is CommandTypes.MESSAGE_DISPATCH.
        public static final byte DATA_STRUCTURE_TYPE = CommandTypes.MESSAGE_DISPATCH;
    
        // NOTE(review): presumably the id of the consumer this dispatch is addressed to — confirm.
        protected ConsumerId consumerId;
        // NOTE(review): presumably the destination the message was dispatched from — confirm.
        protected ActiveMQDestination destination;
        // The message carried by this dispatch.
        protected Message message;
        // NOTE(review): looks like a count of earlier redelivery attempts — confirm against the broker code.
        protected int redeliveryCounter;
    
        // The remaining fields are declared transient.
        // Delivery sequence id of this dispatch.
        protected transient long deliverySequenceId;
        // NOTE(review): untyped consumer reference; its concrete type is not visible here.
        protected transient Object consumer;
        // NOTE(review): presumably run once the dispatch has been transmitted — confirm.
        protected transient Runnable transmitCallback;
        // NOTE(review): presumably the cause of a rollback affecting this dispatch — confirm.
        protected transient Throwable rollbackCause;


    2,自动确认:是在接收到消息之后、反馈给上层应用之前就进行确认

    在afterMessageIsConsumed方法中
       先deliveredMessages.clear();接着 session.sendAck(ack);


    3,从 TCP 连接取到消息后,先放到 unconsumedMessages 中等待消费


    4,从 unconsumedMessages 取出消息并预处理后,在 beforeMessageIsConsumed 方法中把消息加到 deliveredMessages 里面(仅当 isAutoAcknowledgeBatch() 为 false 时)。

    unconsumedMessages:消费者从 MQ 接收到、但还没有消费的消息的存放位置


    5,receive内部消费之前

     /**
      * Bookkeeping performed just before a dispatched message is consumed.
      *
      * <p>Stamps the dispatch with the session's next delivery id and records the broker
      * sequence id of its message as the last one delivered. Unless
      * {@code isAutoAcknowledgeBatch()} is true, the dispatch is then pushed onto the front
      * of {@code deliveredMessages}; in a transacted session it is additionally either
      * acked individually right away (when {@code transactedIndividualAck} is set) or
      * passed to {@code ackLater} with the DELIVERED ack type.
      *
      * @param md the dispatch about to be consumed
      * @throws JMSException declared by this method; presumably propagated from the ack helpers
      */
     private void beforeMessageIsConsumed(MessageDispatch md) throws JMSException {
         md.setDeliverySequenceId(session.getNextDeliveryId());
         lastDeliveredSequenceId = md.getMessage().getMessageId().getBrokerSequenceId();

         // In auto-acknowledge-batch mode nothing is tracked in deliveredMessages here.
         if (isAutoAcknowledgeBatch()) {
             return;
         }

         synchronized (deliveredMessages) {
             deliveredMessages.addFirst(md);
         }

         // Only transacted sessions send an ack at this point.
         if (!session.getTransacted()) {
             return;
         }

         if (transactedIndividualAck) {
             immediateIndividualTransactedAck(md);
         } else {
             ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
         }
     }



    6,receive内部消费之后

    /**
     * Bookkeeping performed right after a dispatched message has been consumed, or has been
     * found to be expired.
     *
     * <p>Does nothing when {@code unconsumedMessages} is closed. An expired message is
     * acknowledged with the DELIVERED ack type and counted in the expired-message statistic.
     * Otherwise the behaviour depends on the acknowledgement mode:
     * <ul>
     *   <li>transacted session: nothing is done here;</li>
     *   <li>{@code isAutoAcknowledgeEach()}: one STANDARD ack covering everything in
     *       {@code deliveredMessages} is sent (immediately, or once a threshold or timeout is
     *       reached when {@code optimizeAcknowledge} is on) and the list is cleared;</li>
     *   <li>{@code isAutoAcknowledgeBatch()}: the message is handed to {@code ackLater} with
     *       the STANDARD ack type;</li>
     *   <li>client or individual acknowledge: if the message is still in
     *       {@code deliveredMessages}, it is handed to {@code ackLater} with the DELIVERED
     *       ack type.</li>
     * </ul>
     *
     * @param md             the dispatch that was just consumed
     * @param messageExpired {@code true} if the message was found expired instead of consumed
     * @throws JMSException          declared by this method; presumably propagated from the ack helpers
     * @throws IllegalStateException if none of the acknowledgement modes above applies
     */
    private void afterMessageIsConsumed(MessageDispatch md, boolean messageExpired) throws JMSException {
        if (unconsumedMessages.isClosed()) {
            return;
        }
        if (messageExpired) {
            acknowledge(md, MessageAck.DELIVERED_ACK_TYPE);
            stats.getExpiredMessageCount().increment();
        } else {
            stats.onMessage();
            if (session.getTransacted()) {
                // Do nothing.
            } else if (isAutoAcknowledgeEach()) {
                // The flag lets only one caller at a time run the ack section below.
                if (deliveryingAcknowledgements.compareAndSet(false, true)) {
                    try {
                        synchronized (deliveredMessages) {
                            if (!deliveredMessages.isEmpty()) {
                                if (optimizeAcknowledge) {
                                    ackCounter++;

                                    // AMQ-3956 evaluate both expired and normal msgs as
                                    // otherwise consumer may get stalled
                                    if (ackCounter + deliveredCounter >= (info.getPrefetchSize() * .65)
                                            || (optimizeAcknowledgeTimeOut > 0
                                                && System.currentTimeMillis() >= (optimizeAckTimestamp + optimizeAcknowledgeTimeOut))) {
                                        MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                        if (ack != null) {
                                            deliveredMessages.clear();
                                            ackCounter = 0;
                                            session.sendAck(ack);
                                            optimizeAckTimestamp = System.currentTimeMillis();
                                        }
                                        // AMQ-3956 - as further optimization send
                                        // ack for expired msgs when there are any.
                                        // This resets the deliveredCounter to 0 so that
                                        // we won't sent standard acks with every msg just
                                        // because the deliveredCounter just below
                                        // 0.5 * prefetch as used in ackLater()
                                        if (pendingAck != null && deliveredCounter > 0) {
                                            session.sendAck(pendingAck);
                                            pendingAck = null;
                                            deliveredCounter = 0;
                                        }
                                    }
                                } else {
                                    MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
                                    if (ack != null) {
                                        deliveredMessages.clear();
                                        session.sendAck(ack);
                                    }
                                }
                            }
                        }
                    } finally {
                        // Always release the flag: if sending the ack throws and the flag
                        // stayed true, every later compareAndSet(false, true) would fail
                        // and this branch would never ack again.
                        deliveryingAcknowledgements.set(false);
                    }
                }
            } else if (isAutoAcknowledgeBatch()) {
                ackLater(md, MessageAck.STANDARD_ACK_TYPE);
            } else if (session.isClientAcknowledge() || session.isIndividualAcknowledge()) {
                boolean messageUnackedByConsumer = false;
                synchronized (deliveredMessages) {
                    messageUnackedByConsumer = deliveredMessages.contains(md);
                }
                if (messageUnackedByConsumer) {
                    ackLater(md, MessageAck.DELIVERED_ACK_TYPE);
                }
            } else {
                throw new IllegalStateException("Invalid session state.");
            }
        }
    }
    




  • 相关阅读:
    Java动态代理(三)——模拟AOP实现
    Java动态代理(二)CGLIB动态代理应用
    Java动态代理(一)动态类Proxy的使用
    CGLIB实现动态代理
    初识Java微信公众号开发
    spring+ibatis事务管理配置
    什么是事务的传播特性
    Spring事务配置的五种方式
    Java语言基础(五) Java原始数据类型的分类以及数据范围
    Java语言基础(四) String和StringBuffer的区别
  • 原文地址:https://www.cnblogs.com/yfceshi/p/6848835.html
Copyright © 2011-2022 走看看