zoukankan      html  css  js  c++  java
  • 基于MQTT(EMQ)的消息发布订阅-python实现

    参考路径:https://www.cnblogs.com/taozihua/articles/11197607.html

    python代码实现 

    安装:pip install paho-mqtt

    实现Publish-发送消息:

    #!/usr/bin/env python
    # encoding: utf-8
    """
    @version: v1.0
    @author: W_H_J
    @license: Apache Licence
    @contact: 415900617@qq.com
    @software: PyCharm
    @file: clicentMqttTest.py
    @time: 2019/2/22 14:19
    @describe: mqtt客户端
    """
    import json
    import sys
    import os
    import paho.mqtt.client as mqtt
    import time

    sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
    sys.path.append("..")

    TASK_TOPIC = 'test' # 客户端发布消息主题

    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    """
    client_id是连接到代理。如果client_id的长度为零或为零,则行为为由使用的协议版本定义。如果使用MQTT v3.1.1,
    那么一个零长度的客户机id将被发送到代理,代理将被发送为客户端生成一个随机变量。如果使用MQTT v3.1,那么id将是
    随机生成的。在这两种情况下,clean_session都必须为True。如果这在这种情况下不会产生ValueError。
    注意:一般情况下如果客户端服务端启用两个监听那么客户端client_id 不能与服务器相同,如这里用时间"20190222142358"作为它的id,
    如果与服务器id相同,则无法接收到消息
    """
    client = mqtt.Client(client_id, transport='tcp')

    client.connect("127.0.0.1", 1883, 60) # 此处端口默认为1883,通信端口期keepalive默认60
    client.loop_start()


    def clicent_main(message: str):
    """
    客户端发布消息
    :param message: 消息主体
    :return:
    """
    time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))
    payload = {"msg": "%s" % message, "data": "%s" % time_now}
    # publish(主题:Topic; 消息内容)
    client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False))
    print("Successful send message!")
    return True


    if __name__ == '__main__':
    msg = "我是一条测试数据!"
    clicent_main(msg)


    复制代码
     1 #!/usr/bin/env python  
     2 # encoding: utf-8  
     3 """ 
     4 @version: v1.0 
     5 @author: W_H_J 
     6 @license: Apache Licence  
     7 @contact: 415900617@qq.com 
     8 @software: PyCharm 
     9 @file: clicentMqttTest.py 
    10 @time: 2019/2/22 14:19 
    11 @describe: mqtt客户端
    12 """
    13 import json
    14 import sys
    15 import os
    16 import paho.mqtt.client as mqtt
    17 import time
    18  
    19 sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
    20 sys.path.append("..")
    21  
    22 TASK_TOPIC = 'test'  # 客户端发布消息主题
    23  
    24 client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    25 """
    26 client_id是连接到代理。如果client_id的长度为零或为零,则行为为由使用的协议版本定义。如果使用MQTT v3.1.1,
    27 那么一个零长度的客户机id将被发送到代理,代理将被发送为客户端生成一个随机变量。如果使用MQTT v3.1,那么id将是
    28 随机生成的。在这两种情况下,clean_session都必须为True。如果这在这种情况下不会产生ValueError。
    29 注意:一般情况下如果客户端服务端启用两个监听那么客户端client_id 不能与服务器相同,如这里用时间"20190222142358"作为它的id,
    30 如果与服务器id相同,则无法接收到消息
    31 """
    32 client = mqtt.Client(client_id, transport='tcp')
    33  
    34 client.connect("127.0.0.1", 1883, 60)  # 此处端口默认为1883,通信端口期keepalive默认60
    35 client.loop_start()
    36  
    37  
    38 def clicent_main(message: str):
    39     """
    40     客户端发布消息
    41     :param message: 消息主体
    42     :return:
    43     """
    44     time_now = time.strftime('%Y-%m-%d %H-%M-%S', time.localtime(time.time()))
    45     payload = {"msg": "%s" % message, "data": "%s" % time_now}
    46     # publish(主题:Topic; 消息内容)
    47     client.publish(TASK_TOPIC, json.dumps(payload, ensure_ascii=False))
    48     print("Successful send message!")
    49     return True
    50  
    51  
    52 if __name__ == '__main__':
    53     msg = "我是一条测试数据!"
    54     clicent_main(msg)
    复制代码

    实现Subscribe-订阅

    #!/usr/bin/env python
    # encoding: utf-8
    """
    @version: v1.0
    @author: W_H_J
    @license: Apache Licence
    @contact: 415900617@qq.com
    @software: PyCharm
    @file: serverMqttTest.py
    @time: 2019/2/22 14:35
    @describe: mqtt 服务端
    """
    import json
    import sys
    import os
    import time
    import paho.mqtt.client as mqtt
    sys.path.append(os.path.abspath(os.path.dirname(__file__) + '/' + '..'))
    sys.path.append("..")

    REPORT_TOPIC = 'test' # 主题


    def on_connect(client, userdata, flags, rc):
    print('connected to mqtt with resurt code ', rc)
    client.subscribe(REPORT_TOPIC) # 订阅主题


    def on_message(client, userdata, msg):
    """
    接收客户端发送的消息
    :param client: 连接信息
    :param userdata:
    :param msg: 客户端返回的消息
    :return:
    """
    print("Start server!")
    payload = json.loads(msg.payload.decode('utf-8'))
    print(payload)


    def server_conenet(client):
    client.on_connect = on_connect # 启用订阅模式
    client.on_message = on_message # 接收消息
    client.connect("127.0.0.1", 1883, 60) # 链接
    # client.loop_start() # 以start方式运行,需要启动一个守护线程,让服务端运行,否则会随主线程死亡
    client.loop_forever() # 以forever方式阻塞运行。


    def server_stop(client):
    client.loop_stop() # 停止服务端
    sys.exit(0)


    def server_main():
    client_id = time.strftime('%Y%m%d%H%M%S', time.localtime(time.time()))
    client = mqtt.Client(client_id, transport='tcp')
    server_conenet(client)


    if __name__ == '__main__':
    # 启动监听
    server_main()

    此方案需要搭建本地MQTT服务器:

    我们使用搭建EMQX,下载地址为:https://www.emqx.io/cn/downloads  ,解压后cd到bin目录,执行 emqx console

    执行成功会弹出下面窗口,不成功就关掉cmd重新试下

    打开浏览器输入
    http://127.0.0.1:18083
    默认用户名 admin
    默认密码 public

  • 相关阅读:
    Visual Studio Code必备插件
    webpack4+:. css属性自动追加前缀 与 mini-css-extract-plugin 插件 打包冲突问题
    webpack4.x抽取css【extract-text-webpack-plugin与mini-css-extract-plugin】
    javaScript中slice, substring,substr三者区别以及用法扩展
    Spring Boot实践——Mybatis分页插件PageHelper的使用
    Nginx配置详解
    Maven的几个常用plugin
    excel拼接数据宏
    我的Linux之路——windows10用WMware安装CentOS7.5 虚拟机详细步骤
    Spring Boot实践——多线程
  • 原文地址:https://www.cnblogs.com/hbtmwangjin/p/12101457.html
Copyright © 2011-2022 走看看