zoukankan      html  css  js  c++  java
  • kafka + mqtt接口实现

    由于底层的CNC机床必须使用MQTT通信,所以采用MQTT作为底层的通讯协议,再上传到kafka集群。

    现在有两种实现方案:一种是使用mqtt-kafka connector,相当于直接使用两者之间的接口通信;另一种是设置一个gateway。

    由于暂未找到合适的python接口,所以自己简单实现了一个,代码如下。

     mqtt发布端:

    import sys
    import datetime
    import socket, sys
    import paho.mqtt.publish as publish
    import time
    
    def transmitMQTT(strMsg):
        # strMqttBroker = "led_power.mqtt.iot.gz.baidubce.com"
        # strMqttBroker = "127.0.0.1"
        strMqttBroker = "192.168.3.10"
        strMqttChannel = "confluent-kafka-topic"
        print(len(str(strMsg)))
        publish.single(strMqttChannel, strMsg, hostname=strMqttBroker)
    
    if __name__ == '__main__':
        while True:
            transmitMQTT("testtest")
        time.sleep(0.1)
    
            print (time.time())

    接口(mqtt收+kafka生产消息):

    import confluent_kafka
    import time
    import paho.mqtt.client as mqtt
    import time
    import sys
    import random
    
    
    def on_connect(client, userdata, flags, rc):
        print("Connected with result code"+str(rc))
        client.subscribe(topic)
    
    
    def on_message(client, userdata, msg):
        print(msg.payload)
        confluent_kafka_producer_performance(msg.payload)
    
    
    def confluent_kafka_producer_performance(msg_payload):
        # topic = 'confluent-kafka-topic'
        conf = {'bootstrap.servers': '192.168.3.102:9092'}
        producer = confluent_kafka.Producer(**conf)
        messages_to_retry = 0
    
        producer_start = time.time()
        for i in range(10):
            try:
                producer.produce(topic, value=msg_payload)
            except BufferError as e:
                messages_to_retry += 1
    
        # hacky retry messages that over filled the local buffer
        for i in range(messages_to_retry):
            producer.poll(0)
            try:
                producer.produce(topic, value=msg_payload)
            except BufferError as e:
                producer.poll(0)
                producer.produce(topic, value=msg_payload)
    
        producer.flush()
    
        return time.time() - producer_start
    
    
    topic = 'confluent-kafka-topic'
    client = mqtt.Client()
    client.on_connect = on_connect
    client.on_message = on_message
    client.connect('192.168.3.10', 1883, 60)
    client.loop_forever()

    kafka接收端:

    from confluent_kafka import Consumer, KafkaError
    
    settings = {
        'bootstrap.servers': '192.168.3.10:9092',
        'group.id': 'mygroup',
        'client.id': 'client-1',
        'enable.auto.commit': True,
        'session.timeout.ms': 6000,
        'default.topic.config': {'auto.offset.reset': 'smallest'}
    }
    
    c = Consumer(settings)
    
    c.subscribe(['confluent-kafka-topic'])
    
    try:
        while True:
            msg = c.poll(0.1)
            if msg is None:
                continue
            elif not msg.error():
                print('Received message: {0}'.format(msg.value()))
            elif msg.error().code() == KafkaError._PARTITION_EOF:
                print('End of partition reached {0}/{1}'
                      .format(msg.topic(), msg.partition()))
            else:
                print('Error occured: {0}'.format(msg.error().str()))
    
    except KeyboardInterrupt:
        pass
    
    finally:
        c.close()
  • 相关阅读:
    IDEA在编辑时提示could not autowire
    python基础——使用__slots__
    python基础——实例属性和类属性
    python基础——继承和多态
    python基础——访问限制
    python基础——面向对象编程
    python基础——第三方模块
    python基础——使用模块
    python基础——模块
    Python tkinter调整元件在窗口中的位置与几何布局管理
  • 原文地址:https://www.cnblogs.com/ryu-manager/p/9472867.html
Copyright © 2011-2022 走看看