zoukankan      html  css  js  c++  java
  • Erlang pool management -- RabbitMQ worker_pool

    在RabbitMQ中,pool 是以worker_pool 的形式存在的, 其主要用途之一是对Mnesia transaction 的操作. 而在RabbitMQ 中, pool 中的worker 数量是固定不变的, 是和虚拟机的schedulers 相关.这次会首先分别分析设计worker_pool 功能的三个module, 然后分析worker_pool 和 worker_pool_worker module 之间的调度关系.

    worker_pool_sup module

    worker_pool_sup module 是RabbitMQ pool(以下简称为rpool)的supervisor 进程,用来start worker_pool gen_server 进程和若干数量的worker_pool_worker gen_server 进程, 其中worker_pool_worker gen_server 进程的数量和虚拟机的schedulers 相关.

     1 start_link() ->
     2     start_link(erlang:system_info(schedulers)).
     3 
     4 start_link(WCount) ->
     5     supervisor:start_link({local, ?SERVER}, ?MODULE, [WCount]).
     6 
     7 %%----------------------------------------------------------------------------
     8 
     9 init([WCount]) ->
    10     {ok, {{one_for_one, 10, 10},
    11           [{worker_pool, {worker_pool, start_link, []}, transient,
    12             16#ffffffff, worker, [worker_pool]} |
    13            [{N, {worker_pool_worker, start_link, []}, transient, 16#ffffffff,
    14              worker, [worker_pool_worker]} || N <- lists:seq(1, WCount)]]}}.

    L2 处调用了erlang:system_info/1 函数以获取当前虚拟机使用的schedulers 数目. 在L13 处, 添加了和schedulers 数目等量的worker_pool_worker gen_server 进程.

    这里这样设计的主要目的是为了和Mnesia transaction 操作相匹配. (多说一句, Mnesia transaction 操作是由gen_server 进程控制, 默认是异步操作, Mnesia transaction 的操作频率过大的话, 容易导致mnesia_tm 进程的消息队列overload, 所以, 控制Mnesia transaction 并发调用数量, 可以避免mnesia_tm message len overload 引发的系统性能下降)

    worker_pool_worker module

    worker_pool_worker module 定义了rpool 的工作进程, 使用gen_server2 behaviour.

    worker_pool_worker module 主要定义了:

    1, next_job_from/2

    主要用于 worker_pool_worker 进程 monitor 用户进程, 并将worker_pool_worker 进程的state 置为{from, CPid(用户进程), MRef(monitor 结果)}, 等待用户进程任务的submit. 

    当收到用户进程 'DOWN' 的message 时, 若当前state 为该用户进程, 则将worker_pool_worker 进程的状态置为dile, state 置为undefined .

    此函数的调用者为worker_pool 进程, 第一个参数为worker_pool_worker 进程Pid, 第二个参数为用户进程Pid .

    2, submit/3

    主要用户进程想worker_pool_worker 进程submit 任务, 然后根据当前worker_pool_worker 进程的state 来决定是执行用户进程提交的Fun 函数, 还是重置进程的state 信息, 等待下一次next_job_from message.

    1 handle_call({submit, Fun, CPid, ProcessModel}, From, undefined) ->
    2     {noreply, {job, CPid, From, Fun, ProcessModel}, hibernate};
    3 
    4 handle_call({submit, Fun, CPid, ProcessModel}, From, {from, CPid, MRef}) ->
    5     erlang:demonitor(MRef),
    6     gen_server2:reply(From, run(Fun, ProcessModel)),
    7     ok = worker_pool:idle(self()),
    8     {noreply, undefined, hibernate};

    在上面handle_call callback 处理submit tag 的message 中, state 信息为{from, CPid, MRef}, 即执行用户提交的Fun 任务, 若state 信息为'undefined', 就重置state 信息,并等待next_job_from message :

    1 handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, ProcessModel}) ->
    2     gen_server2:reply(From, run(Fun, ProcessModel)),
    3     ok = worker_pool:idle(self()),
    4     {noreply, undefined, hibernate};

    执行Fun 任务.

    两个不同的处理逻辑的最后,都会调用worker_pool module 的idle 函数, 告诉worker_pool 进程自己已经idle .(以备接受下一次的执行任务)

    3, submit_async/2

    该函数是完成异步执行submit 用户任务.

    1 handle_cast({submit_async, Fun}, undefined) ->
    2     run(Fun),
    3     ok = worker_pool:idle(self()),
    4     {noreply, undefined, hibernate};

    从上面可以看出该功能无需知道Fun 执行的返回结果, 只要Fun 执行完毕后, 就完成了此次任务.

    worker_pool_worker 进程的state 转换太过复杂, 主要原因是用户进程的调用状态是由worker_pool_worker 进程来管理, worker_pool gen_server 只是管理 worker_pool_worker 进程的idle, ready 信息用来路由用户任务到idle 的 worker_pool_worker 进程.

    这样做的弊端是增加了worker_pool_worker 进程的负担, 带来的好处是减轻了单点进程(worker_pool) 的压力, 由于worker_pool_worker 进程并不完全是一个, 因此, 这样的设计是利大于弊的.

    worker_pool module

    上面也提到了, worker_pool module 定义rpool 工作进程的调用进程. 用来管理工作进程的ready idle busy 状态,和Emysql pool 的管理相比, worker_pool 只使用了两种管理角色available(同Emysql pool 中的available), pending(等同于Emysql pool 中的waiting queue). 在Emysql pool 的管理中, 还有一个locked 角色, 就是正在被使用的资源, 而在worker_pool 中, 并没有与之参照的角色. 而是使用了 monitor 的方式, 在工作进程ready 的时候, worker_pool 进程会monitor 工作进程:

    1 handle_cast({ready, WPid}, State) ->
    2     erlang:monitor(process, WPid),
    3     handle_cast({idle, WPid}, State);
    worker_pool monitor 工作进程

    在接收到工作进程'DOWN' 的消息, 会从available 角色中, 将工作进程delete :

    handle_info({'DOWN', _MRef, process, WPid, _Reason},
                State = #state { available = Avail }) ->
        {noreply, State #state { available = ordsets:del_element(WPid, Avail) },
         hibernate};
    处理工作进程'DOWN' 的消息

    之所以存在这样的不同, 是因为Emysql pool 中的元素是socket 链接, 需要存储用户进程和socket 链接之间的关联关系, 当使用socket 链接的用户进程exit 之后, 可以根据用户进程获取socket, 继而重置socket 链接. 而在rpool 中, pool 中的元素是Erlang process , 使用的是工作进程执行用户进程submit Fun 的方式, 当工作进程出现异常exit 时, work_pool 可以接收到工作进程'DOWN' 的消息, 并采取处理. 因而再使用locked 角色, 就会显得多余.

    同样是因为pool 中元素性质的不同, monitor 用户进程的工作就从pool 的管理进程交给了 pool 中的元素.

    总结

    1, pool 中工作元素的数目和所使用的场景密切相关;

    2, pool 中工作元素的性质, 决定了pool 管理需要使用到的角色(available/waiting/locked);

    3, 从目前Emysql 和 rpool 的相关点来看, available 和waiting queue 对于一个pool 管理者而言, 是必不可少的;

    4, 需要尽可能减轻管理者的压力, 或者增加管理者的进程数量.

    下一篇的内容主要利用画图的方式梳理清楚worker_pool 中的进程之间的调用关系,以及进程内部state 的转换.

  • 相关阅读:
    【React Native】某个页面禁用物理返回键
    【React Native】DeviceEventEmitter监听通知及带参数传值
    转载【React Native代码】手写验证码倒计时组件
    【React Native】 中设置 APP 名称、应用图标、为安卓添加启动图
    【React Native错误集】* What went wrong: Execution failed for task ':app:installDebug'.
    【React Native错误集】Import fails with "Failed to execute 'ImportScripts' on 'WorkerGlobalScope'"
    【React Native错误集】Android error “Could not get BatchedBridge, make sure your bundle is packaged properly” on start of app
    「React Native笔记」在React的 setState 中操作数组和对象的多种方法(合集)
    【React Native】Error: Attribute application@allowBackup value=(false) from AndroidManifest.xml
    坚果云如何使用二次验证码/谷歌身份验证器/两步验证/虚拟MFA?
  • 原文地址:https://www.cnblogs.com/--00/p/4287209.html
Copyright © 2011-2022 走看看