zoukankan      html  css  js  c++  java
  • 使用 erlang OTP 模式编写非阻塞的 tcp 服务器(来自erlang wiki)

    参考资料:http://erlangcentral.org/wiki/index.php/Building_a_Non-blocking_TCP_server_using_OTP_principles

    服务器设计
    tcp_server_app下的根监控树使用one_for_one重启策略。两个子树应用,第一个是一个tcp套接字监听服务器,使用gen_server模式来实现,采用异步监听的客户端连接的模式。第二个是一个客户端应用,使用gen_fsm模式实现,使用标准SASL错误报告接口,记录客户端消息处理的日志以及非正常与服务器断开连接日志。

    整体应用架构:
                     +----------------+
                     | tcp_server_app |
                     +--------+-------+
                              | (one_for_one)
             +----------------+---------+
             |                                       |
     +-------+------+           +-------+--------+
     | tcp_listener |                + tcp_client_sup |
     +--------------+            +-------+--------+
                                                      | (simple_one_for_one)
                                             +-----|---------+
                                          +-------|--------+|
                                         +--------+-------+|+
                                          |  tcp_echo_fsm  |+
                                         +----------------+

    tcp_server代码如下:

      1 %% TCP Server Application (tcp_server_app.erl)
      2 -module(tcp_server_app).
      3 -author('saleyn@gmail.com').
      4  
      5 %% 实现application模式
      6 -behaviour(application).
      7  
      8 -export([start_client/0]).
      9  
     10 %% 应用程序启动以及监控树回调函数
     11 -export([start/2, stop/1, init/1]).
     12  
     13 %% 宏变量定义
     14 -define(MAX_RESTART,    5).
     15 -define(MAX_TIME,      60).
     16 -define(DEF_PORT,    2222).
     17  
     18 %% 启动客户端进程的接口
     19 %% 在监听程序建立连接时调用
     20 start_client() ->
     21     %% 回调第二个init函数,因为第二个是动态添加监控树子节点
     22     %% 也就是说这里是两颗不同的监控树,使用了一个模块两个 init 函数来实现
     23     supervisor:start_child(tcp_client_sup, []). 
     24  
     25 %%----------------------------------------------------------------------
     26 %% Application behaviour callbacks
     27 %%----------------------------------------------------------------------
     28 start(_Type, _Args) ->
     29     %% 获取端口配置参数,找不到时返回默认端口 ?DEF_PORT
     30     ListenPort = get_app_env(listen_port, ?DEF_PORT), 
     31 
     32     %% 启动应用程序,回调函数为 第一个 init 函数,根据参数匹配,参数为 [端口,客户端回调模块]
     33     %% 第一个 init 函数仅仅是启动了两个监控树
     34     supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, tcp_echo_fsm]).
     35  
     36 stop(_S) ->
     37     ok.
     38  
     39 %%----------------------------------------------------------------------
     40 %% Supervisor behaviour callbacks
     41 %%----------------------------------------------------------------------
     42 init([Port, Module]) ->
     43     {ok,
     44         %% 监控树策略参数,ono_for_one策略,设置MAX_TIME最多重启的MAX_RESTART次
     45         {_SupFlags = {one_for_one, ?MAX_RESTART, ?MAX_TIME},
     46             [
     47               % TCP Listener
     48               {   tcp_server_sup,                          % Id       = internal id
     49                   {tcp_listener,start_link,[Port,Module]}, % StartFun = {M, F, A}
     50                   permanent,                               % Restart  = permanent | transient | temporary
     51                   2000,                                    % Shutdown = brutal_kill | int() >= 0 | infinity
     52                   worker,                                  % Type     = worker | supervisor
     53                   [tcp_listener]                           % Modules  = [Module] | dynamic
     54               },
     55               % Client instance supervisor
     56               {   
     57                      %% Module参数初始化了tcp_client_sup监控树的 init 函数, init 函数在下面
     58                      tcp_client_sup, 
     59                      %% 子节点启动策略
     60                   {supervisor,start_link,[{local, tcp_client_sup}, ?MODULE, [Module]]}, 
     61                   permanent,                               % Restart  = permanent | transient | temporary
     62                   infinity,                                % Shutdown = brutal_kill | int() >= 0 | infinity
     63                   supervisor,                              % Type     = worker | supervisor
     64                   []                                       % Modules  = [Module] | dynamic
     65               }
     66             ]
     67         }
     68     };
     69  
     70 %% 在服务器接收连接时,创建客户端进程时会回调到这个函数,使用simple_one_for_one启动策略 
     71 %% 参数 Module 在第一个
     72 init([Module]) ->
     73     {ok,    
     74         %% 另外一种根监督树模式,simple_one_for_one策略子节点只能动态添加
     75         {_SupFlags = {simple_one_for_one, ?MAX_RESTART, ?MAX_TIME},
     76             [
     77               % TCP Client
     78               {   undefined,                               % Id       = internal id
     79                   {Module,start_link,[]},                  % StartFun = {M, F, A}
     80                   temporary,                               % Restart  = permanent | transient | temporary
     81                   2000,                                    % Shutdown = brutal_kill | int() >= 0 | infinity
     82                   worker,                                  % Type     = worker | supervisor
     83                   []                                       % Modules  = [Module] | dynamic
     84               }
     85             ]
     86         }
     87     }.
     88  
     89 %%----------------------------------------------------------------------
     90 %% Internal functions
     91 %%----------------------------------------------------------------------
     92 %% 获取配置文件xxx.app文件中的配置变量 
     93 get_app_env(Opt, Default) ->
     94     case application:get_env(application:get_application(), Opt) of
     95     {ok, Val} -> Val;
     96     _ ->
     97         case init:get_argument(Opt) of
     98         [[Val | _]] -> Val;
     99         error       -> Default
    100         end
    101     end.

    下面是服务端socket监听程序,这里使用了一个不具有官方文档的 api
     prim_inet:async_accept/2 来实现一个异步监听套接字的服务器程序,代码如下:

      1 % TCP Listener Process (tcp_listener.erl)    
      2 -module(tcp_listener).
      3 -author('saleyn@gmail.com').
      4  
      5 %% 实现 gen_server 模式 
      6 -behaviour(gen_server).
      7  
      8 %% 内部接口
      9 -export([start_link/2]).
     10  
     11 %% gen_server 回调函数
     12 -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
     13          code_change/3]).
     14  
     15 %% 定义了一个 record 记录 gen_server 进程的状态
     16 -record(state, {
     17                 listener,       % Listening socket
     18                 acceptor,       % Asynchronous acceptor's internal reference
     19                 module          % FSM handling module
     20                }).
     21  
     22 %%--------------------------------------------------------------------
     23 %% @spec (Port::integer(), Module) -> {ok, Pid} | {error, Reason}
     24 %% @doc 监控树调用并开始进行tcp套接字监听
     25 %% @end
     26 %%----------------------------------------------------------------------
     27 start_link(Port, Module) when is_integer(Port), is_atom(Module) ->
     28     gen_server:start_link({local, ?MODULE}, ?MODULE, [Port, Module], []).
     29  
     30 %%%------------------------------------------------------------------------
     31 %%% Callback functions from gen_server
     32 %%%------------------------------------------------------------------------
     33  
     34 %%----------------------------------------------------------------------
     35 %% @spec (Port::integer()) -> {ok, State}           |
     36 %%                            {ok, State, Timeout}  |
     37 %%                            ignore                |
     38 %%                            {stop, Reason}
     39 %%
     40 %% @doc gen_server启动时回调,并创建 tcp 监听
     41 %% @end
     42 %%----------------------------------------------------------------------
     43 init([Port, Module]) ->
     44     process_flag(trap_exit, true),
     45     Opts = [binary, {packet, 2}, {reuseaddr, true},
     46             {keepalive, true}, {backlog, 30}, {active, false}],
     47     %% 使用 gen_tcp 模块启动套接字监听,这是一个阻塞动作
     48     case gen_tcp:listen(Port, Opts) of
     49     {ok, Listen_socket} -> %% 创建监听成功返回监听socket
     50         %% 创建第一个接受连接的进程
     51         %% prim_inet:async_accept/2开启异步监听
     52         %% 之后有客户端连接时会向此进程发送一个异步消息inet_async到进程消息队列
     53         %% Ref 存储接受进程的引用
     54         {ok, Ref} = prim_inet:async_accept(Listen_socket, -1),
     55         {ok, #state{listener = Listen_socket,
     56                     acceptor = Ref,
     57                     module   = Module}};
     58     {error, Reason} ->
     59         {stop, Reason}
     60     end.
     61  
     62 %%-------------------------------------------------------------------------
     63 %% @spec (Request, From, State) -> {reply, Reply, State}          |
     64 %%                                 {reply, Reply, State, Timeout} |
     65 %%                                 {noreply, State}               |
     66 %%                                 {noreply, State, Timeout}      |
     67 %%                                 {stop, Reason, Reply, State}   |
     68 %%                                 {stop, Reason, State}
     69 %% @doc 服务进程被同步调用时的回调函数
     70 %% @end
     71 %% @private
     72 %%-------------------------------------------------------------------------
     73 handle_call(Request, _From, State) ->
     74     {stop, {unknown_call, Request}, State}.
     75  
     76 %%-------------------------------------------------------------------------
     77 %% @spec (Msg, State) ->{noreply, State}          |
     78 %%                      {noreply, State, Timeout} |
     79 %%                      {stop, Reason, State}
     80 %% @doc 服务进程被异步调用时的回调函数
     81 %% @end
     82 %% @private
     83 %%-------------------------------------------------------------------------
     84 handle_cast(_Msg, State) ->
     85     {noreply, State}.
     86  
     87 %%-------------------------------------------------------------------------
     88 %% @spec (Msg, State) ->{noreply, State}          |
     89 %%                      {noreply, State, Timeout} |
     90 %%                      {stop, Reason, State}
     91 %% @doc 回调函数,处理那些直接发消息到进程邮箱的事件
     92 %% 这里处理的是 {inet_async, ListSock, Ref, {ok, CliSocket}}事件,
     93 %% inet_async 表示是一个异步事件,服务器端接收连接采用异步的方式,
     94 %% 客户端连接最终会被转化成一个 inet_async 消息发送到进程邮箱等待处理
     95 %% {{ok, CliSocket}} 里的CliSocket表示客户端建立的连接套接口
     96 %% @end
     97 %% @private
     98 %%-------------------------------------------------------------------------
     99 
    100 %% 注意这里 ListSock 以及 Ref 做了匹配,只有匹配了才是该监听口接收的连接
    101 handle_info({inet_async, ListSock, Ref, {ok, CliSocket}},
    102             #state{listener=ListSock, acceptor=Ref, module=Module} = State) ->
    103     try
    104         case set_sockopt(ListSock, CliSocket) of
    105         ok              -> ok;
    106         {error, Reason} -> exit({set_sockopt, Reason})
    107         end,
    108  
    109         %% 接收新的客户端连接,启动一个新的客户端状态机进程,动态添加到 tcp_client_sup 客户端监控树
    110         {ok, Pid} = tcp_server_app:start_client(),
    111 
    112         %% 绑定 CliSocet 到客户端进程 Pid, 这样CliSocket接收数据都会被转化成Pid代表进程的邮箱消息
    113         gen_tcp:controlling_process(CliSocket, Pid),
    114         %% Instruct the new FSM that it owns the socket.
    115 
    116         Module:set_socket(Pid, CliSocket),
    117  
    118         %% Signal the network driver that we are ready to accept another connection
    119         %% 重新设置异步监听下一个客户端连接的消息,设置新的监听引用
    120         %% 必须重新设置才能监听到 {inet_async,S,Ref,Status} 消息
    121         case prim_inet:async_accept(ListSock, -1) of
    122         {ok,    NewRef} -> ok;
    123         {error, NewRef} -> exit({async_accept, inet:format_error(NewRef)})
    124         end,
    125          
    126          %% 更新新的监听引用
    127         {noreply, State#state{acceptor=NewRef}}
    128     catch exit:Why ->
    129         error_logger:error_msg("Error in async accept: ~p.
    ", [Why]),
    130         {stop, Why, State}
    131     end;
    132 
    133 %%客户端建立连接的容错处理
    134 handle_info({inet_async, ListSock, Ref, Error}, #state{listener=ListSock, acceptor=Ref} = State) ->
    135     error_logger:error_msg("Error in socket acceptor: ~p.
    ", [Error]),
    136     {stop, Error, State};
    137  
    138 handle_info(_Info, State) ->
    139     {noreply, State}.
    140  
    141 %%-------------------------------------------------------------------------
    142 %% @spec (Reason, State) -> any
    143 %% @doc  Callback executed on server shutdown. It is only invoked if
    144 %%       `process_flag(trap_exit, true)' is set by the server process.
    145 %%       The return value is ignored.
    146 %% @end
    147 %% @private
    148 %%-------------------------------------------------------------------------
    149 terminate(_Reason, State) ->
    150     gen_tcp:close(State#state.listener),
    151     ok.
    152  
    153 %%-------------------------------------------------------------------------
    154 %% @spec (OldVsn, State, Extra) -> {ok, NewState}
    155 %% @doc  Convert process state when code is changed.
    156 %% @end
    157 %% @private
    158 %%-------------------------------------------------------------------------
    159 code_change(_OldVsn, State, _Extra) ->
    160     {ok, State}.
    161  
    162 %%%------------------------------------------------------------------------
    163 %%% Internal functions
    164 %%%------------------------------------------------------------------------
    165  
    166 %% 设置客户端socket的参数选项,只是简单的复制了监听服务器的配置选项
    167 set_sockopt(ListSock, CliSocket) ->
    168     true = inet_db:register_socket(CliSocket, inet_tcp),
    169     case prim_inet:getopts(ListSock, [active, nodelay, keepalive, delay_send, priority, tos]) of
    170     {ok, Opts} ->
    171         case prim_inet:setopts(CliSocket, Opts) of
    172         ok    -> ok;
    173         Error -> gen_tcp:close(CliSocket), Error
    174         end;
    175     Error ->
    176         gen_tcp:close(CliSocket), Error
    177     end.

    下面是客户端处理输出的状态机:

      1 %% TCP Client Socket Handling FSM (tcp_echo_fsm.erl)
      2 %% 客户端输出处理状态机,这里其实就是一个 echo_server 的客户端版本
      3     
      4 -module(tcp_echo_fsm).
      5 -author('saleyn@gmail.com').
      6  
      7 %% 实现 gen_fsm 模式,事实上状态机应用场景没有 gen_server 多
      8 %% 不过能用的场景都比较特殊,比如游戏客户端,服务端战斗模块
      9 -behaviour(gen_fsm).
     10  
     11 -export([start_link/0, set_socket/2]).
     12  
     13 %% gen_fsm 回调函数
     14 -export([init/1, handle_event/3,
     15          handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).
     16  
     17 %% FSM States FSM 状态机的状态
     18 -export([
     19     'WAIT_FOR_SOCKET'/2, %% 等待socket
     20     'WAIT_FOR_DATA'/2    %% 等待socket数据
     21 ]).
     22  
     23 -record(state, {
     24                 socket,    % client socket
     25                 addr       % client address
     26                }).
     27  
     28 -define(TIMEOUT, 120000).
     29  
     30 %%%------------------------------------------------------------------------
     31 %%% API
     32 %%%------------------------------------------------------------------------
     33  
     34 %%-------------------------------------------------------------------------
     35 %% @spec (Socket) -> {ok,Pid} | ignore | {error,Error}
     36 %% @doc To be called by the supervisor in order to start the server.
     37 %%      If init/1 fails with Reason, the function returns {error,Reason}.
     38 %%      If init/1 returns {stop,Reason} or ignore, the process is
     39 %%      terminated and the function returns {error,Reason} or ignore,
     40 %%      respectively.
     41 %% @end
     42 %%-------------------------------------------------------------------------
     43 start_link() ->
     44     gen_fsm:start_link(?MODULE, [], []).
     45  
     46 set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) ->
     47     gen_fsm:send_event(Pid, {socket_ready, Socket}).
     48  
     49 %%%------------------------------------------------------------------------
     50 %%% Callback functions from gen_server
     51 %%%------------------------------------------------------------------------
     52  
     53 %%-------------------------------------------------------------------------
     54 %% Func: init/1
     55 %% Returns: {ok, StateName, StateData}          |
     56 %%          {ok, StateName, StateData, Timeout} |
     57 %%          ignore                              |
     58 %%          {stop, StopReason}
     59 %% @private
     60 %%-------------------------------------------------------------------------
     61 init([]) ->
     62     process_flag(trap_exit, true),
     63 
     64       %% 状态机启动之后的初始化状态
     65     {ok, 'WAIT_FOR_SOCKET', #state{}}.
     66  
     67 %%-------------------------------------------------------------------------
     68 %% Func: StateName/2
     69 %% Returns: {next_state, NextStateName, NextStateData}          |
     70 %%          {next_state, NextStateName, NextStateData, Timeout} |
     71 %%          {stop, Reason, NewStateData}
     72 %% @private
     73 %%-------------------------------------------------------------------------
     74 
     75 %% 创建客户端之后 set_socket 函数发送消息之后在这里被处理了
     76 %% 大致逻辑是:收到通知,客户端连接socket到手,可以设置套接字选项并开始接收数据
     77 'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) ->
     78     % Now we own the socket
     79     inet:setopts(Socket, [{active, once}, {packet, 2}, binary]),
     80     {ok, {IP, _Port}} = inet:peername(Socket),
     81 
     82     %% 确定了socket之后,状态机的下一个状态就是等着接收数据了
     83     {next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ?TIMEOUT};
     84 'WAIT_FOR_SOCKET'(Other, State) ->
     85     error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p
    ", [Other]),
     86     %% Allow to receive async messages
     87     {next_state, 'WAIT_FOR_SOCKET', State}.
     88  
     89 %% 显示来自客户端的事件
     90 'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) ->
     91     ok = gen_tcp:send(S, Data),
     92     {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};
     93  
     94 'WAIT_FOR_DATA'(timeout, State) ->
     95     error_logger:error_msg("~p Client connection timeout - closing.
    ", [self()]),
     96     {stop, normal, State};
     97  
     98 'WAIT_FOR_DATA'(Data, State) ->
     99     io:format("~p Ignoring data: ~p
    ", [self(), Data]),
    100     {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}.
    101  
    102 %%-------------------------------------------------------------------------
    103 %% Func: handle_event/3
    104 %% Returns: {next_state, NextStateName, NextStateData}          |
    105 %%          {next_state, NextStateName, NextStateData, Timeout} |
    106 %%          {stop, Reason, NewStateData}
    107 %% @private
    108 %%-------------------------------------------------------------------------
    109 handle_event(Event, StateName, StateData) ->
    110     {stop, {StateName, undefined_event, Event}, StateData}.
    111  
    112 %%-------------------------------------------------------------------------
    113 %% Func: handle_sync_event/4
    114 %% Returns: {next_state, NextStateName, NextStateData}            |
    115 %%          {next_state, NextStateName, NextStateData, Timeout}   |
    116 %%          {reply, Reply, NextStateName, NextStateData}          |
    117 %%          {reply, Reply, NextStateName, NextStateData, Timeout} |
    118 %%          {stop, Reason, NewStateData}                          |
    119 %%          {stop, Reason, Reply, NewStateData}
    120 %% @private
    121 %%-------------------------------------------------------------------------
    122 handle_sync_event(Event, _From, StateName, StateData) ->
    123     {stop, {StateName, undefined_event, Event}, StateData}.
    124  
    125 %%-------------------------------------------------------------------------
    126 %% Func: handle_info/3
    127 %% Returns: {next_state, NextStateName, NextStateData}          |
    128 %%          {next_state, NextStateName, NextStateData, Timeout} |
    129 %%          {stop, Reason, NewStateData}
    130 %% @private
    131 %%-------------------------------------------------------------------------
    132 handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->
    133     % Flow control: enable forwarding of next TCP message
    134     inet:setopts(Socket, [{active, once}]),
    135     ?MODULE:StateName({data, Bin}, StateData);
    136  
    137 handle_info({tcp_closed, Socket}, _StateName,
    138             #state{socket=Socket, addr=Addr} = StateData) ->
    139     error_logger:info_msg("~p Client ~p disconnected.
    ", [self(), Addr]),
    140     {stop, normal, StateData};
    141  
    142 handle_info(_Info, StateName, StateData) ->
    143     {noreply, StateName, StateData}.
    144  
    145 %%-------------------------------------------------------------------------
    146 %% Func: terminate/3
    147 %% Purpose: Shutdown the fsm
    148 %% Returns: any
    149 %% @private
    150 %%-------------------------------------------------------------------------
    151 terminate(_Reason, _StateName, #state{socket=Socket}) ->
    152     (catch gen_tcp:close(Socket)),
    153     ok.
    154  
    155 %%-------------------------------------------------------------------------
    156 %% Func: code_change/4
    157 %% Purpose: Convert process state when code is changed
    158 %% Returns: {ok, NewState, NewStateData}
    159 %% @private
    160 %%-------------------------------------------------------------------------
    161 code_change(_OldVsn, StateName, StateData, _Extra) ->
    162     {ok, StateName, StateData}.

    最后是app文件了:

     1 %% tcp_server.app 文件
     2     
     3 {application, tcp_server,
     4  [
     5   {description, "Demo TCP server"},
     6   {vsn, "1.0"},
     7   {id, "tcp_server"},
     8   {modules,      [tcp_listener, tcp_echo_fsm]},
     9   {registered,   [tcp_server_sup, tcp_listener]},
    10   {applications, [kernel, stdlib]},
    11   %%
    12   %% mod: 指定应用启动初始化的模块
    13   %%
    14   {mod, {tcp_server_app, []}},
    15   {env, []}
    16  ]
    17 }.

    以上基本上都是个人查找资料过程的笔记,有理解错误的地方请评论指出,谢谢!

  • 相关阅读:
    es6学习笔记
    vue.js项目目录结构说明
    js 数组操作总结
    js 数组去重方法
    HTTP协议三次握手过程
    MVC与MVVM模式对比
    谱面编辑器
    LL谱面分析和难度标定
    SLP的模块结构
    LL基本姿势
  • 原文地址:https://www.cnblogs.com/bicowang/p/3976129.html
Copyright © 2011-2022 走看看