zoukankan      html  css  js  c++  java
  • 转: python 利用EMQ实现消费者和生产者模型

    消费者

    """
    测试emq-消费者
    @author me
    """
    import paho.mqtt.client as mqtt
    import time
     
     
    class Consumer(object):
     
        def get_time(self):
            """
        获取时间
        """
            return time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))
     
        def on_subscribe(self, client, userdata, mid, granted_qos):
            """
            开始时订阅 callback
            :param userdata:
            :param mid:
            :param granted_qos:
            :return:
            """
            print(self.get_time(), "Begin subscribe topic with ", mid)
     
        def on_message(self, client, userdata, message):
            """
            接收消息 callback
            :param userdata:
            :param message:
            :return:
            """
            print(self.get_time(), " Received message '" + str(message.payload) + "' on topic '" +
                  message.topic + "' with QoS " + str(message.qos))
     
        def on_connect(self, client, userdata, flags, rc):
            """
            连接时的 callback
            :param client:
            :param userdata:
            :param flags:
            :param rc:
            :return:
            """
            print(self.get_time(), "[consumer]Connected with result code " + str(rc))
            if rc == 0:
                sub_result = client.subscribe("/chat/room/4", qos=0)
                print(self.get_time(), "Connected with result is (status,mid)", sub_result)
            else:
                print(self.get_time(), " connect failed")
     
        def run(self):
            # 4就是MQTT3.1.1
            emq_client = mqtt.Client(client_id="emqttd_2018080922", userdata=None, protocol=4)
            emq_client.on_connect = self.on_connect
            # emq_client.on_disconnect = self.on_disconnect
            emq_client.on_message = self.on_message
            emq_client.on_subscribe = self.on_subscribe
            # 设置用户密码,如果没有设置用户,这里可以省略
            emq_client.username_pw_set('admin', "123.com")
            emq_client.connect("192.168.0.251", 1883, keepalive=60)
            emq_client.loop_forever()
     
     
    if __name__ == "__main__":
        consumer = Consumer()
        consumer.run()
    

      

    生产者

    """
    测试emq-生产者
    @author me
    """
    import paho.mqtt.client as mqtt
    import time
     
     
    class Producer(object):
     
        def get_time(self):
            """
          获取时间
          """
            return time.strftime("%Y-%m-%d %H:%M:%S",time.localtime(time.time()))
     
        def on_disconnect(self, client, userdata, rc):
            """
            rc 是操作结果的状态码,0 代表成功
            断开连接时的 callback
            """
            print(self.get_time(), " end a loop with code "+str(rc))
     
        def pub_topic(self, client, topic_text):
            """
            发布topic
            """
            try:
                pub_result = client.publish("/chat/room/4", payload=topic_text, qos=0, retain=False)
                if pub_result.is_published:
                    print(self.get_time(), " success pub message with id: ", pub_result.mid)
                else:
                    print(self.get_time(), "failed to  pub message")
            except ValueError as err_str:
                print(self.get_time(), "please check your parameters: ", err_str)
     
        def on_connect(self, client, userdata, flags, rc):
            """
            连接broker时的callback
            """
            print(self.get_time(), "[producer]Connected with result code " + str(rc))
            if rc == 0:
                while True:
                    topic_text = input("Enter your topic text,('end' to end a loop): ")
                    if topic_text == "end":
                        print(self.get_time(), "EXIT ..... ")
                        client.disconnect()
                        break
                    else:
                        self.pub_topic(client, topic_text)
            else:
                print(self.get_time(), "Connected Failed, Exited ")
                client.disconnect()
     
        def run(self):
            # 4就是MQTT3.1.1
            emq_client = mqtt.Client(client_id="emqttd_2018080946", userdata=None, protocol=4)
            emq_client.on_connect = self.on_connect
            emq_client.on_disconnect = self.on_disconnect
            # 设置用户密码,如果没有设置用户,这里可以省略
            emq_client.username_pw_set('admin', "123.com")
            emq_client.connect("192.168.0.250", 1883, keepalive=60)
            emq_client.loop_forever()
     
        def main(self):
            self.run()
            while True:
                start = input("Enter your start sign,('no' to end a  program): ")
                if start != 'no':
                    self.run()
                else:
                    print(self.get_time(), "Exiting program ")
                    break
     
     
    if __name__ == "__main__":
        producer = Producer()
        producer.main()
    

      

    运行

    首先,启动消费者,然后启动生产者,就可以输入消息体了,输入 end 表示发布之前的消息,no 结束生产者。
    生产者
    消费者

  • 相关阅读:
    Borland C++ Builder Practical learning series
    vmware打开vmx文件不能创建虚拟机的问题
    c3p0 连接数据库失败的问题
    外在 挺直背和走路的问题
    JAVAWEB tomcat服务器启动错误原因总结
    JAVAWEB 项目注册登录模块问题总结
    JAVA eclipse Maven项目红叹号解决方案
    JAVA 文件读取写入后 md5值不变的方法
    Git的安装配置(win环境)
    JAVA 静态方法和实例方法的区别 (图表)
  • 原文地址:https://www.cnblogs.com/saryli/p/9766141.html
Copyright © 2011-2022 走看看