zoukankan      html  css  js  c++  java
  • rabbitmq之back queue草稿

    申请队列
    rabbit_reader在收到消息后处理数据帧时,如果channel id不是0(0代表连接),则认为是channel相关方法.

    handle_frame(Type, Channel, Payload,
    State = #v1{connection = #connection{protocol = Protocol}})
    when ?IS_RUNNING(State) ->
    case rabbit_command_assembler:analyze_frame(Type, Payload, Protocol) of
    error -> frame_error(unknown_frame, Type, Channel, Payload, State);
    heartbeat -> unexpected_frame(Type, Channel, Payload, State);
    Frame -> process_frame(Frame, Channel, State)
    end;

    process_frame(Frame, Channel, State) ->
    ChKey = {channel, Channel},
    case (case get(ChKey) of
    undefined -> create_channel(Channel, State);
    Other -> {ok, Other, State}
    end) of
    {error, Error} ->
    handle_exception(State, Channel, Error);
    {ok, {ChPid, AState}, State1} ->
    case rabbit_command_assembler:process(Frame, AState) of
    {ok, NewAState} ->
    put(ChKey, {ChPid, NewAState}),
    post_process_frame(Frame, ChPid, State1);
    {ok, Method, NewAState} ->
    rabbit_channel:do(ChPid, Method),
    put(ChKey, {ChPid, NewAState}),
    post_process_frame(Frame, ChPid, State1);
    {ok, Method, Content, NewAState} ->
    rabbit_channel:do_flow(ChPid, Method, Content),
    put(ChKey, {ChPid, NewAState}),
    post_process_frame(Frame, ChPid, control_throttle(State1));
    {error, Reason} ->
    handle_exception(State1, Channel, Reason)
    end
    end.


    handle_method(#'queue.declare'{queue = QueueNameBin,
    passive = false,
    durable = DurableDeclare,
    exclusive = ExclusiveDeclare,
    auto_delete = AutoDelete,
    nowait = NoWait,
    arguments = Args} = Declare,
    _, State = #ch{virtual_host = VHostPath,
    conn_pid = ConnPid,
    queue_collector_pid = CollectorPid}) ->
    Owner = case ExclusiveDeclare of
    true -> ConnPid;
    false -> none
    end,
    Durable = DurableDeclare andalso not ExclusiveDeclare,
    ActualNameBin = case QueueNameBin of
    <<>> -> rabbit_guid:binary(rabbit_guid:gen_secure(),
    "amq.gen");
    Other -> check_name('queue', Other)
    end,
    QueueName = rabbit_misc:r(VHostPath, queue, ActualNameBin),
    check_configure_permitted(QueueName, State),
    #查找是否队列是否已经存在
    case rabbit_amqqueue:with(
    QueueName,
    fun (Q) -> ok = rabbit_amqqueue:assert_equivalence(
    Q, Durable, AutoDelete, Args, Owner),
    maybe_stat(NoWait, Q)
    end) of
    {ok, MessageCount, ConsumerCount} ->
    return_queue_declare_ok(QueueName, NoWait, MessageCount,
    ConsumerCount, State);
    {error, not_found} ->
    DlxKey = <<"x-dead-letter-exchange">>,
    case rabbit_misc:r_arg(VHostPath, exchange, Args, DlxKey) of
    undefined ->
    ok;
    {error, {invalid_type, Type}} ->
    precondition_failed(
    "invalid type '~s' for arg '~s' in ~s",
    [Type, DlxKey, rabbit_misc:rs(QueueName)]);
    DLX ->
    check_read_permitted(QueueName, State),
    check_write_permitted(DLX, State),
    ok
    end,
    case rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
    Args, Owner) of
    {new, #amqqueue{pid = QPid}} ->
    %% We need to notify the reader within the channel
    %% process so that we can be sure there are no
    %% outstanding exclusive queues being declared as
    %% the connection shuts down.
    ok = case Owner of
    none -> ok;
    _ -> rabbit_queue_collector:register(
    CollectorPid, QPid)
    end,
    return_queue_declare_ok(QueueName, NoWait, 0, 0, State);
    {existing, _Q} ->
    %% must have been created between the stat and the
    %% declare. Loop around again.
    handle_method(Declare, none, State);
    {absent, Q, Reason} ->
    rabbit_misc:absent(Q, Reason);
    {owner_died, _Q} ->
    %% Presumably our own days are numbered since the
    %% connection has died. Pretend the queue exists though,
    %% just so nothing fails.
    return_queue_declare_ok(QueueName, NoWait, 0, 0, State)
    end;
    {error, {absent, Q, Reason}} ->
    rabbit_misc:absent(Q, Reason)
    end;

    rabbit_amqqueue:declare(QueueName, Durable, AutoDelete,
    Args, Owner)


    #其中的node()是为了指明master queue的位置,即收到申请队列消息的节点
    declare(QueueName, Durable, AutoDelete, Args, Owner) ->
    declare(QueueName, Durable, AutoDelete, Args, Owner, node()).


    %% The Node argument suggests where the queue (master if mirrored)
    %% should be. Note that in some cases (e.g. with "nodes" policy in
    %% effect) this might not be possible to satisfy.
    declare(QueueName, Durable, AutoDelete, Args, Owner, Node) ->
    ok = check_declare_arguments(QueueName, Args),
    Q = rabbit_queue_decorator:set(
    rabbit_policy:set(#amqqueue{name = QueueName,
    durable = Durable,
    auto_delete = AutoDelete,
    arguments = Args,
    exclusive_owner = Owner,
    pid = none,
    slave_pids = [],
    sync_slave_pids = [],
    recoverable_slaves = [],
    gm_pids = [],
    state = live})),
    #根据所用方法来选选择节点
    Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),
    gen_server2:call(
    rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
    {init, new}, infinity).


    %%选择master节点
    Node = rabbit_mirror_queue_misc:initial_queue_node(Q, Node),


    initial_queue_node(Q, DefNode) ->
    {MNode, _SNodes} = suggested_queue_nodes(Q, DefNode, all_nodes()),
    MNode.

    suggested_queue_nodes(Q) -> suggested_queue_nodes(Q, all_nodes()).
    suggested_queue_nodes(Q, All) -> suggested_queue_nodes(Q, node(), All).


    gen_server2:call(
    rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
    {init, new}, infinity).


    rabbit_amqqueue_sup_sup.erl
    start_queue_process(Node, Q, StartMode) ->
    {ok, _SupPid, QPid} = supervisor2:start_child(
    {?SERVER, Node}, [Q, StartMode]),
    QPid.


    rabbit_amqqueue_sup
    |
    rabbit_amqqueue


    rabbit_amqqueue_sup.erl
    start_link(Q, StartMode) ->
    %%Marker存在的意义是什么?标志着是否为第一次启动
    Marker = spawn_link(fun() -> receive stop -> ok end end),
    ChildSpec = {rabbit_amqqueue,
    {rabbit_prequeue, start_link, [Q, StartMode, Marker]},
    intrinsic, ?MAX_WAIT, worker, [rabbit_amqqueue_process,
    rabbit_mirror_queue_slave]},
    {ok, SupPid} = supervisor2:start_link(?MODULE, []),
    {ok, QPid} = supervisor2:start_child(SupPid, ChildSpec),
    unlink(Marker),
    Marker ! stop,
    {ok, SupPid, Qpid}.

    init({Q, StartMode, Marker}) ->
    init(Q, case {is_process_alive(Marker), StartMode} of
    {true, slave} -> slave;
    {true, _} -> master;
    {false, _} -> restart
    end).
    确定是起master还是起slave,master起的时候StartMode为delcare
    init(Q, master) -> rabbit_amqqueue_process:init(Q);
    init(Q, slave) -> rabbit_mirror_queue_slave:init(Q);

    主队列进程起动:
    init(Q) ->
    process_flag(trap_exit, true),
    ?store_proc_name(Q#amqqueue.name),
    {ok, init_state(Q#amqqueue{pid = self()}), hibernate,
    {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE},
    ?MODULE}.


    init_state(Q) ->
    State = #q{q = Q,
    exclusive_consumer = none,
    has_had_consumers = false,
    consumers = rabbit_queue_consumers:new(),
    senders = pmon:new(delegate),
    msg_id_to_channel = gb_trees:empty(),
    status = running,
    args_policy_version = 0},
    rabbit_event:init_stats_timer(State, #q.stats_timer).

    进程开头使用了process_flag(trap_exit, true),这种用法是为了对exist信息的处理,如果设置为true,那它进程退出时会进程信箱会收到{'EXIT',Pid,noproc}的消息,而不退出。
    exit(Pid, Reason) -> true
    If Pid is not trapping exits, Pid itself will exit with exit reason Reason. If Pid is trapping exits, the exit signal is transformed into a message {'EXIT', From, Reason} and delivered to the message queue of Pid. From is the pid of the process which sent the exit signal.

    http://blog.csdn.net/mycwq/article/details/41172863

    此处消息也是
    gen_server2:call(
    rabbit_amqqueue_sup_sup:start_queue_process(Node, Q, declare),
    {init, new}, infinity).
    收到消息{init, new},


    handle_call({init, Recover}, From, State) ->
    init_it(Recover, From, State);

    init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = none}}) ->
    init_it2(Recover, From, State);

    init_it2(Recover, From, State = #q{q = Q,
    backing_queue = undefined,
    backing_queue_state = undefined}) ->
    {Barrier, TermsOrNew} = recovery_status(Recover),
    case rabbit_amqqueue:internal_declare(Q, Recover /= new) of
    #amqqueue{} = Q1 ->
    case matches(Recover, Q, Q1) of
    true ->
    ok = file_handle_cache:register_callback(
    rabbit_amqqueue, set_maximum_since_use, [self()]),
    ok = rabbit_memory_monitor:register(
    self(), {rabbit_amqqueue,
    set_ram_duration_target, [self()]}),
    %%rabbit_mirror_queue_master
    BQ = backing_queue_module(Q1),
    BQS = bq_init(BQ, Q, TermsOrNew),
    send_reply(From, {new, Q}),
    recovery_barrier(Barrier),
    State1 = process_args_policy(
    State#q{backing_queue = BQ,
    backing_queue_state = BQS}),
    notify_decorators(startup, State),
    rabbit_event:notify(queue_created,
    infos(?CREATION_EVENT_KEYS, State1)),
    rabbit_event:if_enabled(State1, #q.stats_timer,
    fun() -> emit_stats(State1) end),
    noreply(State1);
    false ->
    {stop, normal, {existing, Q1}, State}
    end;
    Err ->
    {stop, normal, Err, State}
    end.

    bq_init(BQ, Q, Recover) ->
    Self = self(),
    BQ:init(Q, Recover,
    fun (Mod, Fun) ->
    rabbit_amqqueue:run_backing_queue(Self, Mod, Fun)
    end).

    主备队列进程一个是amqqueue_process进程一个是rabbit_amqqueue_mirror_slave进程。
    通知队列进程用backing_queue去实现这个工作。
    run_backing_queue(QPid, Mod, Fun) ->
    gen_server2:cast(QPid, {run_backing_queue, Mod, Fun}).


    rabbit_mirror_queue_master.erl
    %%此处AsyncCallback为run_backing_queue,实际意思为commit_by_backing_queue
    init(Q, Recover, AsyncCallback) ->
    {ok, BQ} = application:get_env(backing_queue_module),
    %%rabbit_variable_queue
    BQS = BQ:init(Q, Recover, AsyncCallback),
    State = #state{gm = GM} = init_with_existing_bq(Q, BQ, BQS),
    ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
    State.

    rabbit_variable_queue.erl
    %%起动持久化的进程,消息/索引
    init(Queue, Recover, Callback) ->
    init(
    Queue, Recover, Callback,
    fun (MsgIds, ActionTaken) ->
    msgs_written_to_disk(Callback, MsgIds, ActionTaken)
    end,
    fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
    fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).


    %%当创建的时候会走此流程
    init(#amqqueue { name = QueueName, durable = IsDurable }, new,
    AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) →
    %%初始化队列索引状态(目录文件 ),并未产生新的进程
    %%何时触发的这个动作,还需要考究下.
    IndexState = rabbit_queue_index:init(QueueName,
    MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
    init(IsDurable, IndexState, 0, 0, [],
    case IsDurable of
    true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
    MsgOnDiskFun, AsyncCallback);
    false -> undefined
    end,
    %%消息转储的进程
    msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));

    首先看索引相关
    rabbit_queue_index.erl
    队列索引是用来

    重点看下这个函数
    -define(TRANSIENT_MSG_STORE, msg_store_transient).
    msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));

    msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
    msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
    Callback).

    msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
    CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
    rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
    fun () -> Callback(?MODULE, CloseFDsFun) end).

    如果使用的msg_store呢?以发送端发送消息为例.
    rabbit_channel处理basic.pubish然后 deliver_to_queues
    DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),主队列和备队列都收到发送消息请求(deliver)
    主队列收到消息后deliver_or_enqueue,其中如果attempt_delivery失败后,则会准备将消息发到队列中BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),backing queue收到publish消息知道消息将要进入队列,则广播通知各个备队列gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg}, rabbit_basic:msg_size(Msg)),并让最终的backing queue来做处理BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),

    publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
    MsgProps = #message_properties { needs_confirming = NeedsConfirming },
    IsDelivered, _ChPid, _Flow,
    State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
    next_seq_id = SeqId,
    in_counter = InCount,
    durable = IsDurable,
    unconfirmed = UC }) ->
    IsPersistent1 = IsDurable andalso IsPersistent,
    %%格式化消息
    MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
    {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
     State2 = case ?QUEUE:is_empty(Q3) of
    false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
    true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
    end, 
    InCount1 = InCount + 1,
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    State3 = stats({1, 0}, {none, MsgStatus1},
    State2#vqstate{ next_seq_id = SeqId + 1,
    in_counter = InCount1,
    unconfirmed = UC1 }),
    a(reduce_memory_use(maybe_update_rates(State3))).

    一、如何将消息写入磁盘
    %%尝试将消息和索引写入到磁盘中
    maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
    {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
    maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).

    maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
    msg = Msg, msg_id = MsgId,
    is_persistent = IsPersistent },
    State = #vqstate{ msg_store_clients = MSCState,
    disk_write_count = Count})
    when Force orelse IsPersistent ->
    case persist_to(MsgStatus) of
    msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
    prepare_to_store(Msg)),
    {MsgStatus#msg_status{msg_in_store = true},
    State#vqstate{disk_write_count = Count + 1}};
    queue_index -> {MsgStatus, State}
    end;
    (_Force, MsgStatus, State) ->
    {MsgStatus, State}.

    备队列


    run_message_queue->rabbit_queue_consumers:deliver/2
    至于如果从backing queue中拿到消息,则需要看rabbit_amqueue_process:fetch
    消息从队列中取走

    从backing queue获取消息,轮转消息者deliver
    取消息时先从Q4中取,若Q4为空,则从q3中取
    fetch(AckRequired, State) ->
    case queue_out(State) of
    {empty, State1} ->
    {empty, a(State1)};
    {{value, MsgStatus}, State1} ->
    %% it is possible that the message wasn't read from disk
    %% at this point, so read it in.
    {Msg, State2} = read_msg(MsgStatus, State1),
    {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
    {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
    end.

    queue_out(State = #vqstate { q4 = Q4 }) ->
    case ?QUEUE:out(Q4) of
    {empty, _Q4} ->
    case fetch_from_q3(State) of
    {empty, _State1} = Result -> Result;
    {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
    end;
    {{value, MsgStatus}, Q4a} ->
    {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
    end.

    Q1 q2 delta q3 q4由ram到disk,再由disk到ram,消息进来时首先是ram,转储时到disk,ram空间时再到ram。取消息只从q3,q4中取,若从q3中取走消息,后需将消息队列状态做转换
     q3 is now empty, it wasn't before; delta is still empty. So q2 must be empty,

    状态转换触发条件:
    从q3取走消息后,若q3和delata都为空,则说明q2也为空,则将q1转移至q4;若q3为空,则可能需要做一次调整delta to beta,(maybe_deltas_to_betas其中还包含状态转换)

    初始化主队列之后从coordinator获取gm,来实现队列进程之间的通信
    init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
    {ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
    Q, undefined, sender_death_fun(), depth_fun()),
    GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
    Self = self(),
    ok = rabbit_misc:execute_mnesia_transaction(
    fun () ->
    [Q1 = #amqqueue{gm_pids = GMPids}]
    = mnesia:read({rabbit_queue, QName}),
    ok = rabbit_amqqueue:store_queue(
    Q1#amqqueue{gm_pids = [{GM, Self} | GMPids],
    state = live})
    end),
    %%获取slave节点
    {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),

    %%增加镜像队列
    rabbit_mirror_queue_misc:add_mirrors(QName, SNodes, sync),
    #state { name = QName,
    gm = GM,
    coordinator = CPid,
    backing_queue = BQ,
    backing_queue_state = BQS,
    seen_status = dict:new(),
    confirmed = [],
    known_senders = sets:new() }.

    add_mirrors(QName, Nodes, SyncMode) ->
    [add_mirror(QName, Node, SyncMode) || Node <- Nodes],
    ok.

    add_mirror(QName, MirrorNode, SyncMode) ->
    case rabbit_amqqueue:lookup(QName) of
    {ok, Q} ->
    rabbit_misc:with_exit_handler(
    rabbit_misc:const(ok),
    fun () ->
    SPid = rabbit_amqqueue_sup_sup:start_queue_process(
    MirrorNode, Q, slave),
    log_info(QName, "Adding mirror on node ~p: ~p~n",
    [MirrorNode, SPid]),
    rabbit_mirror_queue_slave:go(SPid, SyncMode)
    end);
    {error, not_found} = E ->
    E
    end.


    rabbit_mirror_queue_slave:go(SPid, SyncMode)
    go(SPid, sync) -> gen_server2:call(SPid, go, infinity);

    进程刚创建起来之后state仍是{not_started,Q}
    handle_call(go, _From, {not_started, Q} = NotStarted) ->
    case handle_go(Q) of
    {ok, State} -> {reply, ok, State};
    {error, Error} -> {stop, Error, NotStarted}
    end;

    handle_go
    创建gm,加入group(queue_name)


    rabbit_mirror_queue_slave使用 gm 和gen_server2两种behaviour,但是对于gm的使用只是实现了其中的一些回调函数。如果出现rabbit_mirror_queue_slave进程处理gm中的add_on_right等消息是错误的。

    之所以主队列进程的backing queue不直接是variable queue,而是rabbit_mirror_queue_master,是因为主队列需要在对消息处理完后,需要同步给备队列,所以可以使用gm。

    主队列进程 备队列进程
    rabbit_amq_process rabbit_mirror_queue_slave
    BQ(que ue_master) BQ(backing_queue)

    gm(从coordintor获取)同步 gm

  • 相关阅读:
    752.打开转盘锁
    733. 图像渲染
    704.二分查找
    leetcode 87 Scramble String
    找实习总结
    leetcode 44 Wildcard Matching
    Linux,网络编程接口记录
    leetcode 172 Factorial Trailing Zeroes
    leetcode 168 Excel Sheet Column Title
    leetcode 65 Valid Number
  • 原文地址:https://www.cnblogs.com/haoqingchuan/p/4666208.html
Copyright © 2011-2022 走看看