原文:A Million-user Comet Application with Mochiweb, Part 2
参考资料:Comet--基于 HTTP 长连接、无须在浏览器端安装插件的“服务器推”技术为“Comet”
MochiWeb--建立轻量级HTTP服务器的Erlang库
在第一部分 , 我们构建了一个每10秒向客户端发送一条消息的mochiweb comet应用(没什么用处)。我们微调了一下linux内核,做了一个能够建立大量网络连接以测试应用性能和所耗内存的工具 。我们发现每个连接花费大约45K内存。
本系列的第二部分讲的主要是把应用变得更加有用,更加节省内存:
用一个login/logout/send API实现一个消息路由器更新mochiweb应用使之能够从路由器接收消息建立一个分布式erlang系统,这样我们可以在不同的节点和主机上运行路由器写一个能给路由器发送大量无用信息的工具超过24小时的内存用量图,优化mochiweb应用以节约内存
这就意味着我们需要把消息发送逻辑从mochiweb应用中剥离出来。利用第一部分的压力测试工具,我们可以建立一个更接近产品级别的基准测试。
实现消息路由器
路由器的API只有3个函数:
login(Id, Pid) 为Id注册一个接收消息的进程(Pid )logout(Pid) 停止接受消息send(Id, Msg)向已登录客户端(Id)发送消息(Msg)
注意,从设计上来说,多个不同的用户登陆到同一个进程是有可能的。
这个实例路由器模块用了2个ets表存储Pids和Ids的双向映射(pid2id和id2pid在下面的#state记录定义中)。
router.erl:
-module( router) .-behaviour( gen_server) . -export([start_link /0]) .-export([ init/1 , handle_call/3 , handle_cast/2 , handle_info/2 , terminate/2 , code_change/3]) . -export([ send/2 , login/2 , logout/1]) . -define(SERVER , global:whereis_name( ?MODULE)) . % will hold bidirectional mapping between id <–> pid-record( state, { pid2id, id2pid}) . start_link() -> gen_server :start_link({ global, ?MODULE} , ?MODULE , [] , []) . % sends Msg to anyone logged in as Idsend(Id , Msg) -> gen_server :call( ?SERVER , { send, Id , Msg}) . login(Id , Pid) when is_pid(Pid) -> gen_server :call( ?SERVER , { login, Id , Pid}) . logout(Pid) when is_pid(Pid) -> gen_server :call( ?SERVER , { logout, Pid}) . %% init([]) -> % set this so we can catch death of logged in pids: process_flag( trap_exit, true) , % use ets for routing tables { ok, #state{ pid2id = ets:new( ?MODULE , [ bag]) , id2pid = ets:new( ?MODULE , [ bag]) } } . handle_call({ login, Id , Pid} , _From , State) when is_pid(Pid) -> ets :insert(State #state.pid2id, {Pid , Id}) , ets:insert(State #state.id2pid, {Id , Pid}) , link(Pid) , % tell us if they exit, so we can log them out io:format("~w logged in as ~w " ,[Pid , Id]) , { reply, ok, State} ; handle_call({ logout, Pid} , _From , State) when is_pid(Pid) -> unlink(Pid) , PidRows = ets:lookup(State #state.pid2id, Pid) , casePidRowsof [] -> ok ; _ -> IdRows = [{I ,P} || {P ,I} <- PidRows] , % invert tuples % delete all pid->id entries ets:delete(State #state.pid2id, Pid) , % and all id->pid [ ets:delete_object(State #state.id2pid, Obj) || Obj <- IdRows] end , io:format("pid ~w logged out " ,[Pid]) , { reply, ok, State} ; handle_call({ send, Id , Msg} , _From , State) -> % get pids who are logged in as this Id Pids = [P || { _Id , P} <- ets:lookup(State #state.id2pid, Id)] , % send Msg to them all M = { router_msg, Msg} , [Pid ! M || Pid <- Pids] , { reply, ok, State} . % handle death and cleanup of logged in processeshandle_info(Info , State) -> caseInfoof {‘EXIT’ , Pid , _Why} -> % force logout: handle_call({ logout, Pid} , blah, State) ; Wtf -> io :format("Caught unhandled message: ~w " , [Wtf]) end , { noreply, State} . handle_cast( _Msg , State) -> { noreply, State} .terminate( _Reason , _State) -> ok .code_change( _OldVsn , State , _Extra) -> { ok, State} .
更新mochiweb应用
让我们假设用户是由基于连入mochieweb的URl中的Id号所描述的,我们用那个id向消息路由器注册。 取代阻塞10秒后发送消息,mochiweb的loop循环将组塞在从路由器接收消息上,路由器给mochiweb进程发送消息后mochiweb进程就会向客户端发送Http数据块:
客户端从http://localhost:8000/test/123连接mochiwebMochiweb应用为id为123的用户的那个连接向消息路由器注册进程(pid)假如你用id为123向路由器发送一条消息,他将转发到正确的mochiweb进程,继而消息会出现在那个用户的浏览器上
这是mochiconntest_web.erl的更新版本:
-module( mochiconntest_web) . -export([ start/1 , stop/0 , loop/2]) . %% External API start(Options) -> {DocRoot , Options1} = get_option( docroot, Options) , Loop = fun (Req) -> ?MODULE :loop(Req , DocRoot) end , % we’ll set our maximum to 1 million connections. (default: 2048) mochiweb_http:start([{ max, 1000000} , { name, ?MODULE} , { loop, Loop} | Options1]) . stop() -> mochiweb_http :stop( ?MODULE) . loop(Req , DocRoot) -> "/" ++ Path = Req :get( path) , caseReq :get( method)of Method when Method =:= ‘GET’ ; Method =:= ‘HEAD’ -> casePathof "test/" ++ Id -> Response = Req :ok({"text/html; charset=utf-8" , [{"Server" ,"Mochiweb-Test"}] , chunked}) , % login using an integer rather than a string {IdInt , _} = string:to_integer(Id) , router:login(IdInt , self()) , feed(Response , IdInt , 1) ; _ -> Req :not_found() end ; ‘POST’ -> casePathof _ -> Req :not_found() end ; _ -> Req :respond({501 , [] , []}) end . feed(Response , Id , N) -> receive { router_msg, Msg} -> Html = io_lib:format("Recvd msg #~w: ‘~s’" , [N , Msg]) , Response :write_chunk(Html) end , feed(Response , Id , N+1) . %% Internal API get_option(Option , Options) -> { proplists:get_value(Option , Options) , proplists:delete(Option , Options)} .
动起来!
现在让我们让它活起来 - 我们用两个erlang shell, 一个用于mochiweb,一个用于路由器。 编辑start-dev.sh , 用于启动mochiweb, 下面的额外参数是用于erl的:
-sname n1 命名节点‘n1′+K true 使kernel-poll有效。 当处理当量的连接时看起来不是那么的不知所措+P 134217727 缺省的你能调度的最大进程数为 32768. 想像一下我们每个连接就用一个进程(我不知道不那样做的更好原因) 我建议设置这个参数为最大的可能值。 根据 “man erl”,134,217,727 是最大的
现在运行make && ./start-dev.sh 你将看到一个像(n1@localhost)1>的提示符 -你的mochiweb应用已经那个运行起来了,erlang节点也有了名字。
现在运行另外的erlang shell,就像这样:
erl -sname n2
现在两个erlang实例彼此不知道对方,更正它:
(n2@localhost)1> nodes().
[]
(n2@localhost)2> net_adm:ping(n1@localhost).
pong
(n2@localhost)3> nodes().
[n1@localhost]
现在从这个shell上编译启动路由器:
(n2@localhost)4> c(router).
{ok,router}
(n2@localhost)5> router:start_link().
{ok,<0.38.0>}
现在更好玩点, 从浏览器中执行 http://localhost:8000/test/123 (或者从终端执行lynx --source "http://localhost:8000/test/123" )。 检查运行路由器的shell,你将看到已经有一个用户登陆了。
你现在可以向路由器发送消息并且在浏览器上看到她们。现在只是发送字符串,因为我们在feed函数中用~s 来格式化io_lib:fomart中的参数,原子将使其崩溃:
借用你运行路由器的shell:
(n2@localhost)6> router:send(123, "Hello World").
(n2@localhost)7> router:send(123, "Why not open another browser window too?").
(n2@localhost)8> router:send(456, "This message will go into the void unless you are connected as /test/456 too").
检查你的浏览器,你已经得到了comet,呵呵
在分布式erlang系统中运行
感觉上路由器和mochiweb前端运行在不同的机器上。 假设你有一对备用机用来测试,你应该把erlangshell作为分布式节点启动,也就是说用 -name n1@host1.example.com 取代 -sname n1 (n2也一样)。确信他们可以看到彼此,就像上面似的用 net_adm:ping(...) 。
注意router.erl中的16行, 路由器进程的名字(’router’)被注册成全局的,因为我们在对gen_server的调用中用随后的宏去标志和定位路由器,它已经在分布式系统中很好的工作了:
-define(SERVER, global:whereis_name(?MODULE))。
在分布式系统中为进程注册全局名是elang为你做的很自然的事情之一。
生成大量信息
在实际环境中我们可能看到像用例模型似的长尾想象,有一些很活跃的用户和很多不活跃用户。但是在这个测试中我们将不分青红皂白的为随机用户生成无用的消息。
msggen.erl:
-module( msggen) .-export([ start/3]) . start(0 , _, _) -> ok ;start(Num , Interval , Max) -> Id = random:uniform(Max) , router:send(Id , "Fake message Num = " ++ Num) , receiveafterInterval -> start(Num-1 , Interval , Max)end .
这将向id在1到max之间的随机用户发送Num个消息,每次发送等待Interval 毫秒。
你可以看到这些东西假如你运行路由器和mochiweb应用后用浏览器连接http://localhost:8000/test/3之 后执行
erl -sname test (test@localhost)1> net_adm:ping(n1@localhost).pong (test@localhost)2> c(msggen).{ok,msggen} (test@localhost)3> msggen:start(20, 10, 5).ok
这将向随机id在1-5之间的用户发送20条消息,每条消息之间有10毫秒等待。 Id 3有机会收到一条或者四条消息。
我们可以均等的并行运行一些进程以模拟多个消息源。这里的例子是生成10个进程,每个进程发送20条消息到1-5号用户,每条消息之间间隔100毫秒:
[ spawn(fun() -> msggen:start(20, 100, 5), io:format("~w finished.
", [self()]) end) || _ <- lists:seq(1,10) ].
[<0.97.0>,<0.98.0>,<0.99.0>,<0.100.0>,<0.101.0>,<0.102.0>,
<0.103.0>,<0.104.0>,<0.105.0>,<0.106.0>]
<0.101.0> finished.
<0.105.0> finished.
<0.106.0> finished.
<0.104.0> finished.
<0.102.0> finished.
<0.98.0> finished.
<0.99.0> finished.
<0.100.0> finished.
<0.103.0> finished.
<0.97.0> finished.
再次感受C10K测试
现在我们需要运行另外一个有更大伸缩行的测试;客户端链接到mochiweb应用,mochiweb把他们注册到路由器。我们能生成更多的虚假消息来考验路由器,路由器将把这些消息发送到任何注册的客户端。让我们再次运行在Part 1 使用的10,000个并发用户的测试,但是这次我们将在传输大量消息之前保持所有的客户连接一段时间。
假设你按照在 部分提到的操作调整了你的内核,增加了最大文件限制,这很简单。你已经运行了mochiweb应用和路由器,让我们查看下流量情况。
在没有任何客户连接的情况下, 每个pochiweb beam进程用了大约40MB内存(常驻):
$ ps -o rss= -p `pgrep -f 'sname n1'`
40156
带‘sname n1'的greps命令是为了得到我们的mochiweb erlang进程ID的,然后用带有格式化选项的ps命令打印常驻内存大小 (KB)
我组合出了这讨厌的每60秒用一行打印时间戳, 当前内存用量 (常驻 KB), 当前建立的连接数的语句 - 在另外一个运行mochiweb的机子的终端中放任他运行:
假如有谁知道为一个进程长时间动态剥离出内存用量更好的方法,请留个言。
现在在一个新的erl shell中运行第一部分使用的floodtest工具:
erl> floodtest:start("/tmp/mochi-urls.txt", 10).
他已经达到了10K个并发连接(加上我用firefox打开的这个),mochiweb常驻内存的大小也在90MB左右(90964KB)。
也就是100个进程各自以每秒10条消息的速度向随机的id为1到10000的客户端发送1百万条消息。那就意味着路由器每秒钟看到1000条消息,平均我们10K个客户端每10秒得到一条信息。
当10K个客户端已经连接后,你能看到内存已经由40MB涨到了90M,运行更长一段时间涨到了125M。
我运行这个24小时, mochiweb进程的内存用量信息被写到mochimem.log文件中。这是10000个连接客户端,每秒1000条消息发送给随机的客户端。
下面的bash/awk语句是为了把mochimem.log信息转成图例:
内存用量,c10k连接, 1000条消息/秒,24小时
这个图展示内存用量 (10k 活跃连接, 1000条消息/秒) 24小时持续在250M。有两个大点的下掉, 一个在测试开始一个在结束, 这是当在mochiweb进程中处于好奇运行这个:
erl> [erlang:garbage_collect(P) || P <- erlang:processes()].
他迫使所有的进程进行垃圾回收,这收回大约100MB的内存-下面我们研究一些不用手动强迫进行垃圾回收的方法以节约内存。
在mochiweb减少内存的方法
看起来mochiweb应用只是发送消息然后立即跌掉她们,内存用量不应该随消息发送数的增长而增长。
对于Erlang内存管理我是个新手,但是我继续假设能够频繁的进行垃圾回收,这将允许我们剩下大量内存, 最终让我们能用比较少的系统内存服务更多用户。 我们可能利用更多点的cpu占用率, 但是是可以接收的。
深挖
有这么几个选项:
erlang:system_flag(fullsweep_after, Number)
Number是一个标志在没有全扫描的情况下多少次垃圾回收可以做的一个非负常数。这个值适用于新进程;已经运行的进程不受影响。
在低内存的系统中(特别没有虚拟内存),设置这个值为0可以帮助节约内存
另一个设置次值可选的方法是通过(操作系统)环境变量ERL_FULLSWEEP_AFTER。
听起来挺有意思,但是他仅仅适用于新进程而且将对虚拟机中的所有进程产生作用,只是除了我们的mochiweb进程,呵呵
接下来
erlang:system_flag(min_heap_size, MinHeapSize)
为进程设置缺省的堆大小。以字为单位。新的min_heap_size仅仅影响当min_heap_size改变后生成的进程。通过spawn_opt/N or process_flag/2 min_heap_size可以为单独进行设置
可能有用,但是我更愿意确保我们的mochiweb进程有一个比缺省值大点的内存堆。我更喜欢尽可能避免为了加spawn选项而对mochieweb源代码打补丁
下面的吸引了我的眼球
erlang:hibernate(Module, Function, Args)
把正在调用的进程处于等待状态,它的内存分配就会尽可能的少,假如进程不想在短时间内接收任何数据了那么这是非常有用的。
进程在有消息发送过来时被唤醒, 跟着调用栈被清空,带有由Args给定参数的Module:Function将得到控制权, 意味着这个进程当函数返回时将被终止。 这样erlang:hibernate/3将永远也返回不到调用他的地方
假如进程在消息队列里有任何消息,进程也会以上面的方式被立即唤醒。
用更专业的术语来说,erlang:hibernate/3 做了下面几点。他丢弃进程的调用栈之后进行垃圾回收。在回收后,所有活跃数据都在一个连续的堆中。这个堆然后把空间压缩到刚好能容纳的了活跃数据的尺寸 (即使这个尺寸比进程的最小堆的还小).
假如进程活跃数据的大小小于最小堆尺寸,第一次垃圾回收也会在进程被唤醒后发生,这样确保堆尺寸会变到不小于最小堆尺寸。
注意,清空调用栈意味着任何异常处理都被移除并在休眠后重新插入。一个影响是进程要用proc_lib启动(间接的, gen_server也可以), 用proc_lib:hibernate/3代替主要是确保异常处理在进程被唤醒后能够继续工作。
听起来很合理 - 让我们发送完每个消息后试着休眠,看到底发生了什么
编辑mochiconntest_web.erl ,改变如下:
让 feed(Response, Id, N)函数的 最后一行调用hibernate,而不是调用他自己登进路由器后立马调用hibernate,而不是调用feed并阻塞在receive上记住导出feed/3 ,这样hibernate在唤醒时可以回调回这个函数
用hibernation更新后的mochiconntest_web.erl :
-module( mochiconntest_web) . -export([ start/1 , stop/0 , loop/2 , feed/3]) . %% External API start(Options) -> {DocRoot , Options1} = get_option( docroot, Options) , Loop = fun (Req) -> ?MODULE :loop(Req , DocRoot) end , % we’ll set our maximum to 1 million connections. (default: 2048) mochiweb_http:start([{ max, 1000000} , { name, ?MODULE} , { loop, Loop} | Options1]) . stop() -> mochiweb_http :stop( ?MODULE) . loop(Req , DocRoot) -> "/" ++ Path = Req :get( path) , caseReq :get( method)of Method when Method =:= ‘GET’ ; Method =:= ‘HEAD’ -> casePathof "test/" ++ IdStr -> Response = Req :ok({"text/html; charset=utf-8" , [{"Server" ,"Mochiweb-Test"}] , chunked}) , {Id , _} = string:to_integer(IdStr) , router:login(Id , self()) , % Hibernate this process until it receives a message: proc_lib:hibernate( ?MODULE , feed, [Response , Id , 1]) ; _ -> Req :not_found() end ; ‘POST’ -> casePathof _ -> Req :not_found() end ; _ -> Req :respond({501 , [] , []}) end . feed(Response , Id , N) -> receive { router_msg, Msg} -> Html = io_lib:format("Recvd msg #~w: ‘~w’<br/>" , [N , Msg]) , Response :write_chunk(Html) end , % Hibernate this process until it receives a message: proc_lib:hibernate( ?MODULE , feed, [Response , Id , N+1]) . %% Internal API get_option(Option , Options) -> { proplists:get_value(Option , Options) , proplists:delete(Option , Options)} .
我做了这些改变,运行make重新构建mochiweb,然后重做同样的c10k测试 (1000条消息/秒 24小时).
运行24小时后的结果 w/ proc_lib:hibernate()
内存用量 c10k, 1000条消息/秒, 24小时, 用hibernate()
恰如其分的,用了hibernate,10K个连接的mochiweb应用的内存用量维持在78MB这个水平, 要比我们在
看到的450MB好的多。CPU占用率也没明显增加。
总结
我们基于mochiweb做了个comet应用,他让我们推送任意消息给由ID标志的客户端。在以每秒1000条消息的速度推送24小时后, 10,000个连接用户,我们发现它用了80MB内存,或者说每个用户8KB 。 我们同样也做了很漂亮的图。
相对于在
我们所看到的每个用户45KB,这是一个很大的改进。这些优化节省是归因于让我们的应用表现的更加贴近实际, 为mochiweb进程在每条消息之间用 hibernate。
下一步
后续, 我将调整它到一百万个连接客户端。我将部署这个测试应用到拥有充沛内存的多核64位服务器上 。这将展示有什么不同,如果有的话也可以运行在64位虚拟机上。为了模拟一百万客户连接我将详细介绍一些额外的技巧和调整 。
这个应用将发展成一系列公共子系统,在那订阅被关联于用户ID 并被存储于这个应用, 而不是当用户连接时由他们提供。我们将调入一个典型的社会网络数据集: friends。这将允许一个用户用一个用户ID登陆并且自动接收任何有他朋友生成的消息。