zoukankan      html  css  js  c++  java
  • rabbitmq method之basic.consume

    basic.consume指的是channel在某个队列上注册消费者,此后该队列一旦有消息到达,就会把消息转发给此channel处理;如果这个队列有多个消费者,则会采用轮转的方式将消息分发给各个消费者.

    首先是rabbit_reader接收数据包后,解析组装出其中的method,channel方法交给channel处理.具体过程见http://www.cnblogs.com/haoqingchuan/p/4354692.html

    channel进程处理basic.consume的方法.先从状态中查看是否已经存在此tag(以channel为域,不同的consumer_tag标识了不同的消费者,每个channel内的consumer tag必须是唯一的).如果没有查找到则属于正常情况;如果客户端没有指定consumer tag(即为空),则会用rabbit_guid生成一个带"amq.ctag"前缀的唯一标识来作为consumer tag.如果该tag已经存在,则返回not_allowed协议错误.

     %% Channel-side handler for the 'basic.consume' method.
     %%
     %% Looks the requested consumer tag up in the channel's consumer_mapping
     %% dict. If the tag is unused, the queue name is resolved, read permission
     %% is checked, a consumer tag is generated when the client sent an empty
     %% one, and the registration is delegated to basic_consume/8. If the tag
     %% is already present, a not_allowed protocol error is raised.
     %%
     %% Returns {noreply, State1} on success; otherwise raises via
     %% rabbit_misc:protocol_error/3 (access_refused when the queue is in
     %% exclusive use, not_allowed on tag reuse).
     1 handle_method(#'basic.consume'{queue        = QueueNameBin,
     2                                consumer_tag = ConsumerTag,
     3                                no_local     = _, % FIXME: implement
     4                                no_ack       = NoAck,
     5                                exclusive    = ExclusiveConsume,
     6                                nowait       = NoWait,
     7                                arguments    = Args},
     8               _, State = #ch{consumer_prefetch = ConsumerPrefetch,
     9                              consumer_mapping  = ConsumerMapping}) ->
    10     case dict:find(ConsumerTag, ConsumerMapping) of
    11         error ->
           %% Tag not registered on this channel yet: proceed with the consume.
    12             QueueName = qbin_to_resource(QueueNameBin, State),
    13             check_read_permitted(QueueName, State),
           %% An empty tag means the client wants the server to pick one; the
           %% generated value is built from a secure GUID and the "amq.ctag"
           %% prefix. Any non-empty tag is used as supplied.
    14             ActualConsumerTag =
    15                 case ConsumerTag of
    16                     <<>>  -> rabbit_guid:binary(rabbit_guid:gen_secure(),
    17                                                 "amq.ctag");
    18                     Other -> Other
    19                 end,
    20             case basic_consume(
    21                    QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
    22                    ExclusiveConsume, Args, NoWait, State) of
    23                 {ok, State1} ->
    24                     {noreply, State1};
    25                 {error, exclusive_consume_unavailable} ->
    26                     rabbit_misc:protocol_error(
    27                       access_refused, "~s in exclusive use",
    28                       [rabbit_misc:rs(QueueName)])
    29             end;
    30         {ok, _} ->
    31             %% Attempted reuse of consumer tag.
    32             rabbit_misc:protocol_error(
    33               not_allowed, "attempt to reuse consumer tag '~s'", [ConsumerTag])
    34     end;
     %% Registers this channel process as a consumer on QueueName and records
     %% the consumer in the channel state.
     %%
     %% The queue is looked up through
     %% rabbit_amqqueue:with_exclusive_access_or_die/3, and the supplied fun
     %% calls rabbit_amqqueue:basic_consume/10 with self() as the consuming
     %% channel, pairing its result with the queue record.
     %%
     %% Returns {ok, NewState} on success, or
     %% {error, exclusive_consume_unavailable} when the queue refuses the
     %% consumer.
     1 basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
     2               ExclusiveConsume, Args, NoWait,
     3               State = #ch{conn_pid          = ConnPid,
     4                           limiter           = Limiter,
     5                           consumer_mapping  = ConsumerMapping}) ->
     6     case rabbit_amqqueue:with_exclusive_access_or_die(
     7            QueueName, ConnPid,
     8            fun (Q) ->
     9                    {rabbit_amqqueue:basic_consume(
    10                       Q, NoAck, self(),
    11                       rabbit_limiter:pid(Limiter),
    12                       rabbit_limiter:is_active(Limiter),
    13                       ConsumerPrefetch, ActualConsumerTag,
    14                       ExclusiveConsume, Args,
       %% NOTE(review): ok_msg/2 is not shown here; presumably it yields no
       %% reply message when NoWait is true - confirm against its definition.
    15                       ok_msg(NoWait, #'basic.consume_ok'{
    16                                consumer_tag = ActualConsumerTag})),
    17                     Q}
    18            end) of
    19         {ok, Q = #amqqueue{pid = QPid, name = QName}} ->
       %% Remember the consumer under its tag, together with the queue record
       %% and the parameters it was registered with.
    20             CM1 = dict:store(
    21                     ActualConsumerTag,
    22                     {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
    23                     ConsumerMapping),
    24             State1 = monitor_delivering_queue(
    25                        NoAck, QPid, QName,
    26                        State#ch{consumer_mapping = CM1}),
       %% With nowait the consumer monitor is set up immediately.
       %% NOTE(review): in the non-nowait case it is presumably set up when
       %% the consume_ok reply is handled - that code is not shown here.
    27             {ok, case NoWait of
    28                      true  -> consumer_monitor(ActualConsumerTag, State1);
    29                      false -> State1
    30                  end};
    31         {{error, exclusive_consume_unavailable} = E, _Q} ->
    32             E
    33     end.

    rabbit_amqqueue.erl

    rabbit_channel进程通过delegate:call向队列进程(rabbit_amqqueue_process)发送basic_consume消息来完成增加消费者的动作

    %% Client-side API used by the channel to add a consumer to a queue.
    %%
    %% Asserts that check_consume_arguments/2 accepts Args for this queue (a
    %% non-ok result crashes with badmatch), then makes a synchronous
    %% delegate:call/2 to the queue process QPid carrying the basic_consume
    %% request. The result of that call is returned unchanged.
    1 basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
    2               LimiterActive, ConsumerPrefetchCount, ConsumerTag,
    3               ExclusiveConsume, Args, OkMsg) ->
    4     ok = check_consume_arguments(QName, Args),
    5     delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
    6                          ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
    7                          Args, OkMsg}).

    rabbit_amqqueue_process.erl

     增加consumer,并更新到state中。

     %% Queue-process handler for the synchronous basic_consume request.
     %%
     %% Replies {error, exclusive_consume_unavailable} when
     %% check_exclusive_access/3 reports in_use. Otherwise the consumer is
     %% added to the queue's consumer set, the state is updated, and the
     %% handler replies ok.
     1 handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
     2              PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg},
     3             _From, State = #q{consumers          = Consumers,
     4                               exclusive_consumer = Holder}) ->
     5     case check_exclusive_access(Holder, ExclusiveConsume, State) of
     6         in_use -> reply({error, exclusive_consume_unavailable}, State);
     7         ok     -> Consumers1 = rabbit_queue_consumers:add(
     8                                  ChPid, ConsumerTag, NoAck,
     9                                  LimiterPid, LimiterActive,
    10                                  PrefetchCount, Args, is_empty(State),
    11                                  Consumers),
                  %% An exclusive consume makes this channel/tag pair the
                  %% holder; otherwise the previous holder is kept.
    12                   ExclusiveConsumer =
    13                       if ExclusiveConsume -> {ChPid, ConsumerTag};
    14                          true             -> Holder
    15                       end,
    16                   State1 = State#q{consumers          = Consumers1,
    17                                    has_had_consumers  = true,
    18                                    exclusive_consumer = ExclusiveConsumer},
                  %% OkMsg is sent to the channel before the consumer_created
                  %% event is emitted and before run_message_queue/1 is called
                  %% on the updated state.
    19                   ok = maybe_send_reply(ChPid, OkMsg),
    20                   emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
    21                                         not NoAck, qname(State1),
    22                                         PrefetchCount, Args, none),
    23                   notify_decorators(State1),
    24                   reply(ok, run_message_queue(State1))
    25     end;

    rabbit_queue_consumers.erl

    更新进程字典,并为队列增加新消费者.

     %% Adds a consumer identified by {ChPid, CTag} to the consumer state.
     %%
     %% Fetches the per-channel record via ch_record/2, activates its limiter
     %% when LimiterActive is true, increments its consumer count and stores
     %% the record back with update_ch_record/1. Returns the #state{} with the
     %% new consumer added and its 'use' field updated to active.
     1 add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
     2     State = #state{consumers = Consumers,
     3                    use       = CUInfo}) ->
     4     C = #cr{consumer_count = Count,
     5             limiter        = Limiter} = ch_record(ChPid, LimiterPid),
     6     Limiter1 = case LimiterActive of
     7                    true  -> rabbit_limiter:activate(Limiter);
     8                    false -> Limiter
     9                end,
    10     C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
     %% The record is stored unchanged when the parsed credit is {0, auto}, or
     %% when the mode is auto and the consumer is no-ack; in every other case
     %% it first goes through credit_and_drain/5.
    11     update_ch_record(
    12       case parse_credit_args(Prefetch, Args) of
    13           {0,       auto}            -> C1;
    14           {_Credit, auto} when NoAck -> C1;
    15           {Credit,  Mode}            -> credit_and_drain(
    16                                           C1, CTag, Credit, Mode, IsEmpty)
    17       end),
    18     Consumer = #consumer{tag          = CTag,
    19                          ack_required = not NoAck,
    20                          prefetch     = Prefetch,
    21                          args         = Args},
    22     State#state{consumers = add_consumer({ChPid, Consumer}, Consumers),
    23                 use       = update_use(CUInfo, active)}.

    %%consumer加入consumers队列里面,也就是后面分发消息的时候会从这个队列里取出消费者来投递消息

    %% Two clauses of in/3 for inserting X with priority 0 into a plain
    %% {queue, In, Out, Len} structure; further clauses lie outside this
    %% excerpt.
    %%
    %% First clause: when In holds exactly one element, Out is empty and Len
    %% is 1, the existing In list becomes the Out list and X becomes the sole
    %% In element. Second clause: X is prepended to In and Len is incremented.
    1 in(X, 0, {  queue, [_] = In, [], 1}) ->
    2     {queue, [X], In, 2};
    3 in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) ->
    4     {queue, [X|In], Out, Len + 1};
  • 相关阅读:
    express中间件
    复习node中加载静态资源--用express+esj
    有关es6的模块化
    es6转码和package.json中的配置
    MySQL必知必会--使用子查询
    MySQL必知必会--分 组 数 据
    MySQL必知必会--汇 总 数 据
    mysql必知必会--使用数据处理函数
    拼凑可导的充分必要条件
    递推数列极限存在证明
  • 原文地址:https://www.cnblogs.com/haoqingchuan/p/4550794.html
Copyright © 2011-2022 走看看