zoukankan      html  css  js  c++  java
  • ejabberd中的http反向推送

    http的反向推送通常使用"长轮询"或"长连接"的方式。 所谓"长轮询"是指客户端发送请求给服务器,服务器发现没有数据需要发送给客户端。

    http的反向推送通常使用"长轮询"或"长连接"的方式。
    所谓"长轮询"是指客户端发送请求给服务器,服务器发现没有数据需要发送给客户端于是hold住不及时返回,等有数据需要发送给客户端时,进行回复,然后关闭连接,客户端收到回复后再发送新的http请求,以便服务器能有对应的请求用于消息的反向推送。
    而"长连接"是在长轮询的基础上增加"keep-alive"属性,服务器收到请求后不直接回复,等有数据需要发送给客户端时再进行response,但是并不关闭连接,这样客户端收到服务器的response后在同一连接上再次发送http请求。

     在ejabberd的实现中,采用了bosh技术来完成对应的工作,具体定义可参考:

    英文: http://go.rritw.com/xmpp.org/extensions/xep-0124.html

    中文: http://go.rritw.com/wiki.jabbercn.org/XEP-0124

    大概实现原理:ejabberd收到一个客户端http请求后会为该客户端最终创建三个进程:ejabberd_http, ejabberd_http_bind, ejabberd_c2s。

    ejabberd_http进程不断的从对应的socket上收客户端的请求,并转发交给对应的ejabberd_http_bind进程进行处理,然后同步等待处理结果,并将结果返回给客户端。

    复制代码
    init() ->
        ...
        receive_headers(State).
    
    
    receive_headers(#state{trail=Trail} = State) ->
        SockMod = State#state.sockmod,
        Socket = State#state.socket,
        Data = SockMod:recv(Socket, 0, 300000),
        case State#state.sockmod of
            gen_tcp ->
                NewState = process_header(State, Data),
                case NewState#state.end_of_request of
                    true -> ok;
                    _ -> receive_headers(NewState)
                end;
            _ ->
                case Data of
                    {ok, D} ->
                        parse_headers(State#state{trail = <<Trail/binary, D/binary>>});
                    {error, _} ->
                        ok
                end
        end.
    
    process_header(State, Data) ->
        case Data of
            ...
            {ok, http_eoh} ->
                ...
                Out = process_request(State2),
                send_text(State2, Out),
                case State2#state.request_keepalive of
                    true ->
                        ...
                        #state{sockmod = SockMod,
                               socket = Socket,
                               request_handlers = State#state.request_handlers};
                    _ ->
                       #state{end_of_request = true,
                              request_handlers = State#state.request_handlers}
    复制代码

    从代码中可看出,未设置keep-alive属性的时候,该进程处理完一次http请求后便自己结束(长轮询模式)。设置了keep-alive属性的时候,该进程不断的循环接收http请求,并转发接收与响应(长连接模式)。

    ejabberd_http_bind进程负责hold住http请求,对于正常的客户端请求,ejabberd_http_bind进程会将请求转发给对应的ejabberd_c2s进程进行实际业务的处理,而对于空的请求(便于服务器反向推送数据),ejabberd_http_bind设置定时器,等待ejabberd_c2s进程对实际请求的响应或者是需要推送给客户端的消息。

    复制代码
    handle_sync_event({send_xml,Packet},_From,StateName,
                      #state{http_receiver = undefined} = StateData) ->
        Output = [Packet | StateData#state.output],
        Reply = ok,
        {reply, Reply, StateName, StateData#state{output = Output}};
    
    handle_sync_event({send_xml, Packet}, _From, StateName, StateData) ->
        Output = [Packet | StateData#state.output],
        cancel_timer(StateData#state.timer),
        Timer = set_inactivity_timer(StateData#state.pause,
                                     StateData#state.max_inactivity),
        HTTPReply = {ok, Output},
        gen_fsm:reply(StateData#state.http_receiver, HTTPReply),
        cancel_timer(StateData#state.wait_timer),
        Rid = StateData#state.rid,
        ReqList = [#hbr{rid = Rid,key = StateData#state.key,
                        out = Output
                       } |
                   [El || El <- StateData#state.req_list,
                          El#hbr.rid /= Rid ]],
        Reply = ok,
        {reply, Reply, StateName,
         StateData#state{output = [],
                         http_receiver = undefined,
                         req_list = ReqList,
                         wait_timer = undefined,
                         timer = Timer}};
    
    handle_sync_event({http_get,Rid,Wait,Hold},From,StateName,
                      StateData) ->
        %% setup timer
        send_receiver_reply(StateData#state.http_receiver, {ok, empty}),
        cancel_timer(StateData#state.wait_timer),
        TNow = tnow(),
        if
            (Hold > 0) and
            (StateData#state.output == []) and
            ((TNow -StateData#state.ctime<(Wait*1000*1000)) and
            (StateData#state.rid == Rid) and
            (StateData#state.input /= cancel) and
            (StateData#state.pause == 0) ->
                WaitTimer = erlang:start_timer(Wait * 1000, self(), []),
                %% MR: Not sure we should cancel the state timer here.
                cancel_timer(StateData#state.timer),
                             {next_state,StateName,
                              StateData#state{http_receiver = From,
                                              wait_timer = WaitTimer,
                                              timer = undefined}};
            (StateData#state.input == cancel) ->
                cancel_timer(StateData#state.timer),
                Timer = set_inactivity_timer(StateData#state.pause,
                                        StateData#state.max_inactivity),
                Reply = {ok, cancel},
                {reply, Reply, StateName,
                 StateData#state{input = queue:new(),
                                 http_receiver = undefined,
                                 wait_timer = undefined,
                                 timer = Timer}};
            true ->
                cancel_timer(StateData#state.timer),
                Timer = set_inactivity_timer(StateData#state.pause,
                                         StateData#state.max_inactivity),
                Reply = {ok, StateData#state.output},
                %% save request
                ReqList = [#hbr{rid = Rid,
                                key = StateData#state.key,
                                out = StateData#state.output
                               } |
                           [El || El <- StateData#state.req_list,
                                 El#hbr.rid /= Rid]
                          ],
                {reply, Reply, StateName,
                 StateData#state{output = [],
                                 http_receiver = undefined,
                                 wait_timer = undefined,
                                 timer = Timer,
                                 req_list = ReqList}}
        end;
    
    handle_info({timeout, WaitTimer, _}, StateName,
                 #state{wait_timer = WaitTimer} = StateData) ->
        if
            StateData#state.http_receiver /= undefined ->
                cancel_timer(StateData#state.timer),
                Timer = set_inactivity_timer(StateData#state.pause,
                                             StateData#state.max_inactivity),
                gen_fsm:reply(StateData#state.http_receiver, {ok, empty}),
                Rid = StateData#state.rid,
                ReqList = [#hbr{rid = Rid,
                                key = StateData#state.key,
                                out = []
                               } |
                           [El || El <- StateData#state.req_list,
                                  El#hbr.rid /= Rid ]
                           ],
                {next_state, StateName,
                 StateData#state{http_receiver = undefined,
                                 req_list = ReqList,
                                 wait_timer = undefined,
                                 timer = Timer}};
            true ->
                {next_state, StateName, StateData}
        end;
    复制代码

    ejabberd_http进程最终会调用ejabberd_http_bind的http_get方法获取请求的响应结果或者是需要推送的数据,ejabberd_http_bind进程收到请求后会进行相应处理,比如有数据则直接回复,或者设置定时器。当收到ejabberd_c2s进程推送过来的数据时,停止定时器并将数据立即回复给ejabberd_http进程,如果定时器超时则回复一个空的消息。
    =============================
    ejabberd_c2s为客户端对应的会话进程,负责维护客户端的在线状态,联系人列表,请求处理等等。
    ============
    服务器在创建ejabberd_http_bind进程时,会生成一个唯一的sid,用于标识该进程,该sid与ejabberd_http_bind进程pid的对应关系会存储到mnesia中,同时服务器会将sid也会告诉客户端,客户端后续的请求也都需要带上该sid。http_bind收到请求后根据sid从mnesia查找匹配的ejabberd_http_bind进程。

    复制代码
    process_request(Data, IP) ->
        ...
        case catch parse_request(Data, PayloadSize, MaxStanzaSize) of
            %% No existing session:
            {ok, {"", Rid, Attrs, Payload}} ->
                ...
                Sid = sha:sha(term_to_binary({now(), make_ref()})),
                case start(XmppDomain, Sid, "", IP) of
                    {ok, Pid} ->
                        handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,Payload, PayloadSize, IP)
                    ...
                 end;
            {ok, {Sid, Rid, Attrs, Payload1}} ->
                ...
                handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
                                StreamStart, IP);
    
    handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
                         Payload, PayloadSize, IP) ->
        ...
        mnesia:dirty_write(#http_bind{id = Sid,
                                      pid = Pid,
                                      to = {XmppDomain,XmppVersion},
                                      hold = Hold,
                                      wait = Wait,
                                      process_delay = Pdelay,
                                      version = Version}),
        handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP).
    
    http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
        case mnesia:dirty_read({http_bind, Sid}) of
            [] ->
                {error, not_exists};
            [#http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess] ->
                ...
                {gen_fsm:sync_send_all_state_event(FsmRef, 
                    #http_put{rid = Rid, attrs = Attrs,
                              payload = Payload,
                              payload_size = PayloadSize, hold = Hold,
                              stream = NewStream, ip = IP}, 30000), Sess}
        end.
    复制代码
    ejabberd_http_bind与ejabberd_c2s会互相记住对方的进程pid,这样每个客户端都有自己唯一的ejabberd_http_bind与ejabberd_c2s进程进行相应的请求处理。【转】 http://my.oschina.net/hncscwc/blog/178471
  • 相关阅读:
    记第一场省选
    POJ 2083 Fractal 分形
    CodeForces 605A Sorting Railway Cars 思维
    FZU 1896 神奇的魔法数 dp
    FZU 1893 内存管理 模拟
    FZU 1894 志愿者选拔 单调队列
    FZU 1920 Left Mouse Button 简单搜索
    FZU 2086 餐厅点餐
    poj 2299 Ultra-QuickSort 逆序对模版题
    COMP9313 week4a MapReduce
  • 原文地址:https://www.cnblogs.com/xuan52rock/p/4607639.html
Copyright © 2011-2022 走看看