zoukankan      html  css  js  c++  java
  • rabbimq之流控

    rabbitmq为了能够保证服务器在大量使用资源的情况下正常工作,会做流控。

    所谓流控有以下两个方面。一是针对连接做流控,即降低某频率过快的发送消息。二是整体流控,即将所有消费者发送的消息丢掉,悄无声息。

    首先是针对连接做的流控,per-connection

    rabbitmq通过使用credit_flow来实现连接级别的流控.假设有这样的数据流向,A->B->C,如果C消息处理不及时,B能够感得到,则B会减少A发送的消息,从而从源头作到流控.rabbitmq消息有如下的流向,rabbit_reader->rabbit_channel->rabbit_amqqueue->rabbit_msg_store,如果中间某个模块处理消息不及时,会导致最终从源头rabbit_reader完成阻塞工作,进而实现流控.下面来看下这个功能是如果实现的.

    先介绍credit,信用.A向B发消息,第次发消息都会使A中B的credit减1,如果降到0的话A就会block; B收到消息后会向A作ack,每次ack都会使的A中B的credit增加.

    -define(DEFAULT_CREDIT, {200, 50}).

    A中B的credit初始值,200, 每次B向A作ack都会使得A中B的credit加50.

    以rabbit_reader与rabbit_channel为例. rabbit_reader向rabbit_channel发消息时会做rabbit_channel:do_flow/3操作,

     1 process_frame(Frame, Channel, State) ->
     2     ChKey = {channel, Channel},
     3     case (case get(ChKey) of
     4               undefined -> create_channel(Channel, State);
     5               Other     -> {ok, Other, State}
     6           end) of
     7         {ok, {ChPid, AState}, State1} ->
     8             case rabbit_command_assembler:process(Frame, AState) of
     9                 {ok, Method, Content, NewAState} ->
    10                     rabbit_channel:do_flow(ChPid, Method, Content),
    11                     put(ChKey, {ChPid, NewAState}),
    12                     post_process_frame(Frame, ChPid, control_throttle(State1));
    13             end
    14     end.

    rabbit_channel:do_flow会让rabbit_reader将rabbit_channel的credit减1,并通过让rabbitmq_channel处理method并对此次消息发送作credit_flow:ack,即

    do_flow(Pid, Method, Content) ->
        credit_flow:send(Pid),
        gen_server2:cast(Pid, {method, Method, Content, flow}).
    credit_flow:send/1会在rabbit_reader中以进程字典的形式记录rabbit_channel的credit. rabbit_reader中rabbit_channel的credit以1为单位减少,如果为0的话即作block.如果credit小于0,rabbit_reader仍会给rabbit_channel发消息.
    1 send(From) -> send(From, ?DEFAULT_CREDIT).
    2 
    3 send(From, {InitialCredit, _MoreCreditAfter}) ->
    4     ?UPDATE({credit_from, From}, InitialCredit, C,
    5             if C == 1 -> block(From),
    6                          0;
    7                true   -> C - 1
    8             end).   

    如果credit为0的话,rabbit_reader会将rabbit_channel置入credit_blocked进程字典中,

    1 block(From) ->
    2     case blocked() of
    3         false -> put(credit_blocked_at, erlang:now());
    4         true  -> ok
    5     end,
    6     ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).

     正常情况下,在收到消息之后会给ack,当积累足够的时候才会向rabbit_reader申请更多的credit.

    1 ack(To) -> ack(To, ?DEFAULT_CREDIT).
    2 
    3 ack(To, {_InitialCredit, MoreCreditAfter}) ->
    4     ?UPDATE({credit_to, To}, MoreCreditAfter, C,
    5             if C == 1 -> grant(To, MoreCreditAfter),
    6                          MoreCreditAfter;
    7                true   -> C - 1
    8             end).

    在向rabbit_reader申请更多的credit的时候会发rabbit_reader发送消息{bump_credit, {self(), Quantity}},如果自己当前的状态是blocked的话,就不会申请更多的credit,因为现在不需要接收消息了.

    1 grant(To, Quantity) ->
    2     Msg = {bump_credit, {self(), Quantity}},
    3     case blocked() of
    4         false -> To ! Msg;
    5         true  -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
    6     end.

    rabbit_reader收到消息之后会将增加{credit_from, From}的值,control_throttle是用来改变当前连接的状态,running,blocking,blocked

    1 handle_other({bump_credit, Msg}, State) ->
    2     credit_flow:handle_bump_msg(Msg),
    3     control_throttle(State);

    如果之前是blocked状态,在增加了{credit_from, From}的值之后,需要可能需要unblock操作,unblock即是将之前申请block时未成功申请的credit,再依次做申请,欠的迟早要还.

    1 unblock(From) ->
    2     ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
    3     case blocked() of
    4         false -> case erase(credit_deferred) of
    5                      undefined -> ok;
    6                      Credits   -> [To ! Msg || {To, Msg} <- Credits]
    7                  end;
    8         true  -> ok
    9     end.

    "最终进程阻塞在mainloop/2的rabbit_net:recv/1函数上。rabbit_net:recv/1函数会阻塞的原因是RabbitMQ采用了gen_tcp的半阻塞模型,也就是说每次接受一个tcp消息之后,必须显式调用inet:setopts(Sock, [{active, once}])来激活一下,否则,进程会一直阻塞在receive语句上。"

    总结:

    消息发送者拥有{credit_from, From}用来知道给哪些消息接收者发送了消息,以便在任何一个接收者出现credit不足的时候,消息发送者都将会把自己置入一个blocked的状态

    接收者拥有{credit_to, To}根据此项来知道是否需要向消息发送者申请更多的credit.

    一个进程即可以作为消息发送者,也可以作为消息接收者,那么在自己处于blocked的状态下,{credit_to, To}时不会再申请更多的credit,这样会导致发送者进入blocked状态,最终使得整个工作流blocked,没有消息流通,进而达到流控的效果.

    参考文献:

    衔山. RabbitMQ源码分析:Per-Connection流控机制. http://fengchj.com/?p=2084

  • 相关阅读:
    angularJs项目实战!02:前端的页面分解与组装
    angularJs项目实战!01:模块划分和目录组织
    django admin 导出数据简单示例
    django 学习之model操作(想细化)
    6.11大杂烩。。
    InlineModelAdmin对象的学习
    django-salmonella的使用
    python 保留两位小数
    Django 时间与时区设置问题
    Django学习
  • 原文地址:https://www.cnblogs.com/haoqingchuan/p/4393116.html
Copyright © 2011-2022 走看看