zoukankan      html  css  js  c++  java
  • MQTT结构分析

    MQTT,是:

    • 轻量级的消息订阅和发布(publish/subscribe)协议
    • 建立在TCP/IP协议之上

    IoT,internet of things,物联网,MQTT在这方面应用较多。

    官方网站:http://mqtt.org/

    MQTT协议是针对如下情况设计的:

    • M2M(Machine to Machine) communication,机器端到端通信,比如传感器之间的数据通讯
    • 因为是Machine to Machine,需要考虑:
      • Machine,或者叫设备,比如温度传感器,硬件能力很弱,协议要考虑尽量小的资源消耗,比如计算能力和存储等
      • M2M可能是无线连接,网络不稳定,带宽也比较小

    MQTT协议的架构,用一个示例说明。比如有1个温度传感器(1个Machine),2个小的显示屏(2个Machine),显示屏要显示温度传感器的温度值。

    可通过MQTT V3.1 Protocol Specification查阅详细规范的细节。

    显示器需要先通过MQTT协议subscribe(订阅)一个比如叫temperature的topic(主题):

    当温度传感器publish(发布)温度数据,显示器就可以收到了:

    注:以上两张图,取自MQTT and CoAP, IoT Protocols

    协议里还有2个主要的角色:

    • client,客户端
    • broker,服务器端

    它们是通过TCP/IP协议连接的。

    因为MQTT是协议,所以不能拿来直接用的,就好比HTTP协议一样。需要找实现这个协议的库或者服务器来运行。

    这里是官方的Server support

    我服务器端使用nodejs开发,因此选择了:

    • MQTT.js:MQTT协议的底层实现库,服务器端很简易,需要自己编写代码才可使用
    • Mosca:在MQTT.js基础上完善的服务器端

    MQTT.js最基本使用

    安装是很简单的:

    npm install mqtt
    

    MQTT.js实现的服务器端

    代码如下:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    var mqtt = require('mqtt');
    //{'topicName':[clientObj,clientObj ..]}
    var subscribeTopics={};
    //创建服务器对象
    var server = mqtt.createServer(function(client) {
    //建立连接时触发
    client.on('connect', function(packet) {
    client.connack({returnCode: 0});
    });
    //客户端发布主题时触发
    client.on('publish', function(packet) {
    var topic=packet.topic;
    var payload=packet.payload;
    //如果没有创建空的主题对应的client数组
    if(subscribeTopics[topic]==null){
    subscribeTopics[topic]=[];
    }else{
    //遍历该主题下全部client,并逐一发送消息
    for(var i in subscribeTopics[topic]){
    var client=subscribeTopics[topic][i];
    client.publish({
    topic: topic,
    payload: payload
    });
    }
    }
    });
    //当客户端订阅时触发
    client.on('subscribe', function(packet) {
    var topic=packet.subscriptions[0].topic;
    //如没有,创建空的主题对应的client数组
    if(subscribeTopics[topic]==null){
    subscribeTopics[topic]=[];
    }
    //如果client数组中没有当前client,加入
    if(subscribeTopics[topic].indexOf(client)==-1){
    subscribeTopics[topic].push(client);
    }
     
    });
    client.on('pingreq', function(packet) {
    client.pingresp();
    });
    client.on('disconnect', function(packet) {
    //遍历所有主题,检查对应的数组中是否有当前client,从数组中删除
    for (var topic in subscribeTopics){
    var index=subscribeTopics[topic].indexOf(client);
    if(index>-1){
    subscribeTopics[topic].splice(index,1);
    }
    }
    });
    });
    //监听端口
    server.listen(1883);

    这是一个最基本的服务器端,消息的存储和查询都需要自己编程处理。

    比如你如果需要用redis保存和触发数据,可参考这篇中文文章:node mqtt server (redis pub/sub)

    MQTT.js实现的客户端

    代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    var mqtt = require('mqtt');
    client = mqtt.createClient(1883, 'localhost');
    client.subscribe('testMessage');
    client.publish('testMessage', '发布测试信息');
    client.on('message', function (topic, message) {
    console.log(message);
    client.end();
    });

    写的很简易,订阅了主题,然后向相同主题发布消息,接收到消息后client停止。

    使用Mosca

    MQTT.js只是实现了最基础的MQTT协议部分,对于服务器端的处理需要自己完成。

    有关MQTT.js是否实现了MQTT server,详细的说明,可参见MQTT Server: MQTT.js or Mosca?

    正好,Mosca在MQTT基础上实现了这些,它可以:

    • 作为独立运行的MQTT服务器运行
    • 集成到nodejs程序里使用

    安装很简单:

    npm install mosca bunyan -g
    

    作为独立服务器运行

    运行:

    mosca -v | bunyan
    

    然后,还可以用我上文的客户端代码运行测试。

    集成在自己程序中使用

    我考虑的后端持久化,是用MongoDB。Mosca另外几个选项:

    • Redis,缺点是更注重作为缓存,而不适合可靠持久化
    • LevelUp,头一次听说,不打算做技术准备了,是用nodejs的包装起来的LevelDB
    • Memory,使用内存,估计默认的就是这个,不适合我使用的情况

    首先要安装mosca的库:

    npm install mosca
    

    然后,在本机将mongodb运行起来,应该就可以执行下面的代码了:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    var mosca = require('mosca')
    var settings = {
    port: 1883,
    backend:{
    type: 'mongo',
    url: 'mongodb://localhost:27017/mqtt',
    pubsubCollection: 'ascoltatori',
    mongo: {}
    },
    persistence:{
    factory: mosca.persistence.Mongo,
    url: "mongodb://localhost:27017/mosca"
    }
    };
    var server = new mosca.Server(settings);
    server.on('ready', function(){
    console.log('Mosca server is up and running');
    });
    server.on('published', function(packet, client) {
    console.log('Published', packet.payload);
    });

    直接运行作者文档中的代码会在多次运行客户端后出现错误,我是参考了他2天前加上的示例代码

    作者Matteo Collina生活在意大利的博洛尼亚,写代码很勤奋,这个项目更新很快,是不是说明这个方向(mqtt)很活跃呢?

    作者也写了个幻灯片,MQTT and Node.js

    MQTT高级问题

    keepalive和PING

    从这篇文章MQTT协议笔记之连接和心跳

    心跳时间(Keep Alive timer)

    以秒为单位,定义服务器端从客户端接收消息的最大时间间隔。一般应用服务会在业务层次检测客户端网络是否连接,不是TCP/IP协议层面的 心跳机制(比如开启SOCKET的SO_KEEPALIVE选项)。 一般来讲,在一个心跳间隔内,客户端发送一个PINGREQ消息到服务器,服务器返回PINGRESP消息,完成一次心跳交互,继而等待下一轮。若客户端 没有收到心跳反馈,会关闭掉TCP/IP端口连接,离线。 16位两个字节,可看做一个无符号的short类型值。最大值,2^16-1 = 65535秒 = 18小时。最小值可以为0,表示客户端不断开。一般设为几分钟,比如微信心跳周期为300秒。

    下面的代码中我设置的是10秒:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    var mqtt = require('mqtt');
    var settings = {
    keepalive: 10,
    protocolId: 'MQIsdp',
    protocolVersion: 3,
    clientId: 'client-b',
    clean: false
    }
    client = mqtt.createClient(1883, 'localhost',settings);

    可以使用MQTT.js编写简单的服务器代码,观察到服务器端接收到PING请求,并发回PING响应:

    1
    2
    3
    4
    client.on('pingreq', function(packet) {
    client.pingresp();
    console.log('pingreq & resp');
    });

    完整代码上面已经贴过,另见Gist

    QoS

    QoS在MQTT中有(摘自MQ 遥测传输 (MQTT) V3.1 协议规范):

    • “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
    • “至少一次”,确保消息到达,但消息重复可能会发生。
    • “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

    MQTT.js只是支持了MQTT协议,并没有支持QoS,也就是说,只支持最低级别的“至多一次”(QoS0)。

    Mosca支持QoS0和1,但不支持2,见Add support QOS 2

    接收离线消息

    我在应用中的一个主要场景是,使用MQTT.js+Mosca做聊天服务器。

    默认Mosca是不支持离线消息的,表现的现象是,如果是有人(client-a)先在主题上发布了消息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    var mqtt = require('mqtt');
    var settings = {
    keepalive: 10,
    protocolId: 'MQIsdp',
    protocolVersion: 3,
    clientId: 'client-a'
    }
    client = mqtt.createClient(1883, 'localhost',settings);
    client.publish('testMessage', '发布new测试信息0',{qos:1,retain: true});
    client.publish('testMessage', '发布new测试信息1',{qos:1,retain: true});
    client.publish('testMessage', '发布new测试信息2',{qos:1,retain: true});
    client.publish('testMessage', '发布new测试信息3',{qos:1,retain: true});
    setTimeout(function(){
    client.end();
    },1000);

    那么另外一个人(client-b),随后订阅,仅能看到最后一条消息:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    var mqtt = require('mqtt');
    var settings = {
    keepalive: 10,
    protocolId: 'MQIsdp',
    protocolVersion: 3,
    clientId: 'client-b'
    }
    client = mqtt.createClient(1883, 'localhost',settings);
    client.subscribe('testMessage',{qos:1},function(){
    console.log('subscribe ok.');
    });
    client.on("message", function(topic, payload) {
    console.log('message: '+payload);
    });

    运行结果类似这样:

    subscribe ok.
    message: 发布new测试信息3

    离线消息,需要以下几点:

    • 客户端订阅设置QoS=1
    • 客户端连接属性clean: false,作用是断开连接重连的时候服务器端帮助恢复session,不需要再次订阅

    用代码说明以下,先运行这段代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    var mqtt = require('mqtt');
    var settings = {
    keepalive: 10,
    protocolId: 'MQIsdp',
    protocolVersion: 3,
    clientId: 'client-b',
    clean: false
    }
    client = mqtt.createClient(1883, 'localhost',settings);
    client.subscribe('testMessage',{qos:1},function(){
    console.log('subscribe ok.');
    client.end();
    });

    然后执行刚才发布多条消息的代码。再执行下面的代码:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    var mqtt = require('mqtt');
    var settings = {
    keepalive: 10,
    protocolId: 'MQIsdp',
    protocolVersion: 3,
    clientId: 'client-b',
    clean: false
    }
    client = mqtt.createClient(1883, 'localhost',settings);
    client.on("message", function(topic, payload) {
    console.log('message: '+payload);
    });

    运行结果类似这样:

    message: 发布new测试信息1
    message: 发布new测试信息3
    message: 发布new测试信息2
    message: 发布new测试信息0
    

    收到消息的顺序是乱的,为什么会这样,其实很好理解,为了小型受限设备以及网络不稳定的情况,消息是不好保证顺序的。

    解决办法是发送的消息带时间戳,接收后再做排序。

    另外,担心客户端没有做client.end()而非正常退出,那么再次连接是否能恢复session,测试了一下,注释client.end(),没有问题,正常收到多条离线消息。

    SSL连接

    Mosca支持SSL连接,可根据Nodejs TLS创建公钥私钥。

    然后类似这样启动:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    var mosca = require('mosca')
    var SECURE_KEY = __dirname + '/../../test/secure/tls-key.pem';
    var SECURE_CERT = __dirname + '/../../test/secure/tls-cert.pem';
    var settings = {
    port: 8443,
    logger: {
    name: "secureExample",
    level: 40,
    },
    secure : {
    keyPath: SECURE_KEY,
    certPath: SECURE_CERT,
    }
    };
    var server = new mosca.Server(settings);
    server.on('ready', setup);
    // fired when the mqtt server is ready
    function setup() {
    console.log('Mosca server is up and running')
    }

    这部分我没有测试,直接转自Mosca Encryption Support

    认证和授权

    Mosca Authentication提供了个简易的命令行,可创建账号用于认证并授权。

    但是它不适合我的需求场景,我需要自己编写认证和授权的逻辑。

    虽然在作者官方网站上未找到,但在问题管理记录中提交了这方面的支持:Authentication & Authorization

    有下面两条支持,应该可以写出自己的回调,并集成到Mosca中:

    • add a callback to authorize a publish.
    • add a callback to authorize a subscribe.

    不过这块没有写代码,只是大致能确定。

    性能问题

    MQTT.js并不是完整解决方案,不需要考虑它的性能问题。

    说一下Mosca,有一个这方面问题作者的答复,what about mosca’s performance,问问题的还是个中国人,我前面还引用了他的文章。作者基本意思是:

    It basically depends on the RAM. On an AWS large instance it can reach 
    10k concurrent connections, with roughly 10k messages/second.
    

     转自:https://www.cnblogs.com/yfliufei/p/4386439.html

     
     
  • 相关阅读:
    架构师考试回顾
    精通 WPF UI Virtualization
    疑难杂症之Web客户端无法登录
    CDTray, 打开,关闭光驱的系统托盘程序
    原来Queryable是这样实现的..
    jQuery.Excel, 使用Ctrl+方向键/Home/End在input表格中移动
    nGoodPersonCards++
    C#数据库数据导入导出系列之三 数据库导出到Excel下
    调用webservice时提示对操作的回复消息正文进行反序列化时出错&&Web service 超过了最大请求长度
    ASP.NET网络映射驱动器无权限访问的解决方案(转)
  • 原文地址:https://www.cnblogs.com/javalinux/p/14549661.html
Copyright © 2011-2022 走看看