zoukankan      html  css  js  c++  java
  • rabbitmq之work_pool

     

    worker_pool_worker的作用是用来完成数据操作。

    如何获取worker是从worker_pool里获取,并由worker_pool管理。

    起动时间:

    -rabbit_boot_step({worker_pool,

    [{description, "worker pool"},

    {mfa, {rabbit_sup, start_supervisor_child,

    [worker_pool_sup]}},

    {requires, pre_boot},

    {enables, external_infrastructure}]}).


    在起动顺序中,work_pool是由pre_boot之后,external_infrastructure之后起动的。起动一个worker_pool进程,并起动Wcountwork_pool_worker进程,Wcount是由erlang:system_info(schedulers)决定的。

    init([WCount]) ->

    {ok, {{one_for_one, 10, 10},

    [{worker_pool, {worker_pool, start_link, []}, transient,

    16#ffffffff, worker, [worker_pool]} |

    [{N, {worker_pool_worker, start_link, []}, transient, 16#ffffffff,

    worker, [worker_pool_worker]} || N <- lists:seq(1, Wcount)]]}}.



    worker_pool_worker起动的时候会将通过worke_pool:read(self())来监控起来,并将此进程放置于stateavaliable列表中。当worker_pool_worker down掉时,worker_pool则会将此进程从avaliable队表中删除掉。

    init([]) ->

    ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,

    [self()]),

    ok = worker_pool:ready(self()),

    put(worker_pool_worker, true),

    {ok, undefined, hibernate,

    {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.



    worker_pool.erl

    ready(WPid) -> gen_server2:cast(?SERVER, {ready, Wpid}).

    handle_cast({ready, WPid}, State) ->

    erlang:monitor(process, WPid),

    handle_cast({idle, WPid}, State);


    handle_cast({idle, WPid}, State = #state { available = Avail,

    pending = Pending }) ->

    {noreply,

    case queue:out(Pending) of

    {empty, _Pending} ->

    State #state { available = ordsets:add_element(WPid, Avail) };

    {{value, {next_free, From, CPid}}, Pending1} ->

    worker_pool_worker:next_job_from(WPid, CPid),

    gen_server2:reply(From, WPid),

    State #state { pending = Pending1 };

    {{value, {run_async, Fun}}, Pending1} ->

    worker_pool_worker:submit_async(WPid, Fun),

    State #state { pending = Pending1 }

    end, hibernate};


    handle_info({'DOWN', _MRef, process, WPid, _Reason},

    State = #state { available = Avail }) ->

    {noreply, State #state { available = ordsets:del_element(WPid, Avail) },

    hibernate};

    如果有submit动作,获取空闲的worker,并作

    submit(Fun, ProcessModel) ->

    case get(worker_pool_worker) of

    true -> worker_pool_worker:run(Fun);

    _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity),

    worker_pool_worker:submit(Pid, Fun, ProcessModel)

    end.


    此时应获取一个空闲的worker_pool_worker,并让此worker装备一下,返回worker Pid

    handle_call({next_free, CPid}, _From, State = #state { available =

    [WPid | Avail1] }) ->

    worker_pool_worker:next_job_from(WPid, CPid),

    {reply, WPid, State #state { available = Avail1 }, hibernate};



    worker_pool_worker准备工作

    next_job_from(Pid, CPid) ->

    gen_server2:cast(Pid, {next_job_from, Cpid}).


    work_pool_worke刚起来的时候stateundefined,当需要自己去完成工作的时候,worker会将work_pool进程监控起来,以便pool在使用自己执行操作的时候且pool异常时能够释放自己。

    handle_cast({next_job_from, CPid}, undefined) ->

    MRef = erlang:monitor(process, CPid),

    {noreply, {from, CPid, MRef}, hibernate};

    submit(Pid, Fun, ProcessModel) ->

    gen_server2:call(Pid, {submit, Fun, self(), ProcessModel}, infinity).


    pool解监控,返回pool执行结果,释放自己,state重回undefined

    handle_call({submit, Fun, CPid, ProcessModel}, From, {from, CPid, MRef}) ->

    erlang:demonitor(MRef),

    gen_server2:reply(From, run(Fun, ProcessModel)),

    ok = worker_pool:idle(self()),

    {noreply, undefined, hibernate};

    参考文献:

    ErlangRabbitMQ源码分析 5. worker pool 实现分析. http://m.blog.csdn.net/blog/liaosongbo/39317829

  • 相关阅读:
    linux之awk命令
    HDU 2097 Sky数 进制转换
    HDU 2077 汉诺塔IV
    HDU 2094 产生冠军 dfs加map容器
    HDU 2073 叠框
    HDU 2083 简易版之最短距离
    HDU 2063 过山车 二分匹配
    天梯 1014 装箱问题
    天梯 1214 线段覆盖
    天梯 1098 均分纸牌
  • 原文地址:https://www.cnblogs.com/haoqingchuan/p/4452197.html
Copyright © 2011-2022 走看看