zoukankan      html  css  js  c++  java
  • rabbimq之流控

    rabbitmq为了能够保证服务器在大量使用资源的情况下正常工作,会做流控。

    所谓流控有以下两个方面。一是针对连接做流控,即降低某频率过快的发送消息。二是整体流控,即将所有消费者发送的消息丢掉,悄无声息。

    首先是针对连接做的流控,per-connection

    rabbitmq通过使用credit_flow来实现连接级别的流控.假设有这样的数据流向,A->B->C,如果C消息处理不及时,B能够感得到,则B会减少A发送的消息,从而从源头作到流控.rabbitmq消息有如下的流向,rabbit_reader->rabbit_channel->rabbit_amqqueue->rabbit_msg_store,如果中间某个模块处理消息不及时,会导致最终从源头rabbit_reader完成阻塞工作,进而实现流控.下面来看下这个功能是如果实现的.

    先介绍credit,信用.A向B发消息,第次发消息都会使A中B的credit减1,如果降到0的话A就会block; B收到消息后会向A作ack,每次ack都会使的A中B的credit增加.

    -define(DEFAULT_CREDIT, {200, 50}).

    A中B的credit初始值,200, 每次B向A作ack都会使得A中B的credit加50.

    以rabbit_reader与rabbit_channel为例. rabbit_reader向rabbit_channel发消息时会做rabbit_channel:do_flow/3操作,

     1 process_frame(Frame, Channel, State) ->
     2     ChKey = {channel, Channel},
     3     case (case get(ChKey) of
     4               undefined -> create_channel(Channel, State);
     5               Other     -> {ok, Other, State}
     6           end) of
     7         {ok, {ChPid, AState}, State1} ->
     8             case rabbit_command_assembler:process(Frame, AState) of
     9                 {ok, Method, Content, NewAState} ->
    10                     rabbit_channel:do_flow(ChPid, Method, Content),
    11                     put(ChKey, {ChPid, NewAState}),
    12                     post_process_frame(Frame, ChPid, control_throttle(State1));
    13             end
    14     end.

    rabbit_channel:do_flow会让rabbit_reader将rabbit_channel的credit减1,并通过让rabbitmq_channel处理method并对此次消息发送作credit_flow:ack,即

    do_flow(Pid, Method, Content) ->
        credit_flow:send(Pid),
        gen_server2:cast(Pid, {method, Method, Content, flow}).
    credit_flow:send/1会在rabbit_reader中以进程字典的形式记录rabbit_channel的credit. rabbit_reader中rabbit_channel的credit以1为单位减少,如果为0的话即作block.如果credit小于0,rabbit_reader仍会给rabbit_channel发消息.
    1 send(From) -> send(From, ?DEFAULT_CREDIT).
    2 
    3 send(From, {InitialCredit, _MoreCreditAfter}) ->
    4     ?UPDATE({credit_from, From}, InitialCredit, C,
    5             if C == 1 -> block(From),
    6                          0;
    7                true   -> C - 1
    8             end).   

    如果credit为0的话,rabbit_reader会将rabbit_channel置入credit_blocked进程字典中,

    1 block(From) ->
    2     case blocked() of
    3         false -> put(credit_blocked_at, erlang:now());
    4         true  -> ok
    5     end,
    6     ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).

     正常情况下,在收到消息之后会给ack,当积累足够的时候才会向rabbit_reader申请更多的credit.

    1 ack(To) -> ack(To, ?DEFAULT_CREDIT).
    2 
    3 ack(To, {_InitialCredit, MoreCreditAfter}) ->
    4     ?UPDATE({credit_to, To}, MoreCreditAfter, C,
    5             if C == 1 -> grant(To, MoreCreditAfter),
    6                          MoreCreditAfter;
    7                true   -> C - 1
    8             end).

    在向rabbit_reader申请更多的credit的时候会发rabbit_reader发送消息{bump_credit, {self(), Quantity}},如果自己当前的状态是blocked的话,就不会申请更多的credit,因为现在不需要接收消息了.

    1 grant(To, Quantity) ->
    2     Msg = {bump_credit, {self(), Quantity}},
    3     case blocked() of
    4         false -> To ! Msg;
    5         true  -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
    6     end.

    rabbit_reader收到消息之后会将增加{credit_from, From}的值,control_throttle是用来改变当前连接的状态,running,blocking,blocked

    1 handle_other({bump_credit, Msg}, State) ->
    2     credit_flow:handle_bump_msg(Msg),
    3     control_throttle(State);

    如果之前是blocked状态,在增加了{credit_from, From}的值之后,需要可能需要unblock操作,unblock即是将之前申请block时未成功申请的credit,再依次做申请,欠的迟早要还.

    1 unblock(From) ->
    2     ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
    3     case blocked() of
    4         false -> case erase(credit_deferred) of
    5                      undefined -> ok;
    6                      Credits   -> [To ! Msg || {To, Msg} <- Credits]
    7                  end;
    8         true  -> ok
    9     end.

    "最终进程阻塞在mainloop/2的rabbit_net:recv/1函数上。rabbit_net:recv/1函数会阻塞的原因是RabbitMQ采用了gen_tcp的半阻塞模型,也就是说每次接受一个tcp消息之后,必须显式调用inet:setopts(Sock, [{active, once}])来激活一下,否则,进程会一直阻塞在receive语句上。"

    总结:

    消息发送者拥有{credit_from, From}用来知道给哪些消息接收者发送了消息,以便在任何一个接收者出现credit不足的时候,消息发送者都将会把自己置入一个blocked的状态

    接收者拥有{credit_to, To}根据此项来知道是否需要向消息发送者申请更多的credit.

    一个进程即可以作为消息发送者,也可以作为消息接收者,那么在自己处于blocked的状态下,{credit_to, To}时不会再申请更多的credit,这样会导致发送者进入blocked状态,最终使得整个工作流blocked,没有消息流通,进而达到流控的效果.

    参考文献:

    衔山. RabbitMQ源码分析:Per-Connection流控机制. http://fengchj.com/?p=2084

  • 相关阅读:
    A1066 Root of AVL Tree (25 分)
    A1099 Build A Binary Search Tree (30 分)
    A1043 Is It a Binary Search Tree (25 分) ——PA, 24/25, 先记录思路
    A1079; A1090; A1004:一般树遍历
    A1053 Path of Equal Weight (30 分)
    A1086 Tree Traversals Again (25 分)
    A1020 Tree Traversals (25 分)
    A1091 Acute Stroke (30 分)
    A1103 Integer Factorization (30 分)
    A1032 Sharing (25 分)
  • 原文地址:https://www.cnblogs.com/haoqingchuan/p/4393116.html
Copyright © 2011-2022 走看看