zoukankan      html  css  js  c++  java
  • Riak Core Guide 2

    Learn Riak Core Step By Step 2

    Riak Core, The Coordinator

    What is a Coordinator?

    顾名思义。 Coordinator即使一个协调者,主要工作就是用来协调进来的请求。它强行运行N, R, and W的一致性语义,而且运行想read repairanti-entropy 服务。足药用在分布式集群中,当出现冲突时,用来同步数据。

    从技术上说, 协调器是一个gen_fsm,每个请求都会被他自己的erlang进程处理,一个协调器会和vnode保持通信,直请求结束。

    一个协调器总的来说:

    • 协调请求
    • 强一致性
    • 运行anti-entropy
    • 一个实现了gen-fsm行为的erlang进程
    • 与运行请求的vnode实例保持通信

    Implementing a Coordinator

    和vnode不一样,riak core未定义协调器的行为。样例中现实了get和put的协调器,能够參照,但不是一成不变的。

    样例中使用supervisour和gen_fsm worker的方式实现了协调器。

    init(Args) -> {ok, InitialState, SD, Timeout}

    Args                :: term()
    InitialState        :: atom()
    SD                  :: term()
    Timeout             :: integer()
    

    这实际上是gen_fsm行为的一部分,必须在回调中制定初始状态名和数据(SD)。

    有些情况下。你也能够制定超时的值为0以至于立即进入到初始状态-prepare

    一个get rts的协调器须要4个參数:

    • RequestId:本次请求的唯一Id

    • From: 应答者

    • Client: The name of the client entity -- the entity that is writing log events to RTS.

    • StatName: 统计项的名字.

    init([ReqId, From, Client, StatName]) ->
        SD = #state{req_id=ReqId,
                    from=From,
                    client=Client,
                    stat_name=StatName},
        {ok, prepare, SD, 0}.
    

    rts的写协调者也是一样。可是有两个额外的參数。

    • Op: 运行的操作能够是setappendincrincrby或者sadd中的一种.

    • Val: 被操作的值,incr操作没有这项定义。

    init([ReqID, From, Client, StatName, Op, Val]) ->
        SD = #state{req_id=ReqID,
                    from=From,
                    client=Client,
                    stat_name=StatName,
                    op=Op,
                    val=Val},
        {ok, prepare, SD, 0}.
    

    prepare(timeout, SD0) -> {next_state, NextState, SD, Timeout}

    SD0 = SD            :: term()
    NextState           :: atom()
    Timeout             :: integer()
    

    prepare的工作是建立一个优先列表。这个列表是应该參与本次请求的优先的vnode集合的列表.大部分的工作都被riak_core_util:chash_key/1riak_core_apl:get_apl/3做完了.getwrite协调器这时后做的工作都一样。

    计算请求落在环的索引,从索引中确定N个优先处理这个请求的分区。

    以下是代码:

    prepare(timeout, SD0=#state{client=Client,
                                stat_name=StatName}) ->
        DocIdx = riak_core_util:chash_key({list_to_binary(Client),
                                           list_to_binary(StatName)}),
        Prelist = riak_core_apl:get_apl(DocIdx, ?N, rts_stat),
        SD = SD0#state{preflist=Prelist},
        {next_state, execute, SD, 0}.
    

    execute(timeout, SD0) -> {next_state, NextState, SD}

    SD0 = SD            :: term()
    NextState           :: atom()
    

    prepare之后就会调用excute,excute会依据优先列表来运行对应的stat请求。

    execute(timeout, SD0=#state{req_id=ReqId,
                                stat_name=StatName,
                                preflist=Prelist}) ->
        rts_stat_vnode:get(Prelist, ReqId, StatName),
        {next_state, waiting, SD0}.
    

    写协调器和get协调器一样,就是多了op.

    execute(timeout, SD0=#state{req_id=ReqID,
                            stat_name=StatName,
                            op=Op,
                            val=undefined,
                            preflist=Preflist}) ->
        rts_stat_vnode:Op(Preflist, ReqID, StatName),
        {next_state, waiting, SD0}.
    

    waiting(Reply, SD0) -> Result

    Reply               :: {ok, ReqID}
    Result              :: {next_state, NextState, SD}
                         | {stop, normal, SD}
    NextState           :: atom()
    SD0 = SD            :: term()
    

    以下是get的代码

    waiting({ok, ReqID, Val}, SD0=#state{from=From, num_r=NumR0, replies=Replies0}) ->
        NumR = NumR0 + 1,
        Replies = [Val|Replies0],
        SD = SD0#state{num_r=NumR,replies=Replies},
        if
            NumR =:= ?R ->
                Reply =
                    case lists:any(different(Val), Replies) of
                        true ->
                            Replies;
                        false ->
                            Val
                    end,
                From ! {ReqID, ok, Reply},
                {stop, normal, SD};
            true -> {next_state, waiting, SD}
        end.
    

    从代码中能够看出所谓强一致性就是等待所有应答,然后把应答结果组织后。一起返回去,没有达到应答数量会一直等待。

    写协调更加easy:

    waiting({ok, ReqID}, SD0=#state{from=From, num_w=NumW0}) ->
        NumW = NumW0 + 1,
        SD = SD0#state{num_w=NumW},
        if
        NumW =:= ?W ->
                From ! {ReqID, ok},
                {stop, normal, SD};
        true -> {next_state, waiting, SD}
        end.
    

    What About the Entry Coordinator?

    Entry仅仅是解析每个日志,不是必需使用协调器。协调器一般用在存储。

    Changes to rts.erl and rts_stat_vnode

    rts的模块也须要更新,主要添加fsm的代码,rts不会直接和vnode通信,交给fsm间接通信。

    rts:get ----> rts_stat_vnode:get (local)
    
                                                              /--> stat_vnode@rts1
    rts:get ----> rts_get_fsm:get ----> rts_stat_vnode:get --|---> stat_vnode@rts2
                                                              --> stat_vnode@rts3
    

    rts:get/2函数如今也是调用get协调器然后等待结果。

    get(Client, StatName) ->
        {ok, ReqID} = rts_get_fsm:get(Client, StatName),
        wait_for_reqid(ReqID, ?

    TIMEOUT).

    写请求也经过了像是的重构。

    do_write(Client, StatName, Op) ->
        {ok, ReqID} = rts_write_fsm:write(Client, StatName, Op),
        wait_for_reqid(ReqID, ?TIMEOUT).
    
    do_write(Client, StatName, Op, Val) ->
        {ok, ReqID} = rts_write_fsm:write(Client, StatName, Op, Val),
        wait_for_reqid(ReqID, ?

    TIMEOUT).

    rts_stat_vnode也进行了重构。使用riak_core_vnode_master:command/4而且携带了从參数PreflistMsgSender 和VMaster.

    Preflist: 被发送命令的vnode列表
    
    Msg: 被发送的命令.
    
    Sender: 发送者,这里表示协调者. 主要用于vnode正确返回信息。

    VMaster: VNode master的名字.

    get(Preflist, ReqID, StatName) -> 
        riak_core_vnode_master:command(Preflist, {get, ReqID, StatName}, {fsm, undefined, self()}, ?MASTER).
    

    Coordinators in Action

    • Build the devrel
    make
    make devrel
    
    • Start the Cluster
    for d in dev/dev*; do $d/bin/rts start; done
    for d in dev/dev{2,3}; do $d/bin/rts-admin join rts1@127.0.0.1; done
    
    • Feed in Some Data
    gunzip -c progski.access.log.gz | head -100 | ./replay --devrel progski
    
    • Get Some Stats
    ./dev/dev1/bin/rts attach
    (rts1@127.0.0.1)1> rts:get("progski", "total_reqs").
    {ok,97}
    (rts1@127.0.0.1)2> rts:get("progski", "GET").       
    {ok,91}
    (rts1@127.0.0.1)3> rts:get("progski", "total_sent").
    {ok,445972}
    (rts1@127.0.0.1)4> rts:get("progski", "HEAD").      
    {ok,6}
    (rts1@127.0.0.1)5> rts:get("progski", "PUT").  
    {ok,not_found}
    (rts1@127.0.0.1)6> rts:get_dbg_preflist("progski", "total_reqs"). 
    [{730750818665451459101842416358141509827966271488,
      'rts3@127.0.0.1'},
     {753586781748746817198774991869333432010090217472,
      'rts1@127.0.0.1'},
     {776422744832042175295707567380525354192214163456,
      'rts2@127.0.0.1'}]
    (rts1@127.0.0.1)7> rts:get_dbg_preflist("progski", "GET").       
    [{274031556999544297163190906134303066185487351808,
      'rts1@127.0.0.1'},
     {296867520082839655260123481645494988367611297792,
      'rts2@127.0.0.1'},
     {319703483166135013357056057156686910549735243776,
      'rts3@127.0.0.1'}]
    
    • Kill a Node
    (rts1@127.0.0.1)8> os:getpid().
    "91461"
    Ctrl^D
    kill -9 91461
    
    • Verify it's Down
    $ ./dev/dev1/bin/rts ping
    Node 'rts1@127.0.0.1' not responding to pings.
    
    • Get Stats on rts2
    ./dev/dev2/bin/rts attach
    (rts2@127.0.0.1)1> rts:get("progski", "total_reqs").
    {ok,97}
    (rts2@127.0.0.1)2> rts:get("progski", "GET").       
    {ok,[not_found,91]}
    (rts2@127.0.0.1)3> rts:get("progski", "total_sent").
    {ok,445972}
    (rts2@127.0.0.1)4> rts:get("progski", "HEAD").      
    {ok,[not_found,6]}
    (rts2@127.0.0.1)5> rts:get("progski", "PUT"). 
    {ok,not_found}
    
    • Let's Compare the Before and After Preflist

    注意:落在rts2的gets有些返回单一的值,而有些还是和曾经一样返回列表值。主要原因是优先列表的计算包括了fallback vnodefallback vnode 是一个没有落在适当的物理节点的虚拟节点。由于rts1被杀死掉了,所以落在他的节点的请求必须路由到其它节点去.由于请求-应答的模型在协调器和vnode之间是异步的,因此,我们的应答值将会依赖与第一个vnode的应答事例,假设假设是第一次,你将会的到单一值,当kill 掉一个节点之后。得到的将是列表值。详细原因请看waiting函数.

    (rts2@127.0.0.1)6> rts:get_dbg_preflist("progski", "total_reqs"). 
    [{730750818665451459101842416358141509827966271488,
      'rts3@127.0.0.1'},
     {776422744832042175295707567380525354192214163456,
      'rts2@127.0.0.1'},
     {753586781748746817198774991869333432010090217472,
      'rts3@127.0.0.1'}]
    (rts2@127.0.0.1)7> rts:get_dbg_preflist("progski", "GET").       
    [{296867520082839655260123481645494988367611297792,
      'rts2@127.0.0.1'},
     {319703483166135013357056057156686910549735243776,
      'rts3@127.0.0.1'},
     {274031556999544297163190906134303066185487351808,
      'rts2@127.0.0.1'}]
    

    由于一个节点已经fallback了,所以要获取第3个的话是获取不到的,由于[rts1, rts2, rts3] 已经变成[ rts3, rts2, rts3], 就是说rts1已经被rts2或者rts3替代了。替代后会产生心的进程。这个新的进程没有存储有数据。所以,请求的结果是not-found

    (rts2@127.0.0.1)8> rts:get_dbg_preflist("progski", "total_reqs", 1).
    [{730750818665451459101842416358141509827966271488,
      'rts3@127.0.0.1'},
     97]
    (rts2@127.0.0.1)9> rts:get_dbg_preflist("progski", "total_reqs", 2). 
    [{776422744832042175295707567380525354192214163456,
      'rts2@127.0.0.1'},
     97]
    (rts2@127.0.0.1)10> rts:get_dbg_preflist("progski", "total_reqs", 3).
    [{753586781748746817198774991869333432010090217472,
      'rts3@127.0.0.1'},
     not_found]
    (rts2@127.0.0.1)11> rts:get_dbg_preflist("progski", "GET", 1).       
    [{296867520082839655260123481645494988367611297792,
      'rts2@127.0.0.1'},
     91]
    (rts2@127.0.0.1)12> rts:get_dbg_preflist("progski", "GET", 2).
    [{319703483166135013357056057156686910549735243776,
      'rts3@127.0.0.1'},
     91]
    (rts2@127.0.0.1)13> rts:get_dbg_preflist("progski", "GET", 3).
    [{274031556999544297163190906134303066185487351808,
      'rts2@127.0.0.1'},
     not_found]
    

    ** 注意 ** :fallbacks是在列表的最后一个。

  • 相关阅读:
    【转】【SEE】基于SSE指令集的程序设计简介
    【转】【Asp.Net】asp.net服务器控件创建
    ControlTemplate in WPF ——ScrollBar
    ControlTemplate in WPF —— Menu
    ControlTemplate in WPF —— Expander
    ControlTemplate in WPF —— TreeView
    ControlTemplate in WPF —— ListBox
    ControlTemplate in WPF —— ComboBox
    ControlTemplate in WPF —— TextBox
    ControlTemplate in WPF —— RadioButton
  • 原文地址:https://www.cnblogs.com/jzssuanfa/p/6796275.html
Copyright © 2011-2022 走看看