zoukankan      html  css  js  c++  java
  • python实现mqtt 双向订阅、发布

    一,mqqt 参考

    MQTT官网

    官网推荐相关系列文章

    总结:

    1.MQTT是用于物联网(IoT)的OASIS标准消息传递协议。
    2.它被设计为一种非常轻量级的发布/订阅消息传送。
    3.MQTT中Broker存储数据,转发数据,客户端和服务端是解耦的
    4.订阅发布方通过主题匹配,Broker进行消息转发,也可以解决层级关系,eg: myhome/底楼/客厅/温度
    5.服务质量:级别0:尽力而为。级别1:至少一次。级别2:恰好一次。
    
    二,下载安装broker
    1. MQTT通过Broker存储数据,转发数据,所以要启动一个Broker软件,本人使用[EMQ X Broker官网]

    (https://www.emqx.cn/downloads#broker)

    ​ 1.2 将 emqx-windows-4.2.9.zip解压,本文解压至桌面C:UsersAdministratorDesktopemqx
    ​ 1.3 在windows命令行,进入你解压的 emqxin 目录下

    ​ 1.4 在emqxin目录下,输入启动服务命令 emqx.cmd start,查询端口是否正在监听 netstat -ano | findstr "1883"

    ​ 1.5 启动服务器后,可以使用网页查看相应状态,包括主题。发送的数据等,浏览器打开http://127.0.0.1:18083,使用默认管理账号admin/public即可登录查看,在设置里还能改成中文界面。

    2.使用Python语言

    进行开发,需要安装依赖包 pip install paho-mqtt

    三、MQTT在Python代码快速实现
    1. A端代码

      import paho.mqtt.client as mqtt
      import time
      
      
      # 当代理响应订阅请求时被调用。
      def on_connect(client, userdata, flags, rc):
          if rc == 0:
              print("连接成功")
          print("Connected with result code " + str(rc))
      
      
      # 当代理响应订阅请求时被调用
      def on_subscribe(client, userdata, mid, granted_qos):
          print("Subscribed: " + str(mid) + " " + str(granted_qos))
      
      
      # 当使用使用publish()发送的消息已经传输到代理时被调用。
      def on_publish(client, obj, mid):
          print("OnPublish, mid: " + str(mid))
      
      
      # 当收到关于客户订阅的主题的消息时调用。 message是一个描述所有消息参数的MQTTMessage。
      def on_message(client, userdata, msg):
          print(msg.topic + " " + str(msg.payload))
      
      
      # 当客户端有日志信息时调用
      def on_log(client, obj, level, string):
          print("Log:" + string)
      
      
      # 实例化
      client = mqtt.Client()
      # client.username_pw_set("admin", "password")
      # 回调函数
      client.on_connect = on_connect
      client.on_subscribe = on_subscribe
      client.on_message = on_message
      client.on_log = on_log
      # host为启动的broker地址 举例本机启动的ip 端口默认1883
      client.connect(host="127.0.0.1", port=1883, keepalive=6000)  # 订阅频道
      time.sleep(1)
      
      # 多个主题采用此方式
      # client.subscribe([("demo", 0), ("test", 2)])      #  test主题,订阅者订阅此主题,即可接到发布者发布的数据
      
      # 订阅主题 实现双向通信中接收功能,qs质量等级为2
      client.subscribe(("test", 2))
      client.loop_start()
      
      i = 0
      while True:
          try:
              # 发布MQTT信息
              sensor_data = "ni hao ......from topic-demo"
              # 消息将会发送给代理,并随后从代理发送到订阅匹配主题的任何客户端。
              # publish(topic, payload=None, qos=0, retain=False)
              # topic:该消息发布的主题
              # payload:要发送的实际消息。如果没有给出,或设置为无,则将使用零长度消息。 传递int或float将导致有效负载转换为表示该数字的字符串。 如果你想发送一个真正的int / float,使用struct.pack()来创建你需要的负载
              # qos:服务的质量级别 对于Qos级别为1和2的消息,这意味着已经完成了与代理的握手。 对于Qos级别为0的消息,这只意味着消息离开了客户端。
              # retain:如果设置为True,则该消息将被设置为该主题的“最后已知良好” / 保留的消息
              client.publish(topic="demo", payload=sensor_data, qos=2)
              time.sleep(5)
              # i += 1
          except KeyboardInterrupt:
              print("EXIT")
              # 这是网络循环的阻塞形式,直到客户端调用disconnect()时才会返回。它会自动处理重新连接。
              client.disconnect()
      
      
      
    2. B端代码

      import paho.mqtt.client as mqtt
      import time
      
      
      # 当代理响应订阅请求时被调用。
      def on_connect(client, userdata, flags, rc):
          if rc == 0:
              print("连接成功")
          print("Connected with result code " + str(rc))
      
      
      # 当代理响应订阅请求时被调用
      def on_subscribe(client, userdata, mid, granted_qos):
          print("Subscribed: " + str(mid) + " " + str(granted_qos))
      
      
      # 当使用使用publish()发送的消息已经传输到代理时被调用。
      def on_publish(client, obj, mid):
          print("OnPublish, mid: " + str(mid))
      
      
      # 当收到关于客户订阅的主题的消息时调用。 message是一个描述所有消息参数的MQTTMessage。
      def on_message(client, userdata, msg):
          print(msg.topic + " " + str(msg.payload))
      
      
      # 当客户端有日志信息时调用
      def on_log(client, obj, level, string):
          print("Log:" + string)
      
      
      # 实例化
      client = mqtt.Client()
      # client.username_pw_set("admin", "password")
      # 回调函数
      client.on_connect = on_connect
      client.on_subscribe = on_subscribe
      client.on_message = on_message
      client.on_log = on_log
      # host为启动的broker地址 举例本机启动的ip 端口默认1883
      client.connect(host="127.0.0.1", port=1883, keepalive=6000)  # 订阅频道
      time.sleep(1)
      
      # 多个主题采用此方式
      # client.subscribe([("demo", 0), ("test", 2)])
      
      # 订阅主题 实现双向通信中接收功能,qs质量等级为2
      client.subscribe(("demo", 2))
      client.loop_start()
      
      i = 0
      while True:
          try:
              # 发布MQTT信息
              sensor_data = "hello word......from topic-test"
              # 消息将会发送给代理,并随后从代理发送到订阅匹配主题的任何客户端。
              # publish(topic, payload=None, qos=0, retain=False)
              # topic:该消息发布的主题
              # payload:要发送的实际消息。如果没有给出,或设置为无,则将使用零长度消息。 传递int或float将导致有效负载转换为表示该数字的字符串。 如果你想发送一个真正的int / float,使用struct.pack()来创建你需要的负载
              # qos:服务的质量级别 对于Qos级别为1和2的消息,这意味着已经完成了与代理的握手。 对于Qos级别为0的消息,这只意味着消息离开了客户端。
              # retain:如果设置为True,则该消息将被设置为该主题的“最后已知良好” / 保留的消息
              client.publish(topic="test", payload=sensor_data, qos=2)
              time.sleep(5)
              # i += 1
          except KeyboardInterrupt:
              print("EXIT")
              # 这是网络循环的阻塞形式,直到客户端调用disconnect()时才会返回。它会自动处理重新连接。
              client.disconnect()
      
      
      
    python
  • 相关阅读:
    OSG开源教程(转)
    小组项目工作分配,任务确认
    OSG安装编译
    JAVA开发工具IDEA使用体验
    软件工程结对项目总结
    迭代开发个人总结20160705
    迭代开发个人总结20160704
    迭代开发个人总结20160703
    迭代开发个人总结20160702
    迭代开发个人总结20160701
  • 原文地址:https://www.cnblogs.com/bky20061005/p/14609444.html
Copyright © 2011-2022 走看看