zoukankan      html  css  js  c++  java
  • ejabberd中的http反向推送

    http的反向推送通常使用"长轮询"或"长连接"的方式。 所谓"长轮询"是指客户端发送请求给服务器,服务器发现没有数据需要发送给客户端。

    http的反向推送通常使用"长轮询"或"长连接"的方式。
    所谓"长轮询"是指客户端发送请求给服务器,服务器发现没有数据需要发送给客户端于是hold住不及时返回,等有数据需要发送给客户端时,进行回复,然后关闭连接,客户端收到回复后再发送新的http请求,以便服务器能有对应的请求用于消息的反向推送。
    而"长连接"是在长轮询的基础上增加"keep-alive"属性,服务器收到请求后不直接回复,等有数据需要发送给客户端时再进行response,但是并不关闭连接,这样客户端收到服务器的response后在同一连接上再次发送http请求。

     在ejabberd的实现中,采用了bosh技术来完成对应的工作,具体定义可参考:

    英文: http://go.rritw.com/xmpp.org/extensions/xep-0124.html

    中文: http://go.rritw.com/wiki.jabbercn.org/XEP-0124

    大概实现原理:ejabberd收到一个客户端http请求后会为该客户端最终创建三个进程:ejabberd_http, ejabberd_http_bind, ejabberd_c2s。

    ejabberd_http进程不断的从对应的socket上收客户端的请求,并转发交给对应的ejabberd_http_bind进程进行处理,然后同步等待处理结果,并将结果返回给客户端。

    复制代码
    init() ->
        ...
        receive_headers(State).
    
    
    receive_headers(#state{trail=Trail} = State) ->
        SockMod = State#state.sockmod,
        Socket = State#state.socket,
        Data = SockMod:recv(Socket, 0, 300000),
        case State#state.sockmod of
            gen_tcp ->
                NewState = process_header(State, Data),
                case NewState#state.end_of_request of
                    true -> ok;
                    _ -> receive_headers(NewState)
                end;
            _ ->
                case Data of
                    {ok, D} ->
                        parse_headers(State#state{trail = <<Trail/binary, D/binary>>});
                    {error, _} ->
                        ok
                end
        end.
    
    process_header(State, Data) ->
        case Data of
            ...
            {ok, http_eoh} ->
                ...
                Out = process_request(State2),
                send_text(State2, Out),
                case State2#state.request_keepalive of
                    true ->
                        ...
                        #state{sockmod = SockMod,
                               socket = Socket,
                               request_handlers = State#state.request_handlers};
                    _ ->
                       #state{end_of_request = true,
                              request_handlers = State#state.request_handlers}
    复制代码

    从代码中可看出,未设置keep-alive属性的时候,该进程处理完一次http请求后便自己结束(长轮询模式)。设置了keep-alive属性的时候,该进程不断的循环接收http请求,并转发接收与响应(长连接模式)。

    ejabberd_http_bind进程负责hold住http请求,对于正常的客户端请求,ejabberd_http_bind进程会将请求转发给对应的ejabberd_c2s进程进行实际业务的处理,而对于空的请求(便于服务器反向推送数据),ejabberd_http_bind设置定时器,等待ejabberd_c2s进程对实际请求的响应或者是需要推送给客户端的消息。

    复制代码
    handle_sync_event({send_xml,Packet},_From,StateName,
                      #state{http_receiver = undefined} = StateData) ->
        Output = [Packet | StateData#state.output],
        Reply = ok,
        {reply, Reply, StateName, StateData#state{output = Output}};
    
    handle_sync_event({send_xml, Packet}, _From, StateName, StateData) ->
        Output = [Packet | StateData#state.output],
        cancel_timer(StateData#state.timer),
        Timer = set_inactivity_timer(StateData#state.pause,
                                     StateData#state.max_inactivity),
        HTTPReply = {ok, Output},
        gen_fsm:reply(StateData#state.http_receiver, HTTPReply),
        cancel_timer(StateData#state.wait_timer),
        Rid = StateData#state.rid,
        ReqList = [#hbr{rid = Rid,key = StateData#state.key,
                        out = Output
                       } |
                   [El || El <- StateData#state.req_list,
                          El#hbr.rid /= Rid ]],
        Reply = ok,
        {reply, Reply, StateName,
         StateData#state{output = [],
                         http_receiver = undefined,
                         req_list = ReqList,
                         wait_timer = undefined,
                         timer = Timer}};
    
    handle_sync_event({http_get,Rid,Wait,Hold},From,StateName,
                      StateData) ->
        %% setup timer
        send_receiver_reply(StateData#state.http_receiver, {ok, empty}),
        cancel_timer(StateData#state.wait_timer),
        TNow = tnow(),
        if
            (Hold > 0) and
            (StateData#state.output == []) and
            ((TNow -StateData#state.ctime<(Wait*1000*1000)) and
            (StateData#state.rid == Rid) and
            (StateData#state.input /= cancel) and
            (StateData#state.pause == 0) ->
                WaitTimer = erlang:start_timer(Wait * 1000, self(), []),
                %% MR: Not sure we should cancel the state timer here.
                cancel_timer(StateData#state.timer),
                             {next_state,StateName,
                              StateData#state{http_receiver = From,
                                              wait_timer = WaitTimer,
                                              timer = undefined}};
            (StateData#state.input == cancel) ->
                cancel_timer(StateData#state.timer),
                Timer = set_inactivity_timer(StateData#state.pause,
                                        StateData#state.max_inactivity),
                Reply = {ok, cancel},
                {reply, Reply, StateName,
                 StateData#state{input = queue:new(),
                                 http_receiver = undefined,
                                 wait_timer = undefined,
                                 timer = Timer}};
            true ->
                cancel_timer(StateData#state.timer),
                Timer = set_inactivity_timer(StateData#state.pause,
                                         StateData#state.max_inactivity),
                Reply = {ok, StateData#state.output},
                %% save request
                ReqList = [#hbr{rid = Rid,
                                key = StateData#state.key,
                                out = StateData#state.output
                               } |
                           [El || El <- StateData#state.req_list,
                                 El#hbr.rid /= Rid]
                          ],
                {reply, Reply, StateName,
                 StateData#state{output = [],
                                 http_receiver = undefined,
                                 wait_timer = undefined,
                                 timer = Timer,
                                 req_list = ReqList}}
        end;
    
    handle_info({timeout, WaitTimer, _}, StateName,
                 #state{wait_timer = WaitTimer} = StateData) ->
        if
            StateData#state.http_receiver /= undefined ->
                cancel_timer(StateData#state.timer),
                Timer = set_inactivity_timer(StateData#state.pause,
                                             StateData#state.max_inactivity),
                gen_fsm:reply(StateData#state.http_receiver, {ok, empty}),
                Rid = StateData#state.rid,
                ReqList = [#hbr{rid = Rid,
                                key = StateData#state.key,
                                out = []
                               } |
                           [El || El <- StateData#state.req_list,
                                  El#hbr.rid /= Rid ]
                           ],
                {next_state, StateName,
                 StateData#state{http_receiver = undefined,
                                 req_list = ReqList,
                                 wait_timer = undefined,
                                 timer = Timer}};
            true ->
                {next_state, StateName, StateData}
        end;
    复制代码

    ejabberd_http进程最终会调用ejabberd_http_bind的http_get方法获取请求的响应结果或者是需要推送的数据,ejabberd_http_bind进程收到请求后会进行相应处理,比如有数据则直接回复,或者设置定时器。当收到ejabberd_c2s进程推送过来的数据时,停止定时器并将数据立即回复给ejabberd_http进程,如果定时器超时则回复一个空的消息。
    =============================
    ejabberd_c2s为客户端对应的会话进程,负责维护客户端的在线状态,联系人列表,请求处理等等。
    ============
    服务器在创建ejabberd_http_bind进程时,会生成一个唯一的sid,用于标识该进程,该sid与ejabberd_http_bind进程pid的对应关系会存储到mnesia中,同时服务器会将sid也会告诉客户端,客户端后续的请求也都需要带上该sid。http_bind收到请求后根据sid从mnesia查找匹配的ejabberd_http_bind进程。

    复制代码
    process_request(Data, IP) ->
        ...
        case catch parse_request(Data, PayloadSize, MaxStanzaSize) of
            %% No existing session:
            {ok, {"", Rid, Attrs, Payload}} ->
                ...
                Sid = sha:sha(term_to_binary({now(), make_ref()})),
                case start(XmppDomain, Sid, "", IP) of
                    {ok, Pid} ->
                        handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,Payload, PayloadSize, IP)
                    ...
                 end;
            {ok, {Sid, Rid, Attrs, Payload1}} ->
                ...
                handle_http_put(Sid, Rid, Attrs, Payload2, PayloadSize,
                                StreamStart, IP);
    
    handle_session_start(Pid, XmppDomain, Sid, Rid, Attrs,
                         Payload, PayloadSize, IP) ->
        ...
        mnesia:dirty_write(#http_bind{id = Sid,
                                      pid = Pid,
                                      to = {XmppDomain,XmppVersion},
                                      hold = Hold,
                                      wait = Wait,
                                      process_delay = Pdelay,
                                      version = Version}),
        handle_http_put(Sid, Rid, Attrs, Payload, PayloadSize, true, IP).
    
    http_put(Sid, Rid, Attrs, Payload, PayloadSize, StreamStart, IP) ->
        case mnesia:dirty_read({http_bind, Sid}) of
            [] ->
                {error, not_exists};
            [#http_bind{pid = FsmRef, hold=Hold, to={To, StreamVersion}}=Sess] ->
                ...
                {gen_fsm:sync_send_all_state_event(FsmRef, 
                    #http_put{rid = Rid, attrs = Attrs,
                              payload = Payload,
                              payload_size = PayloadSize, hold = Hold,
                              stream = NewStream, ip = IP}, 30000), Sess}
        end.
    复制代码
    ejabberd_http_bind与ejabberd_c2s会互相记住对方的进程pid,这样每个客户端都有自己唯一的ejabberd_http_bind与ejabberd_c2s进程进行相应的请求处理。【转】 http://my.oschina.net/hncscwc/blog/178471
  • 相关阅读:
    JSP——隐式对象(implicit object)
    Spring——原理解析利用反射和注解模拟IoC的自动装配
    Spring操作指南针对JDBC配置声明式事务管理(基于注解)
    SpringMVC操作指南登录功能与请求过滤
    Spring操作指南针对JDBC配置声明式事务管理(基于XML)
    Flex Tree 限制只能同级拖动 获得拖动前后节点信息
    坐标系、坐标参照系、坐标变换、投影变换【转】
    国内地图偏移的一些知识
    Web Mercator
    WKT【转】
  • 原文地址:https://www.cnblogs.com/xuan52rock/p/4607639.html
Copyright © 2011-2022 走看看