zoukankan      html  css  js  c++  java
  • Python paho-mqtt库测试MQTT

    一、MQTT库安装

    pip install paho-mqtt

    二、代码

    # coding=utf-8
    import paho.mqtt.client as mqtt
    import time,os,requests,json,threading
    
    cafile = r"E:Python
    ootCA.PEM"                                #身份认证文件
    certfile = r"E:PythonClientCA_11111.PEM"   
    keyfile = r"E:PythonClientKey_11111.PEM"
    host = "xxxxxx"                                                 #主机地址
    port = 8883                                                     #端口号
    data={"state":{"desired":{"WIFI_AP":"on"}}}                     #发布信息
    sub_topic="@askey/dvr/xxxxxx/status"                            #订阅主题
    pub_topic="@askey/dvr/xxxxxx/status"                            #发布主题
    
    client = mqtt.Client()                                          #创建一个mqtt对象
    client.tls_set(cafile, certfile, keyfile)                       #加密身份认证
    client.connect(host, port, 60)                                  #连接mqtt服务器
    client.loop_start()                                             #后台运行一个线程来自动调用loop()
    
    def on_connect(client, userdata, flags, rc):
        if rc == 0:
            print('连接成功')                                        #0代表连接成功
            client.subscribe(sub_topic)                             #订阅消息
        else:
            print('Connect Error status {0}'.format(rc))            #连接失败并显示错误代码
    
    def on_message(client, userdata, msg):
        print("主题:"+msg.topic + " 消息:" + str(msg.payload.decode('utf-8')))   #接收消息后处理函数
    
    
    def mqtt_subscribe():
        client.on_connect = client.subscribe(sub_topic)                 # 设置连接上服务器回调函数
        client.on_message = on_message                                  # 设置接收到服务器消息回调函数
        #client.loop_forever()                                          # 守护连接状态
    
    def mqtt_publish():
        while True:
            client.publish(pub_topic, payload=str(data), qos=1)         # 发布消息
            time.sleep(2)
    
    if __name__ == '__main__':
        p = threading.Thread(target=mqtt_publish, name="Thread_pub" , args=()).start()    #发布主题线程
        s = threading.Thread(target=mqtt_subscribe, name="Thread_sub", args=()).start()   #订阅主题线程
    —————————————————————————————— 选择正确的事、再把事做正确 ——————————————————————————————
  • 相关阅读:
    简单通讯聊天 群聊功能 Windows下的客户端 Linux下的epoll服务器
    Windows客户端 Linux服务器通讯 字符编码问题
    C++时间标准库时间time和系统时间的使用
    Window7系统安装Ubuntu16双系统
    Window7 系统下重新建立一个新分区
    UltraISO(软碟通) 制作U盘启动盘
    Python 列表反转显示方法
    HTML,CSS,JS个别知识点总结
    Git 创建版本库并实现本地上传数据到GitHub库
    Python爬虫数据保存到MongoDB中
  • 原文地址:https://www.cnblogs.com/airb/p/13229395.html
Copyright © 2011-2022 走看看