zoukankan      html  css  js  c++  java
  • activemq-broker持久化、转发消息

    broker在接收到producer发送来的Message后(其实接收client发来的命令并不属于broker的职责,broker真正要做的是将处理这些命令,比如将消息路由置对应的destination,而接收client命令的任务是由TransportServer完成的),就需要持久化、抓发消息了。

     1 //org.apache.activemq.broker.region.Queue的send方法
     2 public void send(final ProducerBrokerExchange producerExchange, final Message message) throws Exception {
     3     final ConnectionContext context = producerExchange.getConnectionContext();
     4     // There is delay between the client sending it and it arriving at the
     5     // destination.. it may have expired.
     6     message.setRegionDestination(this);
     7     ProducerState state = producerExchange.getProducerState();
     8     if (state == null) {
     9         LOG.warn("Send failed for: {}, missing producer state for: {}", message, producerExchange);
    10         throw new JMSException("Cannot send message to " + getActiveMQDestination() + " with invalid (null) producer state");
    11     }
    12     final ProducerInfo producerInfo = producerExchange.getProducerState().getInfo();
    13     //ProducerAck有一个重要字段就是size,表示message的size,
    14     //意在告诉producer,broker已经收下了size大小的message(还有一个producerId,因为一个connection可能有多个producer),
    15     //这时producer的window的剩余空间就会变大,producer就可以发送更多的message。
    16     //ProducerAck的作用就在于释放producer的window空间。
    17     final boolean sendProducerAck = !message.isResponseRequired() && producerInfo.getWindowSize() > 0
    18             && !context.isInRecoveryMode();
    19     //检查message是否过期,可以通过Message.setExpiration(或setJMSExpiration)设置绝对时间,
    20     //也可以通过producer.setTimeToLive设置相对时间,setTimeToLive会在send前被转换成Expiration(now + timeToLive)
    21     if (message.isExpired()) {
    22         // message not stored - or added to stats yet - so chuck here
    23         broker.getRoot().messageExpired(context, message, null);
    24         if (sendProducerAck) {
    25                 //如果message过期,直接发送ProducerAck至producer
    26             ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
    27             context.getConnection().dispatchAsync(ack);
    28         }
    29         return;
    30     }
    31     //broker内存使用量达到设置上限
    32     if (memoryUsage.isFull()) {
    33         //......
    34     }
    35     //发送message并不是发送message至consumer,只是broker接收该消息
    36     doMessageSend(producerExchange, message);
    37     //回复ProducerAck
    38     if (sendProducerAck) {
    39             //一个连接可能有多个producer,所以要producerId,
    40             //传回messageSize是为了释放该message所占的window空间(window只是一个数字)
    41         ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), message.getSize());
    42         context.getConnection().dispatchAsync(ack);
    43     }
    44 }

    doMessageSend主要的任务是持久化消息、添加消息置cursor。

     1 void doMessageSend(final ProducerBrokerExchange producerExchange, final Message message) throws IOException,
     2       Exception {
     3   final ConnectionContext context = producerExchange.getConnectionContext();
     4   ListenableFuture<Object> result = null;
     5 
     6   producerExchange.incrementSend();
     7   do {
     8         //检查broker存储空间使用量
     9     checkUsage(context, producerExchange, message);
    10     message.getMessageId().setBrokerSequenceId(getDestinationSequenceId());
    11     
    12     //message持久化
    13     if (store != null && message.isPersistent()) {
    14       message.getMessageId().setFutureOrSequenceLong(null);
    15       try {
    16         if (messages.isCacheEnabled() && !isPersistJMSRedelivered()) {
    17           result = store.asyncAddQueueMessage(context, message, isOptimizeStorage());//异步
    18           result.addListener(new PendingMarshalUsageTracker(message));
    19         } else {
    20           store.addMessage(context, message);//同步
    21         }
    22       } catch (Exception e) {
    23         // we may have a store in inconsistent state, so reset the cursor
    24         // before restarting normal broker operations
    25         resetNeeded = true;
    26         throw e;
    27       }
    28     }
    29 
    30     //Clear the unmarshalled state if the message is marshalled
    31     //Persistent messages will always be marshalled but non-persistent may not be
    32     //Specially non-persistent messages over the VM transport won't be
    33     if (isReduceMemoryFootprint() && message.isMarshalled()) {
    34       message.clearUnMarshalledState();
    35     }
    36     
    37     //添加至cursor,cursor可以有message的缓存。
    38     //cursor是store的游标,可以读取store中下一个message。
    39     //存入cursor后会有一个wakeup操作,wakeup会引发Queue的iterate方法的执行,
    40     //iterate会page in messages,同时会将这些messages发送(轮询发送)置consumer。
    41     //page in的数量是consumers的prefetchSize、maxPageSize、总共消息数的最小值,如果此时没有consumer则不会page in。
    42     if(tryOrderedCursorAdd(message, context)) {
    43       break;
    44     }
    45   } while (started.get());
    46 
    47   if (result != null && message.isResponseRequired() && !result.isCancelled()) {
    48     try {
    49       result.get();
    50     } catch (CancellationException e) {
    51       // ignore - the task has been cancelled if the message
    52       // has already been deleted
    53     }
    54   }
    55 }

    producer流量控制

    window是activemq发送异步消息时进行流量控制的一种手段,org.apache.activemq.ActiveMQMessageProducer如果设置了window,发送消息会减小window,broker确认消息(ProducerAck)会增大window(不超过设定值)。send前会先检查window大小(window只是一个数值,并没有存储能力),如果剩余空间足够容纳即将发送的消息,则可以发送该消息,如果空间不足,则会阻塞send方法。
    如果Producer不设置window,并且是异步发送,producer就会不停的发送(无视broker的响应,broker也不会对无window的producer发出ProducerAck),当broker内存达到阈值时,broker就会阻塞broker端与该connection对应的线程,直至有空间来存放新message。由于线程阻塞了,也就不能继续读取tcp数据了,tcp缓存满后对端也就发送不了数据了,这是依靠tcp本身的流量控制实现的。
    如果Producer设置了window,只要window空间足够,producer就可以发送message,如果此时broker的使用内存已达阈值,broker并不会阻塞线程,而是将message存储等后续操作放入队列等待执行,当producer端的window达到阈值时,producer的send就会阻塞,这样就达到了流量控制的目的。这里无法模拟tcp那样的流量控制(Min(接受窗口, 发送窗口)),因为broker中的destination可以有多个producer同时发送,无法很好的确定发送窗口大小。

    消息持久化

    Cursor是持久化系统的游标,其内部会持有一个链表(batchList)作为消息缓存,在向consumer发送消息时,会先通过page in将持久化消息恢复到cursor的链表缓存中,然后通过cursor.next挨个发送消息。由于从磁盘恢复消息时,batchList可能已经缓存了数据,所以会在持久化系统中记录一个相对位置,该位置就指向了batchList的下一个数据。

    producer发来的数据先会持久化(同步或异步),然后才会放入cursor中,放入cursor后就可以向consumer转发消息了,消息转发后会在cursor清除(没有ack就不会清除持久化数据,redelivery只是client自己重发给自己,然后发送一个redelivered命令给broker,如果redelivery数达到阈值broker会清除该message对应的message并放入DLQ),如果cursor缓存(batchList)满了,就需要在持久化系统中做偏移量标记。

    同步持久化的处理相对简单,持久化、cursor、转发(同步或异步)、回复producer一条线。异步持久化时,转发消息、回复producer可能发生在真正持久化前,这种异步操作极大提高了cpu利用率。但是异步持久化也是有风险的,试想如果回复producer在持久化之前完成,此时broker挂掉,导致broker持久化失败,而producer已经得到确认回复,认为消息转发成功,而由于broker并没有持久化消息,重启broker后,消息也不会恢复,consumer就永远也得不到该消息了。

    StoreQueueCursor

    参考:https://access.redhat.com/documentation/en-US/Fuse_Message_Broker/5.4/html/Using_Persistent_Messages/files/FuseMBPersistCursorsTypes.html#FuseMBPersistCursorsStore

  • 相关阅读:
    LeeCode(两数相加)
    Linux vim中移动显示横线
    JAVA各版本的区别
    LNMP一键包安装完成后的目录结构
    tp6打开和关闭调试的方式
    windows安装Thinkphp6的过程
    Composer 的安装方法(一)
    解决:libsodium-1.0.17安装失败
    有些国内的安卓APP下载不了的解决办法
    Linux 安装时不能下载的问题处理办法
  • 原文地址:https://www.cnblogs.com/holoyong/p/7471365.html
Copyright © 2011-2022 走看看