zoukankan      html  css  js  c++  java
  • MQTT简单介绍与实现

    1. MQTT 介绍
    它是一种 机器之间通讯 machine-to-machine (M2M)、物联网 Internet of Things (IoT)常用的一种轻量级消息传输协议
    适用于网络带宽较低的场合
    包含发布、订阅模式,通过一个代理服务器(broker),任何一个客户端(client)都可以订阅或者发布某个主题的消息,然后订阅了该主题的客户端则会收到该消息

    1.1 消息主题
    发布消息或者订阅消息都要选定一个消息主题,消息主题可以任意定制,类似文件系统,用 “/” 进行分隔,例如主题为 /a/b/c/d 的消息
    客户端可以使用完全字符匹配消息,也可以使用通配符进行消息匹配
    通配符 + :替换任意单个层级。比如订阅 /a/b/c/d、/a/+/c/d 、+/+/+/+ 主题的消息即可收到主题为 /a/b/c/d 的消息,而 b/+/c/d 、 +/+/+ 不会匹配
    通配符 # :匹配任意层级,只能用于末尾, #、a/# 可以匹配上面的主题消息
    长度为 0 的主题层级也是允许的。比如发布主题为 a//topic 的消息,客户端可以用 a/+/topic 进行匹配。/a/topic 的主题用 +/a/topic、#、/# 可以匹配。

    1.2 服务质量(Quality of Service,QoS)
    MQTT 定义了三种客户端与代理服务器之间消息到达的难度

    0:broker/client 之间消息传一次,并不确认传到没有,消息可能丢失
    1:broker/client 之间消息至少一次,带确认消息的传输,可能重复收到
    2:broker/client 之间消息仅有一次,利用四次握手进行确认,网络延迟可能会增加
    当客户端订阅的消息质量与代理服务器发布主题的质量不同时,客户端会选择难度最小的 QoS 接收消息

    发布等级为 2 ,客户端订阅等级为 0, 那么客户端接收到的 QoS = 0
    发布等级为 0 ,订阅等级为 2,那么客户端接收到的 QoS = 0

    1.3 消息保留
    即当 broker 正在发送消息给 client 时,消息会保存,如果此时有新的 client 订阅了该主题的消息,那么它也会收到消息。这种做法的好处就是当消息主题经常变换的时候,如果有新的 client 订阅该消息,那么它不用等待太长的时间就可以收到消息
    1.4 会话清除
    client 可以设置 clean session 标志位,当 clean session = false 时,client 失去连接时, broker 会一直保留消息直到 client 重新连接。而 clean session = true 时,broker 会清除所有的消息当这个 client 失去连接。
    1.5 消息意愿
    当 client 连接上 broker 时,client 会提示 broker 它有一个意愿消息,这个意愿消息将会在 client 失去连接时,broker 发送出去。消息意愿和普通消息一样都包含主题和内容。

    2. 实例

    js实现过程

    function RndNum(n){
    var rnd="";
    for(var i=0;i<n;i++)
    rnd+=Math.floor(Math.random()*10);
    return rnd;
    }


    var hostname = '服务器',
    port = “端口号”,
    clientId = RndNum(5),
    keepAlive = 100,
    cleanSession = false,
    userName = '',
    password = '',
    topic = '订阅消息';
    client = new Paho.MQTT.Client(hostname, port, clientId);
    //建立客户端实例
    var options = {
    invocationContext: {
    host: hostname,
    port: port,
    path: client.path,
    clientId: clientId
    },
    keepAliveInterval: keepAlive,
    cleanSession: cleanSession,
    userName: userName,
    password: password,
    onSuccess: onConnect,
    };
    client.connect(options);
    //连接服务器并注册连接成功处理事件
    function onConnect() {
    console.log("onConnected");
    client.subscribe(topic);
    }
    client.onConnectionLost = onConnectionLost;
    //注册连接断开处理事件
    client.onMessageArrived = onMessageArrived;

    //注册消息接收处理事件
    function onConnectionLost(responseObject) {
    console.log(responseObject);
    if (responseObject.errorCode !== 0) {
    console.log("onConnectionLost:" + responseObject.errorMessage);
    console.log("连接已断开");
    }
    }
    function onMessageArrived(message) {
    let msg = message.payloadString;
    // 消息处理

    }


    var count = 0;
    function start() {
    window.tester = window.setInterval(function () {
    if(count!=0){
    if (client.isConnected) {
    var s = "content:" + (count++) ;
    message = new Paho.MQTT.Message(s);
    message.destinationName = topic;
    client.send(message);
    }
    }else
    {
    window.clearInterval(window.tester);
    }
    });
    }

    java spring boot实现过程
    1.添加依赖
    <dependency>
    <groupId>org.eclipse.paho</groupId>
    <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
    <version>1.1.1</version>
    </dependency>
    2.创建类
    public class mqttutil {
    private String serviceURI = "服务器端口号";
    private String clientID = "客户端id";
    private MqttClientPersistence persistence = new MemoryPersistence();
    private String username = "";
    private String password = "";
    private String topic = "订阅";
    private String MessageStr = "";
    private int qos = 0;
    /**
    * 消息订阅
    **/
    public void subscribe() {
    try {
    MqttClient client = new MqttClient(serviceURI, clientID, persistence);
    client.setCallback(new MqttCallback() {
    public void connectionLost(Throwable cause) {
    System.out.println("订阅者连接丢失...");
    System.out.println(cause.getMessage());
    }
    public void messageArrived(String topic, MqttMessage message) {
    MessageStr = message.toString();
    // System.out.println("订阅者接收到消息:"+MessageStr);
    }
    public void deliveryComplete(IMqttDeliveryToken token) {
    }
    });
    MqttConnectOptions connectOptions = new MqttConnectOptions();
    connectOptions.setUserName(username);
    connectOptions.setPassword(password.toCharArray());
    connectOptions.setCleanSession(false);
    //订阅者连接订阅主题
    client.connect(connectOptions);
    client.subscribe(topic, qos);
    System.out.println("订阅者连接状态: " + client.isConnected());
    } catch (MqttException e) {
    e.printStackTrace();
    }
    }
  • 相关阅读:
    LVS负载均衡NAT模式实现
    Linux-ftp服务搭建
    Keepalived配置详解
    Keepalived高可用概念篇
    Nginx-http_proxy_module模块
    Nginx-keepalived+Nginx实现高可用集群
    Oracle注入之带外通信
    Oracle基于延时的盲注总结
    Oracle基于布尔的盲注总结
    Oracle报错注入总结
  • 原文地址:https://www.cnblogs.com/XT666-666/p/10566006.html
Copyright © 2011-2022 走看看