zoukankan      html  css  js  c++  java
  • Erlang中频繁发送远程消息要注意的问题

    http://avindev.iteye.com/blog/76373

    注:这篇文章可能会有争议,欢迎提出意见 

    在Erlang中,如果要实现两个远程节点之间的通信,就需要通过网络来实现,对于消息发送,是使用TCP。如果要在两个节点间频繁发送消息,比如每秒几百上千条,那样就要注意了。 

    无论是网游服务器开发的书籍,或是经验老道的工程师,都会告诉你,在发送数据包时,尽可能把小的消息组合为一个比较大的包来发送,毕竟一个TCP包的头也很大,首先是浪费带宽,其次调用底层发送的指令也是有开销的。有工程师告诉我,一般每秒大概是2W次左右。 

    简单测试一下,先是代码 

    一个接收消息并马上抛弃的Server: 

    Java代码  收藏代码
    1. start() ->  
    2.     register(nullserver, self()),  
    3.     loop().  
    4.   
    5. loop() ->  
    6.     receive  
    7.     Any ->  
    8.         loop() %drop message and loop  
    9.     end.  



    一个在循环中向它发送消息的Client: 

    Java代码  收藏代码
    1. start() ->  
    2.     start_send(100).  
    3.   
    4. start_send(0) ->  
    5.     ok;  
    6. start_send(N) ->  
    7.     {nullserver, 'foo@192.168.0.3'} ! hi,  
    8.     start_send(N-1).  



    然后打开截包工具,运行server和client,截取到接近200个包的发送和接收记录,其中,大部分是这样的数据: 

    引用
    00 14 78 B9 14 BC 00 11-11 9F 91 1A 08 00 45 00 
    00 45 EE 77 40 00 80 06-80 E4 C0 A8 00 CC DB E8 
    ED F9 13 58 C1 C6 AA 4E-59 F2 38 CF 22 2D 50 18 
    FF 19 B9 EE 00 00 00 00-00 19 70 83 68 04 61 06 
    67 43 CC 00 00 00 01 00-00 00 00 02 43 05 43 BD 
    83 43 BF                                     



    引用
    00 14 78 B9 14 BC 00 11-11 9F 91 1A 08 00 45 00 
    00 45 EE 78 40 00 80 06-80 E3 C0 A8 00 CC DB E8 
    ED F9 13 58 C1 C6 AA 4E-5A 0F 38 CF 22 2D 50 18 
    FF 19 B9 D1 00 00 00 00-00 19 70 83 68 04 61 06 
    67 43 CC 00 00 00 01 00-00 00 00 02 43 05 43 BD 
    83 43 BF                                      




    实际上,只有从 00 00-00 19 这里开始,才是TCP包的内容,前面都是底层协议的数据,就是这样的数据包发送了100次,浪费是巨大的。而且,在消息发送后,还收到同样数目类似 

    引用
    00 11 11 9F 91 1A 00 14-78 B9 14 BC 08 00 45 00 
    00 28 8C FC 40 00 32 06-30 7D DB E8 ED F9 C0 A8 
    00 CC C1 C6 13 58 38 CF-22 2D AA 4E 59 F2 50 10 
    19 20 D7 01 00 00 00 00-00 00 00 00 

              

    这样的响应包,也浪费着带宽。 


    从目前我所阅读过的文档来看,暂时没有有关如何缓存这些消息定期一并发送的参数设置。那么有什么解决办法,我自己有两种。 

    一种是将要发送的一批Message打包到一个list发送,接收方从list中取出所有message并处理。 

    另一种是通过一个Proxy,发送方不通过 {Name, Node} ! Message 这种方式来发送,而是通过一个本地的Proxy Process,代理会将所有发送到某个节点的消息累积起来,定时批量发送过去;接收方也有一个Listening Process,它接收批量的Message,遍历后发送给本地的相应进程。 

    这里是我初步写出来的实现,不太漂亮,仅供参考~ 

    message_agent.erl: 实现消息的批量发送,接收和转发 

    Java代码  收藏代码
    1. -module(message_agent).  
    2. -export([listen/0, proxy/2, block_exit/1]).  
    3. -export([loop_receive/0]).  
    4. -define(MAX_BATCH_MESSAGE_SIZE, 50).  
    5.   
    6. listen() ->  
    7.     io:format("Message agent server start listen~n"),  
    8.     spawn(fun() -> register('MsgServerAgent', self()), loop_receive() end),  
    9.     ok.  
    10.   
    11. loop_receive() ->  
    12.     receive  
    13.     {forward_message, PName, Messages} ->  
    14.         forward_messages(PName, Messages),  
    15.             loop_receive();  
    16.     Any ->  
    17.         message_agent:loop_receive()  
    18.     end.  
    19.   
    20. forward_messages(PName, []) ->  
    21.     ok;  
    22. forward_messages(PName, [H|T]) ->  
    23.     %io:format("Forward message ~w to process ~w~n", [H, PName]),  
    24.     catch PName ! H,  
    25.     forward_messages(PName, T).  
    26.   
    27.   
    28. proxy(Node, PName) ->  
    29.     spawn_link(fun() -> handle_message_forward(Node, PName, []) end).  
    30.   
    31. block_exit(Agent) ->  
    32.     Agent ! {block_wait, self()},  
    33.     receive  
    34.     {unblock} ->  
    35.         ok  
    36.     end.  
    37.   
    38. handle_message_forward(Node, PName, Messages) ->  
    39.     receive  
    40.     {block_wait, Pid} ->  
    41.         catch send_batch(Node, PName, lists:reverse(Messages)),  
    42.         Pid ! {unblock};  
    43.     Any ->  
    44.         NewMessages = [Any|Messages],  
    45.         case length(NewMessages)>=?MAX_BATCH_MESSAGE_SIZE of  
    46.         true ->  
    47.             send_batch(Node, PName, lists:reverse(NewMessages)),  
    48.             handle_message_forward(Node, PName, []);  
    49.         false ->  
    50.             handle_message_forward(Node, PName, NewMessages)  
    51.         end  
    52.     after  
    53.     0 ->  
    54.         case length(Messages)>0 of  
    55.         true ->  
    56.             catch send_batch(Node, PName, lists:reverse(Messages));  
    57.         false ->  
    58.             ok  
    59.         end,  
    60.         handle_message_forward(Node, PName, [])  
    61.     end.  
    62.   
    63. send_batch(Node, PName, Messages) ->  
    64.     %io:format("Send batch message, size ~p~n", [length(Messages)]),  
    65.     {'MsgServerAgent', Node} ! {forward_message, PName, Messages}.  




    使用方式很简单,在接收Message的一端调用 message_agent:listen() 启动监听代理,客户端使用 register(agent, message_agent:proxy(?NODE, 'MsgServer')) 的方式启动代理进程,消息发送给这个代理进程就可以了。下面是我写的简单例子: 

    Java代码  收藏代码
    1. -module(message_server).  
    2. -export([start/0]).  
    3. -define(TIMEOUT_MS, 1000).  
    4.   
    5. start() ->  
    6.     io:format("Message server start~n"),  
    7.     register('MsgServer', self()),  
    8.     message_agent:listen(),  
    9.     loop_receive(0).  
    10.   
    11. loop_receive(Count) ->  
    12.     receive  
    13.     Any ->  
    14.         %io:format("Receive msg ~w~n", [Any]),  
    15.             loop_receive(Count+1)  
    16.     after  
    17.     ?TIMEOUT_MS ->  
    18.         if   
    19.         Count>0 ->  
    20.             io:format("Previous receive msg count: ~p~n", [Count]),  
    21.             loop_receive(0);  
    22.         true ->  
    23.             loop_receive(0)  
    24.         end  
    25.     end.  



    Java代码  收藏代码
    1. -module(message_client).  
    2. -define(NODE, 'msgsrv@192.168.0.3').  
    3. -define(COUNT, 20000).  
    4. -export([start/0]).  
    5.   
    6. start() ->  
    7.     statistics(wall_clock),  
    8.     register(agent, message_agent:proxy(?NODE, 'MsgServer')),  
    9.     send_loop(?COUNT).  
    10.   
    11. send_loop(0) ->  
    12.     message_agent:block_exit(agent),  
    13.     {_, Interval} = statistics(wall_clock),  
    14.     io:format("Finished ~p sends in ~p ms, exiting...~n", [?COUNT, Interval]);  
    15. send_loop(Count) ->  
    16.     agent ! {self(), lalala},  
    17.     send_loop(Count-1).  



    这里要注意的是,消息发送端和接收端都是由一个单独的进程来处理消息。在Erlang的默认堆实现,是私有堆,本地进程间的消息发送是需要拷贝的,在数据量大的时候,该进程堆的垃圾回收会相当频繁。

  • 相关阅读:
    IIS Admin Service安装
    Linux常用命令总结
    Mysql常用命令操作小结
    mysql常用操作
    初识linux
    python基础
    接口测试基础
    MYSQL笔记
    mysql使用存储函数批量添加数据
    linux的基础命令(博客使用测试中 更新中)
  • 原文地址:https://www.cnblogs.com/fvsfvs123/p/4255374.html
Copyright © 2011-2022 走看看