zoukankan      html  css  js  c++  java
  • rabbitmq method之queue.declare

    queue.declare即申请队列,首先对队列名作处理,若未指定队列名则随机生成一个,然后查询数据库队列是否已经创建,若创建完成则会申请队列返回

    handle_method(#'queue.declare'{queue       = QueueNameBin, 
                                   passive     = false, 
                                   durable     = DurableDeclare, 
                                   exclusive   = ExclusiveDeclare, 
                                   auto_delete = AutoDelete, 
                                   nowait      = NoWait, 
                                   arguments   = Args} = Declare, 
                  _, State = #ch{virtual_host        = VHostPath, 
                                 conn_pid            = ConnPid, 
                                 queue_collector_pid = CollectorPid}) -> 
        Owner = case ExclusiveDeclare of 
                    true  -> ConnPid; 
                    false -> none 
                end, 
        Durable = DurableDeclare andalso not ExclusiveDeclare, 
        ActualNameBin = case QueueNameBin of 
                            <<>>  -> rabbit_guid:binary(rabbit_guid:gen_secure(), 
                                                        "amq.gen"); 
                            Other -> check_name('queue', Other) 
                        end, 
        QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), 
        check_configure_permitted(QueueName, State), 
        %%查找是否队列是否已经存在
        case rabbit_amqqueue:with( 
               QueueName, 
               fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( 
                                 Q, Durable, AutoDelete, Args, Owner), 
                          maybe_stat(NoWait, Q) 
               end) of 
            {ok, MessageCount, ConsumerCount} -> 
                return_queue_declare_ok(QueueName, NoWait, MessageCount, 
                                        ConsumerCount, State);         
            {error, not_found} -> 
                DlxKey = <<"x-dead-letter-exchange">>, 
                case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of 
                   undefined -> 
                       ok; 
                   {error, {invalid_type, Type}} -> 
                        precondition_failed( 
                          "invalid type '~s' for arg '~s' in ~s", 
                          [Type, DlxKey, rabbit_misc:rs(QueueName)]); 
                   DLX -> 
                       check_read_permitted(QueueName, State), 
                       check_write_permitted(DLX, State), 
                       ok 
                end, 
                case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, 
                                             Args, Owner) of 
                    {new, #amqqueue{pid = QPid}} -> 
                        %% We need to notify the reader within the channel 
                        %% process so that we can be sure there are no 
                        %% outstanding exclusive queues being declared as 
                        %% the connection shuts down. 
                        ok = case Owner of 
                                 none -> ok; 
                                 _    -> rabbit_queue_collector:register( 
                                           CollectorPid, QPid) 
                             end, 
                        return_queue_declare_ok(QueueName, NoWait, 0, 0, State); 
                    {existing, _Q} -> 
                        %% must have been created between the stat and the 
                        %% declare. Loop around again. 
                        handle_method(Declare, none, State); 
                    {absent, Q, Reason} -> 
                        rabbit_misc:absent(Q, Reason); 
                    {owner_died, _Q} -> 
                        %% Presumably our own days are numbered since the 
                        %% connection has died. Pretend the queue exists though, 
                        %% just so nothing fails. 
                        return_queue_declare_ok(QueueName, NoWait, 0, 0, State) 
                end; 
            {error, {absent, Q, Reason}} -> 
                rabbit_misc:absent(Q, Reason) 
        end;

    rabbit_amqqueue.erl

    其中的node()是为了指明master queue的位置,即收到申请队列消息的节点

    declare(QueueName, Durable, AutoDelete, Args, Owner) -> 
        declare(QueueName, Durable, AutoDelete, Args, Owner, node()). 
    选择主节点并对主节点发创建队列进程的消息
    declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> 
        ok = check_declare_arguments(QueueName, Args), 
        Q = rabbit_queue_decorator:set( 
              rabbit_policy:set(#amqqueue{name               = QueueName, 
                                          durable            = Durable, 
                                          auto_delete        = AutoDelete, 
                                          arguments          = Args, 
                                          exclusive_owner    = Owner, 
                                          pid                = none, 
                                          slave_pids         = [], 
                                          sync_slave_pids    = [], 
                                          recoverable_slaves = [], 
                                          gm_pids            = [], 
                                          state              = live})), 
        Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), 
        gen_server2:call( 
          rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), 
          {init, new}, infinity).

    rabbit_amqqueue_sup.erl

    在启动rabbit_amqp_process的时候,supervisor使用的Maker来标志此进程是否首次启动,以区别重启进程来做不同操作 。

    start_link(Q, StartMode) -> 
        %%Marker存在的意义是什么?标志着是否为第一次启动
        Marker = spawn_link(fun() -> receive stop -> ok end end), 
        ChildSpec = {rabbit_amqqueue, 
                     {rabbit_prequeue, start_link, [Q, StartMode, Marker]}, 
                     intrinsic, ?MAX_WAIT, worker, [rabbit_amqqueue_process, 
                                                    rabbit_mirror_queue_slave]}, 
        {ok, SupPid} = supervisor2:start_link(?MODULE, []), 
        {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec), 
        unlink(Marker), 
        Marker ! stop, 
        {ok, SupPid, Qpid}.

    之后,主节点会启动rabbit_amqp_process,用coordinator来完成数据同步(gm),而备节点则会启动rabbit_mirror_queue_slave进程,后者同时使用了gm behaviour,所以可以和coordinator来进程数据同步,以mq节点之间状态保持一致。

    通过coordinator获取gm完成可靠同步,然后获取备节点在备节点增加镜像队列

    init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> 
        {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( 
                       Q, undefined, sender_death_fun(), depth_fun()), 
        GM = rabbit_mirror_queue_coordinator:get_gm(CPid), 
        Self = self(), 
        ok = rabbit_misc:execute_mnesia_transaction( 
               fun () -> 
                       [Q1 = #amqqueue{gm_pids = GMPids}] 
                           = mnesia:read({rabbit_queue, QName}), 
                       ok = rabbit_amqqueue:store_queue( 
                              Q1#amqqueue{gm_pids = [{GM, Self} | GMPids], 
                                          state   = live}) 
               end), 

        {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), 

    在所有的备节点上增加镜像队列,即创建备队列进程
        rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), 
        #state { name                = QName, 
                 gm                  = GM, 
                 coordinator         = CPid, 
                 backing_queue       = BQ, 
                 backing_queue_state = BQS, 
                 seen_status         = dict:new(), 
                 confirmed           = [], 
                 known_senders       = sets:new() }.
    
    
    
    add_mirrors(QName, Nodes, SyncMode) -> 
        [add_mirror(QName, Node, SyncMode)  || Node <- Nodes], 
        ok. 
    
    add_mirror(QName, MirrorNode, SyncMode) -> 
        case rabbit_amqqueue:lookup(QName) of 
            {ok, Q} -> 
                rabbit_misc:with_exit_handler( 
                  rabbit_misc:const(ok), 
                  fun () -> 
                          SPid = rabbit_amqqueue_sup_sup:start_queue_process( 
                                   MirrorNode, Q, slave), 
                          log_info(QName, "Adding mirror on node ~p: ~p~n", 
                                   [MirrorNode, SPid]), 
                          rabbit_mirror_queue_slave:go(SPid, SyncMode) 
                  end); 
            {error, not_found} = E -> 
                E 
        end.

    未完成待续
  • 相关阅读:
    Python UDP Server Client
    Django点滴(五)建模
    机房收费系统中的数据库操作
    一个普通的工科应届生
    Zebra命令模式分析(二)[补]
    php 备份mysql数据库(joomla数据库可直接使用,其他数据库稍作修改即可)
    dalvik直接跑hello world并用jdb调试
    动态规划小结(1)最大子段和
    struts返回对象json格式数据
    有关public接口和友元类的讨论
  • 原文地址:https://www.cnblogs.com/haoqingchuan/p/4666206.html
Copyright © 2011-2022 走看看