handle_method(#'queue.declare'{queue = QueueNameBin, passive = false, durable = DurableDeclare, exclusive = ExclusiveDeclare, auto_delete = AutoDelete, nowait = NoWait, arguments = Args} = Declare, _, State = #ch{virtual_host = VHostPath, conn_pid = ConnPid, queue_collector_pid = CollectorPid}) -> Owner = case ExclusiveDeclare of true -> ConnPid; false -> none end, Durable = DurableDeclare andalso not ExclusiveDeclare, ActualNameBin = case QueueNameBin of <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen"); Other -> check_name('queue', Other) end, QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin), check_configure_permitted(QueueName, State), %%查找是否队列是否已经存在 case rabbit_amqqueue:with( QueueName, fun (Q) -> ok = rabbit_amqqueue:assert_equivalence( Q, Durable, AutoDelete, Args, Owner), maybe_stat(NoWait, Q) end) of {ok, MessageCount, ConsumerCount} -> return_queue_declare_ok(QueueName, NoWait, MessageCount, ConsumerCount, State); {error, not_found} -> DlxKey = <<"x-dead-letter-exchange">>, case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of undefined -> ok; {error, {invalid_type, Type}} -> precondition_failed( "invalid type '~s' for arg '~s' in ~s", [Type, DlxKey, rabbit_misc:rs(QueueName)]); DLX -> check_read_permitted(QueueName, State), check_write_permitted(DLX, State), ok end, case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete, Args, Owner) of {new, #amqqueue{pid = QPid}} -> %% We need to notify the reader within the channel %% process so that we can be sure there are no %% outstanding exclusive queues being declared as %% the connection shuts down. ok = case Owner of none -> ok; _ -> rabbit_queue_collector:register( CollectorPid, QPid) end, return_queue_declare_ok(QueueName, NoWait, 0, 0, State); {existing, _Q} -> %% must have been created between the stat and the %% declare. Loop around again. handle_method(Declare, none, State); {absent, Q, Reason} -> rabbit_misc:absent(Q, Reason); {owner_died, _Q} -> %% Presumably our own days are numbered since the %% connection has died. Pretend the queue exists though, %% just so nothing fails. return_queue_declare_ok(QueueName, NoWait, 0, 0, State) end; {error, {absent, Q, Reason}} -> rabbit_misc:absent(Q, Reason) end;
其中的node()是为了指明master queue的位置,即收到申请队列消息的节点
declare(QueueName, Durable, AutoDelete, Args, Owner) -> declare(QueueName, Durable, AutoDelete, Args, Owner, node()).
declare(QueueName, Durable, AutoDelete, Args, Owner, Node) -> ok = check_declare_arguments(QueueName, Args), Q = rabbit_queue_decorator:set( rabbit_policy:set(#amqqueue{name = QueueName, durable = Durable, auto_delete = AutoDelete, arguments = Args, exclusive_owner = Owner, pid = none, slave_pids = [], sync_slave_pids = [], recoverable_slaves = [], gm_pids = [], state = live})), Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node), gen_server2:call( rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare), {init, new}, infinity).
在启动rabbit_amqp_process的时候,supervisor使用的Maker来标志此进程是否首次启动,以区别重启进程来做不同操作 。
start_link(Q, StartMode) -> %%Marker存在的意义是什么?标志着是否为第一次启动 Marker = spawn_link(fun() -> receive stop -> ok end end), ChildSpec = {rabbit_amqqueue, {rabbit_prequeue, start_link, [Q, StartMode, Marker]}, intrinsic, ?MAX_WAIT, worker, [rabbit_amqqueue_process, rabbit_mirror_queue_slave]}, {ok, SupPid} = supervisor2:start_link(?MODULE, []), {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec), unlink(Marker), Marker ! stop, {ok, SupPid, Qpid}.
之后,主节点会启动rabbit_amqp_process,用coordinator来完成数据同步(gm),而备节点则会启动rabbit_mirror_queue_slave进程,后者同时使用了gm behaviour,所以可以和coordinator来进程数据同步,以mq节点之间状态保持一致。
init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), depth_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), Self = self(), ok = rabbit_misc:execute_mnesia_transaction( fun () -> [Q1 = #amqqueue{gm_pids = GMPids}] = mnesia:read({rabbit_queue, QName}), ok = rabbit_amqqueue:store_queue( Q1#amqqueue{gm_pids = [{GM, Self} | GMPids], state = live}) end),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync), #state { name = QName, gm = GM, coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS, seen_status = dict:new(), confirmed = [], known_senders = sets:new() }. add_mirrors(QName, Nodes, SyncMode) -> [add_mirror(QName, Node, SyncMode) || Node <- Nodes], ok. add_mirror(QName, MirrorNode, SyncMode) -> case rabbit_amqqueue:lookup(QName) of {ok, Q} -> rabbit_misc:with_exit_handler( rabbit_misc:const(ok), fun () -> SPid = rabbit_amqqueue_sup_sup:start_queue_process( MirrorNode, Q, slave), log_info(QName, "Adding mirror on node ~p: ~p~n", [MirrorNode, SPid]), rabbit_mirror_queue_slave:go(SPid, SyncMode) end); {error, not_found} = E -> E end.