zoukankan      html  css  js  c++  java
  • rabbitmq之消息重入队列

    说起消息重入队列还得从队列注册消费者说起,客户端在向队列注册消费者之后,创建的channel也会被主队列进程monitor,当channel挂掉后,主队列进程(rabbit_amqqueue_process)收到'DOWN'通知,将未ack的消息重入队列,并根据消息的deliver tag,也就是消费入队列的顺序,将消息重入队列中

    主要代码如下:

    1.注册消费者

    handle_method(#'basic.consume'{queue        = QueueNameBin,
                                   consumer_tag = ConsumerTag,
                                   no_local     = _, % FIXME: implement
                                   no_ack       = NoAck,
                                   exclusive    = ExclusiveConsume,
                                   nowait       = NoWait,
                                   arguments    = Args},
                  _, State = #ch{consumer_prefetch = ConsumerPrefetch,
                                 consumer_mapping  = ConsumerMapping}) ->
        case dict:find(ConsumerTag, ConsumerMapping) of
            error ->
                QueueName = qbin_to_resource(QueueNameBin, State),
                check_read_permitted(QueueName, State),
                ActualConsumerTag =
                    case ConsumerTag of
                        <<>>  -> rabbit_guid:binary(rabbit_guid:gen_secure(),
                                                    "amq.ctag");
                        Other -> Other
                    end,
                case basic_consume(
                       QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
                       ExclusiveConsume, Args, NoWait, State) of
                    {ok, State1} ->
                        {noreply, State1};
                    {error, exclusive_consume_unavailable} ->
                        rabbit_misc:protocol_error(
                          access_refused, "~s in exclusive use",
                          [rabbit_misc:rs(QueueName)])
                end;
            {ok, _} ->
                %% Attempted reuse of consumer tag.
                rabbit_misc:protocol_error(
                  not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag])
        end;
    

    2.主队列进程增加消费者,并对channel进程监控

    handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
                 PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg},
                _From, State = #q{consumers          = Consumers,
                                  exclusive_consumer = Holder}) ->
        case check_exclusive_access(Holder, ExclusiveConsume, State) of
            in_use -> reply({error, exclusive_consume_unavailable}, State);
            ok     -> Consumers1 = rabbit_queue_consumers:add(
                                     ChPid, ConsumerTag, NoAck,
                                     LimiterPid, LimiterActive,
                                     PrefetchCount, Args, is_empty(State),
                                     Consumers),
        end;
    
    ch_record(ChPid, LimiterPid) ->
        Key = {ch, ChPid},
        case get(Key) of
            undefined -> MonitorRef = erlang:monitor(process, ChPid),
                         Limiter = rabbit_limiter:client(LimiterPid),
                         C = #cr{ch_pid               = ChPid,
                                 monitor_ref          = MonitorRef,
                                 acktags              = queue:new(),
                                 consumer_count       = 0,
                                 blocked_consumers    = priority_queue:new(),
                                 limiter              = Limiter,
                                 unsent_message_count = 0},
                         put(Key, C),
                         C;
            C = #cr{} -> C
        end.
    

    3.主队列进程收到channel 'DOWN'的消息后,删除消费者,获取此被此channel ack的消息

    handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
        case handle_ch_down(DownPid, State) of
            {ok, State1}   -> noreply(State1);
            {stop, State1} -> stop(State1)
        end;
    
    handle_ch_down(DownPid, State = #q{consumers          = Consumers,
                                       exclusive_consumer = Holder,
                                       senders            = Senders}) ->
        State1 = State#q{senders = case pmon:is_monitored(DownPid, Senders) of
                                       false -> Senders;
                                       true  -> credit_flow:peer_down(DownPid),
                                                pmon:demonitor(DownPid, Senders)
                                   end},
        case rabbit_queue_consumers:erase_ch(DownPid, Consumers) of
            not_found ->
                {ok, State1};
            {ChAckTags, ChCTags, Consumers1} ->
    
                case should_auto_delete(State2) of
                    true  -> {stop, State2};
                    false -> {ok, requeue_and_run(ChAckTags,
                                                  ensure_expiry_timer(State2))}
                end
        end.
    

    4.涉及重入队列时,需要了解backing queue,即消息是如何在镜像队列之间内部以及消息如何在本地内存和磁盘资源之间按需切换,或此部分涉及内容较多,后序会专门列出一个专题来分析此实现。rabbit_amqqueue_process主队列进程的backing_queue是rabbit_mirror_queue_master(镜像队列消息同步),后者因为需要将消息按需放置,所以也有backing_queueu,即rabbit_variable_queue。

    根据sequeue_id来判断消息在队列中的位置,从当前队列中pop出队头的消息(最早入队列的消息),若未ack的消息较晚(seqid相对大),则将pop队头的队列再与未ack的消息比较,将消息pop出的消息放置在front队列中,直到符合,插入队列,并将front队列与此队列合入。

    queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
        queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
                    Limit, PubFun, State).
    
    queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
                Limit, PubFun, State)
      when Limit == undefined orelse SeqId < Limit ->
        case ?QUEUE:out(Q) of
            {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
              when SeqIdQ < SeqId ->
                %% enqueue from the remaining queue
                queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
                            Limit, PubFun, State);
            {_, _Q1} ->
                %% enqueue from the remaining list of sequence ids
                {MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
                {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
                    PubFun(MsgStatus, State1),
                queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
                            Limit, PubFun, State2)
        end;
    queue_merge(SeqIds, Q, Front, MsgIds,
                _Limit, _PubFun, State) ->
        {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
    
  • 相关阅读:
    做了个专门针对少儿编程教程的网站,有兴趣的可以来逛逛
    解决 supervisor : 无法加载文件 C:UserscharlesAppDataRoaming pmsupervisor.ps1
    node.js03 第一个node.js程序和读取文件
    .net图表之ECharts随笔06-这才是最简单的
    .net图表之ECharts随笔05-不同01的语法步骤
    .net图表之ECharts随笔04-散点图
    .net图表之ECharts随笔03-热力地图
    .net图表之ECharts随笔02-字符云
    .net图表之ECharts随笔01-最简单的使用步骤
    Python学习之全局变量与global
  • 原文地址:https://www.cnblogs.com/haoqingchuan/p/4777560.html
Copyright © 2011-2022 走看看