zoukankan      html  css  js  c++  java
  • Ejabberd网络结构

    ejabberd作为otp应用,与其相关的套接字模块为

    ejabberd_receiver

    ejabberd_socket

    ejabberd_listener

    ejabberd分两步完成端口的监听与连接的接受:

    1、ejabberd_listener首先根据配置文件,把相应的端口打开(处于监听状态),但是未接受连接

    配置文件(可知监听三个端口):

    [{{5222,{0,0,0,0},tcp},
      ejabberd_c2s,
      [{access,c2s},{shaper,c2s_shaper},{max_stanza_size,65536}]},
     {{5269,{0,0,0,0},tcp},ejabberd_s2s_in,[]},
     {{5280,{0,0,0,0},tcp},
      ejabberd_http,
      [{captcha,true},
       {http_bind,true},
       {web_admin,true},
       {request_handlers,[{<<"/websocket">>,ejabberd_http_ws}]}]}]

    相应的代码为:

    Sup = ejabberd_sup:start_link(),
    
        Listener =
        {ejabberd_listener,
         {ejabberd_listener, start_link, []},
         permanent,
         infinity,
         supervisor,
         [ejabberd_listener]},
    
    %% Starts a supervisor registered locally as `ejabberd_listeners',
    %% using this module as the callback module and an empty argument list.
    start_link() ->
        SupName = {local, ejabberd_listeners},
        supervisor:start_link(SupName, ?MODULE, []).
    %% Creates the public named ETS table `listen_sockets', runs
    %% bind_tcp_ports/0, and returns a one_for_one supervisor specification
    %% (intensity 10, period 1) with no static children.
    init(_Args) ->
        _Table = ets:new(listen_sockets, [named_table, public]),
        _ = bind_tcp_ports(),
        SupFlags = {one_for_one, 10, 1},
        {ok, {SupFlags, []}}.
    %% Reads the `listen' option and calls bind_tcp_port/3 for every entry
    %% whose (frontend-stripped) module does not report the socket type
    %% `independent'. Returns `ignore' when the option is undefined,
    %% otherwise `ok'.
    bind_tcp_ports() ->
        Listeners = ejabberd_config:get_option(listen, fun validate_cfg/1),
        case Listeners of
            undefined ->
                ignore;
            _ ->
                BindOne =
                    fun({Port, Module, Opts}) ->
                            Backend = strip_frontend(Module),
                            case Backend:socket_type() of
                                independent ->
                                    %% The module manages its own socket.
                                    ok;
                                _Other ->
                                    bind_tcp_port(Port, Module, Opts)
                            end
                    end,
                lists:foreach(BindOne, Listeners)
        end.
    
    %% Validates the listener options and, for any protocol other than udp,
    %% opens the listening socket and stores it in the `listen_sockets' table
    %% under PortIP. A thrown `{error, Error}' from the option check is logged
    %% instead of propagated.
    bind_tcp_port(PortIP, Module, RawOpts) ->
        try check_listener_options(RawOpts) of
            ok ->
                {Port, IPT, IPS, IPV, Proto, OptsClean} =
                    parse_listener_portip(PortIP, RawOpts),
                {_Opts, SockOpts} = prepare_opts(IPT, IPV, OptsClean),
                if
                    Proto =:= udp ->
                        ok;
                    true ->
                        Sock = listen_tcp(PortIP, Module, SockOpts, Port, IPS),
                        ets:insert(listen_sockets, {PortIP, Sock}),
                        ok
                end
        catch
            throw:{error, Error} ->
                ?ERROR_MSG(Error, [])
        end.
    %% Returns a listening socket for PortIP. If one is already stored in the
    %% `listen_sockets' table it is removed from the table and reused;
    %% otherwise a new socket is opened with gen_tcp:listen/2. A listen
    %% failure is handed to socket_error/6.
    listen_tcp(PortIP, Module, SockOpts, Port, IPS) ->
        case ets:lookup(listen_sockets, PortIP) of
            [{PortIP, Cached}] ->
                ?INFO_MSG("Reusing listening port for ~p", [PortIP]),
                ets:delete(listen_sockets, PortIP),
                Cached;
            _ ->
                BaseOpts = [binary,
                            {packet, 0},
                            {active, false},
                            {reuseaddr, true},
                            {nodelay, true},
                            {send_timeout, ?TCP_SEND_TIMEOUT},
                            {send_timeout_close, true},
                            {keepalive, true}],
                %% Caller-supplied options come after the fixed defaults.
                case gen_tcp:listen(Port, BaseOpts ++ SockOpts) of
                    {ok, Fresh} ->
                        Fresh;
                    {error, Reason} ->
                        socket_error(Reason, PortIP, Module, SockOpts, Port, IPS)
                end
        end.

    2、在ejabberd启动的最后开始接受连接

    ejabberd_listener:start_listeners(),
    
    %% Starts every listener found in the `listen' option.
    %% Returns `ignore' when the option is undefined. Otherwise returns
    %% `{ok, {{one_for_one, 10, 1}, Results}}' where Results holds the
    %% `{ok, Pid}' tuple of each started listener. The first start failure is
    %% thrown as its bare error reason.
    start_listeners() ->
        case ejabberd_config:get_option(listen, fun validate_cfg/1) of
            undefined ->
                ignore;
            Listeners ->
                StartOne =
                    fun({Port, Module, Opts}) ->
                            case start_listener(Port, Module, Opts) of
                                {ok, _Pid} = Started -> Started;
                                {error, Reason} -> throw(Reason)
                            end
                    end,
                Results = lists:map(StartOne, Listeners),
                report_duplicated_portips(Listeners),
                {ok, {{one_for_one, 10, 1}, Results}}
        end.
    %% Starts one listener via start_listener2/3 and normalises the result.
    %%
    %% Returns:
    %%   {ok, Pid}                          - started, or already running
    %%   {error, {module_not_available, M}} - the start call hit an `undef'
    %%                                        in module M (logged)
    %%   {error, Error}                     - any other failure, unchanged
    %%
    %% The `undef' clause accepts both stack-frame shapes: the legacy 3-tuple
    %% {M, F, A} and the 4-tuple {M, F, A, Location} produced since OTP R15.
    %% Matching only the 3-tuple form means the clause never fires on a
    %% modern runtime and the module_not_available diagnosis is lost.
    start_listener(Port, Module, Opts) ->
        case start_listener2(Port, Module, Opts) of
            {ok, _Pid} = R ->
                R;
            {error, {{'EXIT', {undef, [Frame | _]}}, _} = Error}
              when tuple_size(Frame) =:= 3; tuple_size(Frame) =:= 4 ->
                ?ERROR_MSG("Error starting the ejabberd listener: ~p.~n"
                           "It could not be loaded or is not an ejabberd listener.~n"
                           "Error: ~p~n", [Module, Error]),
                %% The first element of the top frame is the missing module.
                {error, {module_not_available, element(1, Frame)}};
            {error, {already_started, Pid}} ->
                {ok, Pid};
            {error, Error} ->
                {error, Error}
        end.
    %% Performs the three start-up steps for one configured listener, in order:
    %% maybe_start_sip/1, start_module_sup/2, then start_listener_sup/3.
    %% The result of start_listener_sup/3 is the return value; the results of
    %% the first two calls are discarded.
    start_listener2(Port, Module, Opts) ->
        %% It is only required to start the supervisor in some cases.
        %% But it doesn't hurt to attempt to start it for any listener.
        %% So, it's normal (and harmless) that in most cases this call returns: {error, {already_started, pid()}}
        maybe_start_sip(Module),
        start_module_sup(Port, Module),
        start_listener_sup(Port, Module, Opts).
    %% Asks `ejabberd_sup' to start a permanent `ejabberd_tmp_sup' supervisor
    %% child for Module. The child id and the first start argument are both
    %% the process name returned by gen_mod:get_module_proc(<<"sup">>, Module).
    %% Returns the result of supervisor:start_child/2.
    start_module_sup(_Port, Module) ->
        SupName = gen_mod:get_module_proc(<<"sup">>, Module),
        StartMFA = {ejabberd_tmp_sup, start_link,
                    [SupName, strip_frontend(Module)]},
        Spec = {SupName, StartMFA, permanent, infinity, supervisor,
                [ejabberd_tmp_sup]},
        supervisor:start_child(ejabberd_sup, Spec).
    
    %% Adds a transient, brutal_kill worker child to the `ejabberd_listeners'
    %% supervisor. The child id is Port and the child is started through
    %% ?MODULE:start(Port, Module, Opts). Returns the result of
    %% supervisor:start_child/2.
    start_listener_sup(Port, Module, Opts) ->
        StartMFA = {?MODULE, start, [Port, Module, Opts]},
        Spec = {Port, StartMFA, transient, brutal_kill, worker, [?MODULE]},
        supervisor:start_child(ejabberd_listeners, Spec).
    
    %% Child start function for a listener. Modules whose socket type is
    %% `independent' start their own listener through start_listener/2;
    %% every other module goes through start_dependent/3.
    start(Port, Module, Opts) ->
        Backend = strip_frontend(Module),
        SocketType = Backend:socket_type(),
        if
            SocketType =:= independent ->
                Backend:start_listener(Port, Opts);
            true ->
                start_dependent(Port, Module, Opts)
        end.
    
    %% start_dependent(Port, Module, Opts) -> {ok, Pid} | {error, ErrorMessage}
    %%
    %% Validates the listener options and, if they pass, starts the listener
    %% process with proc_lib:start_link/3 (entry point ?MODULE:init/3). A
    %% thrown `{error, Error}' from the option check is logged and returned
    %% as `{error, Error}'.
    start_dependent(Port, Module, Opts) ->
        try check_listener_options(Opts) of
            ok ->
                InitArgs = [Port, Module, Opts],
                proc_lib:start_link(?MODULE, init, InitArgs)
        catch
            throw:{error, Reason} ->
                ?ERROR_MSG(Reason, []),
                {error, Reason}
        end.
    %% Entry point of the listener process. Parses the port/IP specification
    %% and options, then dispatches on the protocol: udp goes to init_udp/6,
    %% anything else to init_tcp/6.
    init(PortIP, Module, RawOpts) ->
        {Port, IPT, IPS, IPV, Proto, OptsClean} =
            parse_listener_portip(PortIP, RawOpts),
        {Opts, SockOpts} = prepare_opts(IPT, IPV, OptsClean),
        case Proto of
            udp ->
                init_udp(PortIP, Module, Opts, SockOpts, Port, IPS);
            _ ->
                init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS)
        end.
    %% Obtains the listening socket, acknowledges the successful start to the
    %% parent, and enters accept/3. If Module exports tcp_init/2 it is called
    %% first and its result replaces Opts; if that call exits, the failure is
    %% logged and the original Opts are used.
    init_tcp(PortIP, Module, Opts, SockOpts, Port, IPS) ->
        ListenSocket = listen_tcp(PortIP, Module, SockOpts, Port, IPS),
        %% Inform my parent that this port was opened successfully
        proc_lib:init_ack({ok, self()}),
        AcceptOpts =
            case erlang:function_exported(Module, tcp_init, 2) of
                false ->
                    Opts;
                true ->
                    case catch Module:tcp_init(ListenSocket, Opts) of
                        {'EXIT', _} = Err ->
                            ?ERROR_MSG("failed to process callback function "
                                       "~p:~s(~p, ~p): ~p",
                                       [Module, tcp_init, ListenSocket, Opts, Err]),
                            Opts;
                        NewOpts ->
                            NewOpts
                    end
            end,
        accept(ListenSocket, Module, AcceptOpts).

    由此可知针对每一个要开启的server,系统分别开启了两个进程:

    一个由start_listener_sup/3启动,作为ejabberd_listeners的子进程,负责打开端口并接受外部连接(最终走到init_tcp,调用accept接受连接)

    另一个由start_module_sup/2启动,作为supervisor挂在ejabberd_sup下,管理上面监听进程派生出的子进程:监听进程每接受到一个连接,就会开启一个新进程处理该连接,自己则继续处于监听状态

    当一个外部连接到来时:

    accept根据CallMod选取对应的模块:

    例如 ejabberd_socket,进入到ejabberd_socket:start(strip_frontend(Module), gen_tcp, Socket, Opts)

    ejabberd_socket为每个接受到的连接生成两个进程(以ejabberd_c2s为例):

    1、ejabberd_receiver处理接收到的XML Stanza信息(SockMod:controlling_process(Socket, Receiver)),并转发给业务逻辑模块

    2、开启ejabberd_c2s(Module:start({?MODULE, SocketData}, Opts)),即业务逻辑模块handler;handler决定如何处理接收到的信息,并可能使用ejabberd_socket发送响应数据

    最后如果想扩展ejabberd使用其他的非xmpp协议:

    则需要自定义receiver作为解析协议模块,handler作为业务逻辑模块

  • 相关阅读:
    重置root密码
    JavaEE完整体系架构
    Analysis servlet injection
    隔离级别
    ULVAC爱发科皮拉尼真空计SW1-N说明书-手册
    研华advantech-凌华ADLINK板卡运动控制卡
    vc6.0转vs2012的一些错误与解决方法
    MFC时间简单比较方法
    MFC_VC++_时间获取与保存列表控件内容到文件操作方法
    show and hide. xp扩展名
  • 原文地址:https://www.cnblogs.com/lawen/p/5057863.html
Copyright © 2011-2022 走看看