一、MQTT库安装
pip install paho-mqtt
二、代码
# coding=utf-8 import paho.mqtt.client as mqtt import time,os,requests,json,threading cafile = r"E:Python ootCA.PEM" #身份认证文件 certfile = r"E:PythonClientCA_11111.PEM" keyfile = r"E:PythonClientKey_11111.PEM" host = "xxxxxx" #主机地址 port = 8883 #端口号 data={"state":{"desired":{"WIFI_AP":"on"}}} #发布信息 sub_topic="@askey/dvr/xxxxxx/status" #订阅主题 pub_topic="@askey/dvr/xxxxxx/status" #发布主题 client = mqtt.Client() #创建一个mqtt对象 client.tls_set(cafile, certfile, keyfile) #加密身份认证 client.connect(host, port, 60) #连接mqtt服务器 client.loop_start() #后台运行一个线程来自动调用loop() def on_connect(client, userdata, flags, rc): if rc == 0: print('连接成功') #0代表连接成功 client.subscribe(sub_topic) #订阅消息 else: print('Connect Error status {0}'.format(rc)) #连接失败并显示错误代码 def on_message(client, userdata, msg): print("主题:"+msg.topic + " 消息:" + str(msg.payload.decode('utf-8'))) #接收消息后处理函数 def mqtt_subscribe(): client.on_connect = client.subscribe(sub_topic) # 设置连接上服务器回调函数 client.on_message = on_message # 设置接收到服务器消息回调函数 #client.loop_forever() # 守护连接状态 def mqtt_publish(): while True: client.publish(pub_topic, payload=str(data), qos=1) # 发布消息 time.sleep(2) if __name__ == '__main__': p = threading.Thread(target=mqtt_publish, name="Thread_pub" , args=()).start() #发布主题线程 s = threading.Thread(target=mqtt_subscribe, name="Thread_sub", args=()).start() #订阅主题线程