zoukankan      html  css  js  c++  java
  • emqtt 4 (我要publish消息了)

    这次,分析处理publish msg的流程。

    由protocol开始

    publish 类型的packet的处理是:

     1 process(Packet = ?PUBLISH_PACKET(_Qos, Topic, _PacketId, _Payload), State) ->
     2     %% ACL check
     3     ...
     4     publish(Packet, State);
     5     ...
     6 publish(Packet = ?PUBLISH_PACKET(?QOS_0, _PacketId),
     7         #proto_state{client_id = ClientId, username = Username, session = Session}) ->
     8     %% 处理packet 获得msg
     9     Msg = emqttd_message:from_packet(Username, ClientId, Packet),
    10     %% 调用emqttd_session module的publish/2 函数
    11     %% subscribe的时候,也是由protocol 进入的emqttd_session module
    12     emqttd_session:publish(Session, Msg);

    1、ACL 检查

    2、处理packet 获得msg

    3、调用session module进行处理

    emqttd_session 模块处理

    和subscribe的处理流程类似,emqttd_session:publish/2 也只是一个接口函数,该函数要根据QoS的不同,决定是

    1. 自己调用后续函数完成处理
    2. call session process 完成后续处理
     1 %% @doc Publish message
     2 -spec(publish(pid(), mqtt_message()) -> ok | {error, any()}).
     3 publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_0}) ->
     4     %% publish qos0 directly
     5     emqttd:publish(Msg);
     6 publish(_SessPid, Msg = #mqtt_message{qos = ?QOS_1}) ->
     7     %% publish qos1 directly, and client will puback automatically
     8     emqttd:publish(Msg);
     9 publish(SessPid, Msg = #mqtt_message{qos = ?QOS_2}) ->
    10     %% publish qos2 by session 
    11     gen_server2:call(SessPid, {publish, Msg}, ?PUBSUB_TIMEOUT). 

    直接处理

    如果是自己调用后续函数完成处理的话,则继续调用emqttd:publish/2,则在emqttd module 中继续调用emqttd_server:publish/1:

    %% @doc Publish a Message
    -spec(publish(Msg :: mqtt_message()) -> any()).
    publish(Msg = #mqtt_message{from = From}) ->
        ...
        %% 处理topic
        ...
        %% pulish
        emqttd_pubsub:publish(Topic, Msg2),
        ...

    还是subscribe处理套路:

    emqttd_protocol ---> emqttd_session ---> emqttd ---> emqttd_server ---> emqttd_pubsub

    在emqttd_pubsub module中的处理是:

     1 %% @doc Publish message to Topic.
     2 -spec(publish(binary(), any()) -> any()).
     3 publish(Topic, Msg) ->
     4     lists:foreach(
     5         fun(#mqtt_route{topic = To, node = Node}) when Node =:= node() ->
     6             ?MODULE:dispatch(To, Msg);
     7            (#mqtt_route{topic = To, node = Node}) ->
     8             rpc:cast(Node, ?MODULE, dispatch, [To, Msg])
     9         end, emqttd_router:lookup(Topic)).
    10 
    11 dispatch(Topic, Msg) ->
    12     case subscribers(Topic) of
    13         [] ->
    14             dropped(Topic);
    15         [SubPid] ->
    16             SubPid ! {dispatch, Topic, Msg};
    17         SubPids ->
    18             lists:foreach(fun(SubPid) ->
    19                 SubPid ! {dispatch, Topic, Msg}
    20             end, SubPids)
    21     end.
    22 %% @private
    23 %% @doc Find all subscribers
    24 subscribers(Topic) ->
    25     case ets:member(subscriber, Topic) of
    26         true -> %% faster then lookup?
    27             try ets:lookup_element(subscriber, Topic, 2) catch error:badarg -> [] end;
    28         false ->
    29             []
    30     end.

    至此,msg 就已经以{dispatch, Topic, Msg}的形式发送给 clientid 对应的session process了。

    那么,就需要在emqttd_session module中的handle_info callback 函数处进行处理:

     1 %% Dispatch Message
     2 handle_info({dispatch, Topic, Msg}, Session = #session{subscriptions = Subscriptions})
     3     when is_record(Msg, mqtt_message) ->
     4     dispatch(tune_qos(Topic, Msg, Subscriptions), Session);
     5 
     6 %% Deliver qos0 message directly to client
     7 dispatch(Msg = #mqtt_message{qos = ?QOS0}, Session = #session{client_pid = ClientPid}) ->
     8     ClientPid ! {deliver, Msg},
     9     hibernate(Session);
    10 dispatch(Msg = #mqtt_message{qos = QoS}, Session = #session{message_queue = MsgQ})
    11     when QoS =:= ?QOS1 orelse QoS =:= ?QOS2 ->
    12     case check_inflight(Session) of
    13         true  ->
    14             noreply(deliver(Msg, Session));
    15         false ->
    16             hibernate(Session#session{message_queue = emqttd_mqueue:in(Msg, MsgQ)})
    17     end.

    继而,将信息发送给socket controlling process,然后根据QoS的不同,判断是否需要等待ack。

    总结

    (流程示意图待补)

  • 相关阅读:
    ASP.NET常用技巧方法代码断
    130道ASP.NET面试题
    ASP.NET 数据绑定常用代码及其性能分析
    asp.net C# 时间格式大全
    ASP.NET 日期 时间 年 月 日 时 分 秒 格式及转换
    经典算法,每个语言都出现的算法
    Asp.net 后台添加CSS、JS、Meta标签
    ASP.NET 窗体间传值实现方法详解
    asp.net 常用的几种调用存储过程的方法
    codeforces 868C
  • 原文地址:https://www.cnblogs.com/--00/p/emqtt_learning_publish_msg.html
Copyright © 2011-2022 走看看