zoukankan      html  css  js  c++  java
  • emqtt 1 (初初初初稿)

    第一篇,先简单分析一下整个emqtt 的大致结构,包括两个部分:

    1、message packet 类型

    2、message 流向

    message packet 类型

     P1:mqtt_packet 的基本结构,其中header 中的type 与variable 的mqtt_packet_* 一一对应。

    emqtt 的packet 定义如下:

    1 -record(mqtt_packet,
    2         {header    :: #mqtt_packet_header{},
    3          variable  :: #mqtt_packet_connect{} | #mqtt_packet_connack{}
    4                     | #mqtt_packet_publish{} | #mqtt_packet_puback{}
    5                     | #mqtt_packet_subscribe{} | #mqtt_packet_suback{}
    6                     | #mqtt_packet_unsubscribe{} | #mqtt_packet_unsuback{}
    7                     | mqtt_packet_id() | undefined,
    8          payload   :: binary() | undefined }).

    包括包头、包的类型、实际的数据消息。

    在emqtt 中,message packet 的类型主要分为一下几类(这些类型,基本上定义在emqttd_protocol.hrl 文件中):

    1、connect/connack

    P2:connect packet的基本结构

    用于客户端向server端建立链接以及server 端向client 确定链接。

     1 -record(mqtt_packet_connect,
     2         {client_id   = <<>>              :: mqtt_client_id(),
     3          proto_ver   = ?MQTT_PROTO_V311  :: mqtt_vsn(),
     4          proto_name  = <<"MQTT">>        :: binary(),
     5          will_retain = false             :: boolean(),
     6          will_qos    = ?QOS_0            :: mqtt_qos(),
     7          will_flag   = false             :: boolean(),
     8          clean_sess  = false             :: boolean(),
     9          keep_alive  = 60                :: non_neg_integer(),
    10          will_topic  = undefined         :: undefined | binary(),
    11          will_msg    = undefined         :: undefined | binary(),
    12          username    = undefined         :: undefined | binary(),
    13          password    = undefined         :: undefined | binary()}).
    14 
    15 -record(mqtt_packet_connack,
    16         {ack_flags = ?RESERVED   :: 0 | 1,
    17          return_code             :: mqtt_connack() }).

    其中,主要的几个字段,client_id、proto_ver、proto_name以及keep_alive。

    client_id在username undefined的情况下,充当username的角色,是客户端的唯一标识,在接下来的session manage以及 subscription 管理中,具有非常重要的作用。

    2、subscribe/suback

    P3:subscribe packet的基本结构

    用于处理客户端 subscribe 某个topic,已经server端向客户端的确认。

    1 -record(mqtt_packet_subscribe,
    2         {packet_id   :: mqtt_packet_id(),
    3          topic_table :: list({binary(), mqtt_qos()}) }).
    4 -record(mqtt_packet_suback,
    5         {packet_id   :: mqtt_packet_id(),
    6          qos_table   :: list(mqtt_qos() | 128) }).

    topic_table 字段表示的是所要subscribe的topic 以及对应的QoS。

    3、unsubscribe/unsuback

    subscribe的反向操作

    4、publish/puback

    P4: publish packet的基本结构

    1 -record(mqtt_packet_publish,
    2         {topic_name  :: binary(),
    3          packet_id   :: mqtt_packet_id() }).
    4 
    5 -record(mqtt_packet_puback,
    6         {packet_id   :: mqtt_packet_id() }).

    topic_name指定了将要publish到的topic的名字。

    在emqtt中,用Erlanger中record数据类型,定义了以上一种message packet的类型,而且,这些类型的packet的作用都显而易见。

    message 流向

    首先,mqtt是基于TCP协议的,因此,emqtt本身也是架构在TCP server上的service。其底层,基于esockd(主要借鉴RabbitMQ networking的实现)。socket controlling 进程的执行逻辑是emqttd_client module。emqttd_client module主要与esockd application 中的esockd_connection module 相关联,并存在必要的callback 关系。

    由于socket controlling 进程的入口以及主要执行逻辑由emqttd_client module实现。

    因此:

    • message进入到server后,首先由emqttd_client module进行处理

    进入到emqtt server的message,主要是二进制socket数据,想要转换成Erlang的内部数据结构,就必要进行必要的数据解析。

    1、socket 数据包接收

    1 handle_info({inet_async, _Sock, _Ref, {ok, Data}}, State) ->
    2     Size = size(Data),
    3     ?LOG(debug, "RECV ~p", [Data], State),
    4     emqttd_metrics:inc('bytes/received', Size),
    5     received(Data, rate_limit(Size, State#client_state{await_recv = false}));

    L2,L3,L4这三行代码主要用于调试和metric。

    2、socket 数据包解析

    socket 二进制数据包的解析,主要是由emqttd_parser module进行处理,包括二进制协议的解析以及socket 粘包的处理。

    • message 由emqttd_parser module进行二进制数据协议的解析

    解析成为mqtt_pakcet record 类型的数据结构。

    3、mqtt packet 处理

    emqtt中,mqtt packet的处理是由emqtt_protocol module完成,其入口函数为received/2:

     1 %% A Client can only send the CONNECT Packet once over a Network Connection. 
     2 -spec(received(mqtt_packet(), proto_state()) -> {ok, proto_state()} | {error, any()}).
     3 received(Packet = ?PACKET(?CONNECT), State = #proto_state{connected = false}) ->
     4     process(Packet, State#proto_state{connected = true});
     5 
     6 received(?PACKET(?CONNECT), State = #proto_state{connected = true}) ->
     7     {error, protocol_bad_connect, State};
     8 
     9 %% Received other packets when CONNECT not arrived.
    10 received(_Packet, State = #proto_state{connected = false}) ->
    11     {error, protocol_not_connected, State};
    12 
    13 received(Packet = ?PACKET(_Type), State) ->
    14     trace(recv, Packet, State),
    15     case validate_packet(Packet) of
    16         ok ->
    17             process(Packet, State);
    18         {error, Reason} ->
    19             {error, Reason, State}
    20     end.

    前三个子函数,主要处理"是否已经login"相关的packet,而最后一个子函数则是在login 之后,处理正常的packet。

    在emqttd_protocol module 中,由入口函数received 做简单的处理之后,则交由本module 中的process/2 函数进行处理,并最后交由后续的实际业务module 进行处理。

    • mqtt packet 由emqttd_protocol module进行处理,并交由后续的module 

    综上,在emqtt中,message 基本的流向为:

    1. message进入到server后,首先由emqttd_client module进行处理
    2. message 由emqttd_parser module进行二进制数据协议的解析
    3. mqtt packet 由emqttd_protocol module进行处理,并交由后续的module 
    4. 后续根据不同类型的packet,再做不同的处理

    图示如下:

    P5: message基本流向示意图

    总结

    需要?

  • 相关阅读:
    二分查找
    基本功能
    pandas的数据结构
    部署metrics-server遇到的坑
    Promethus安装指南
    Spark学习笔记
    Hadoop学习笔记
    大数据处理框架
    大数据Hadoop生态圈:Pig和Hive
    Hadoop HA 机制学习
  • 原文地址:https://www.cnblogs.com/--00/p/emqtt_learning_1.html
Copyright © 2011-2022 走看看