zoukankan      html  css  js  c++  java
  • python使用mqtt

    (1) 安装paho-mqtt包

    (2) 导入mqtt

    import paho.mqtt.client as mqtt
    import threading

    import json
    import paho.mqtt.publish as publish
    
    
    class Thread(threading.Thread):
        """Worker thread that creates the paho client for an owner object and runs its network loop."""

        def __init__(self, dat):
            """Remember ``dat``, the owner object whose settings and callbacks ``run`` will use."""
            super().__init__()
            self.Dat = dat

        def run(self):
            """Create the client, attach the owner's callbacks, connect, then loop forever.

            The owner is expected to expose ``client_id``, ``host``, ``port``,
            ``keepalive`` and the six ``on_*`` callbacks listed below; the new
            client is stored back on the owner as ``owner.client``.
            """
            owner = self.Dat
            owner.client = mqtt.Client(owner.client_id)

            # Copy each callback from the owner onto the client, in this order.
            callback_names = (
                'on_connect',      # called once connected to the server
                'on_message',      # called when a message arrives from the server
                'on_subscribe',
                'on_publish',
                'on_unsubscribe',
                'on_disconnect',
            )
            for name in callback_names:
                handler = getattr(owner, name)
                setattr(owner.client, name, handler)

            try:
                # Connect using the owner's host, port and keepalive settings.
                owner.client.connect(owner.host, owner.port, owner.keepalive)
            except Exception as err:
                # A failed first connect is only reported; the loop below is
                # started with retry_first_connection=True regardless.
                print('MQTT errorA', err)
            owner.client.loop_forever(retry_first_connection=True)
    
    
    class ClientProtcal(object):
        """Stateless codec converting Python objects to and from UTF-8 encoded JSON bytes."""

        def __init__(self):
            """Nothing to initialise."""

        def Byte(self, op):
            """Return ``op`` serialised as JSON and encoded to UTF-8 bytes."""
            return json.dumps(op).encode('utf-8')

        def Parse(self, buf):
            """Decode ``buf`` as UTF-8 and parse it as JSON.

            Returns the parsed object, or ``None`` when decoding or parsing
            fails for any reason (the error is printed, not raised).
            """
            try:
                return json.loads(buf.decode('utf-8'))
            except Exception as err:
                print('error', err)
                return None
    
    
    class MqttClient(object):
        """MQTT client wrapper that runs the paho network loop in a background thread.

        The ``obj`` passed to the constructor must provide ``q`` (a queue that
        receives every parsed incoming message), ``topical`` (an iterable of
        topics subscribed on connect) and ``client_id``.  ``self.client`` stays
        ``None`` until the background ``Thread`` has created the paho client,
        so every method that touches it checks for that first.
        """

        protcal = ClientProtcal()   # shared JSON <-> bytes codec
        logName = ''
        # Class-level default logger.  Instances override it when a logger is
        # passed to __init__; having the default here keeps log() from raising
        # AttributeError when none was given and lets the single() classmethod
        # log as well.
        l = None

        def __init__(self, obj, host, port=1883, keepalive=60, bind_address="", log=None):
            """Store the connection settings and start the background MQTT thread.

            Args:
                obj: owner object providing ``q``, ``topical`` and ``client_id``.
                host: broker host name or address.
                port: broker port.
                keepalive: keepalive interval handed to the paho ``connect`` call.
                bind_address: stored on the instance; not used inside this class.
                log: optional logger exposing ``logInfo(msg)``.
            """
            if log:
                self.l = log
            self.name = 'Mqtt'  # display name, changeable through setName()
            self.obj = obj      # owner object
            self.q = obj.q
            self.topical = obj.topical      # topics to subscribe on connect
            self.client_id = obj.client_id
            self.host = host
            self.port = port
            self.keepalive = keepalive
            self.bind_address = bind_address
            self.client = None
            # Started last so the thread sees every attribute set above.
            self.thread = Thread(self)
            self.thread.start()

        def log(self, msg):
            """Forward ``msg`` to the configured logger, if there is one."""
            if self.l:
                self.l.logInfo(msg)

        def setName(self, name):
            """Set the display name of this client."""
            self.name = name

        def on_connect(self, client, userdata, flags, rc):
            """Connect callback: subscribe to every topic in ``self.topical``."""
            if self.client:
                for t in self.topical:
                    self.client.subscribe(t)
                    self.log('订阅主题:[%s]' % t)

        # The four callbacks below are intentionally no-ops.  Thread.run reads
        # them from this object and registers them on the paho client, so they
        # must exist; ``*args`` absorbs the trailing arguments whose number
        # differs between callback variants.

        def on_subscribe(self, client, userdata, mid, *args):
            """Subscribe-acknowledged callback; nothing to do."""

        def on_publish(self, client, userdata, mid, *args):
            """Publish-completed callback; nothing to do."""

        def on_unsubscribe(self, client, userdata, mid, *args):
            """Unsubscribe-acknowledged callback; nothing to do."""

        def on_disconnect(self, client, userdata, rc, *args):
            """Disconnect callback; nothing to do."""

        def subscribes(self, topicals):
            """Subscribe to each topic in ``topicals``."""
            for topical in topicals:
                self.subscribe(topical)

        def subscribe(self, topical):
            """Subscribe to ``topical`` if the client has been created."""
            if self.client:
                self.client.subscribe(topical)
                self.log('订阅主题:[%s]' % topical)

        def unSubscribe(self, topical):
            """Unsubscribe from ``topical`` if the client has been created."""
            if self.client:
                self.client.unsubscribe(topical)
                self.log('取消订阅:[%s]' % topical)

        def on_message(self, client, userdata, msg):
            """Message callback: parse the payload, print it and queue it.

            A payload that fails to parse is queued as ``None``.
            """
            op = self.protcal.Parse(msg.payload)
            s = '收到[%s]推送:[%s]' % (msg.topic, str(op))
            print(s)
            self.q.put(op)

        def on_socket_close(self, client, userdata, socket):
            """Socket-close callback: mark the owner object as offline."""
            self.obj.strNetStaut = 'OffLine'
            self.obj.netState = 0

        def close(self):
            """Disconnect from the broker if the client has been created."""
            if self.client:
                self.client.disconnect()

        def Send(self, topical, op):
            """Serialise ``op`` with the shared codec and publish it to ``topical``."""
            if self.client:
                payload = MqttClient.protcal.Byte(op)
                self.client.publish(topical, payload)

        @classmethod
        def single(cls, stopic, payload, host, port):
            """Publish one message without a long-lived client.

            A publish failure is printed rather than raised.  On success the
            message is logged through the class-level logger ``l`` when one has
            been set; no instance logger is reachable from a classmethod.
            """
            d = cls.protcal.Byte(payload)
            try:
                publish.single(stopic, payload=d, hostname=host, client_id="lora1", port=port, protocol=mqtt.MQTTv311)
            except Exception as e:
                print("publish.single err:", e)
            else:
                logger = cls.l
                if logger:
                    logger.logInfo('推送主题:[%s][%s]' % (stopic, str(payload)))

    (3) 在需要用到的地方导入类并创建对象

    # Build the client; its constructor starts the background MQTT thread.
    mq = MqttClient(
        obj=self.obj,
        host=mqttIp,
        port=mqttPort,
        keepalive=60,
    )

    (4) 发送消息:

    # The publishing method on MqttClient is ``Send`` (capital S).
    mq.Send("topicl", "message")

    (5) 接收消息

    class CHMTestThread(threading.Thread):
        """Consumer thread: takes messages off the owner's queue and hands them to its handler."""

        def __init__(self, dat):
            """Keep a reference to the owner ``dat`` and mark the loop as running."""
            super().__init__()
            self.Dat = dat
            self.RunFlag = True

        def run(self):
            """Block on ``owner.q`` and pass each truthy item to ``owner.handle``.

            ``RunFlag`` is only re-checked after a ``get`` returns, so clearing
            it takes effect once the next item has arrived.
            """
            owner = self.Dat
            inbox = owner.q
            while self.RunFlag:
                message = inbox.get()
                if not message:
                    # Falsy items (for example ``None``) are skipped.
                    continue
                owner.handle(message)

    (6) 在主对象中提供一个处理函数。

    class obj(object):
        """Minimal example of the main object that handles received messages.

        Both methods take ``self``; without it the class cannot be instantiated
        and ``handle`` cannot be called on an instance.  MqttClient additionally
        reads ``q``, ``topical`` and ``client_id`` from the object it is given,
        and CHMTestThread reads ``q``; this minimal example does not set them.
        """

        def __init__(self):
            """Announce construction."""
            print("init")

        def handle(self, message):
            """Handle one received message by printing it."""
            print(message)
  • 相关阅读:
    学习制作操作系统 0
    阅读《C陷阱与缺陷》的知识增量
    CSS 优先级和特指度
    openCV2马拉松第19圈——Harris角点检測(自己实现)
    Cacti监控mysql数据库server实现过程
    ledisdb:支持类redis接口的嵌入式nosql
    03005_SQL查询语句
    通过smtp直接发送邮件
    XML 解析默认去掉命名空间和注释
    C# /VB.NET 创建PDF项目符号列表和多级编号列表
  • 原文地址:https://www.cnblogs.com/countryboy666/p/14076831.html
Copyright © 2011-2022 走看看