由于底层的CNC机床必须使用MQTT通信,所以采用MQTT作为底层的通讯协议,再上传到kafka集群。
现在有两种实现方案:一种是使用mqtt-kafka connector,相当于直接使用两者之间的接口通信;另一种是设置一个gateway。
由于暂未找到合适的Python接口,所以自己简单实现了一个gateway(先订阅MQTT消息,再转发到Kafka),代码如下。
mqtt发布端:
import sys
import datetime
import socket
import time

import paho.mqtt.publish as publish


def transmitMQTT(strMsg):
    """Publish a single message to the MQTT broker.

    Prints the length of the message's string form, then publishes
    ``strMsg`` on the "confluent-kafka-topic" channel.
    """
    # Alternative broker addresses:
    #   "led_power.mqtt.iot.gz.baidubce.com"
    #   "127.0.0.1"
    broker_host = "192.168.3.10"
    channel = "confluent-kafka-topic"

    print(len(str(strMsg)))
    publish.single(channel, strMsg, hostname=broker_host)


def main():
    """Publish a fixed test payload roughly every 0.1 s, forever."""
    while True:
        transmitMQTT("testtest")
        time.sleep(0.1)
        # Timestamp printed after each send/sleep cycle.
        print(time.time())


if __name__ == '__main__':
    main()
接口(gateway:接收MQTT消息,并作为Kafka生产者转发):
import confluent_kafka
import time
import paho.mqtt.client as mqtt
import sys
import random

# Name used both for the MQTT subscription and for the Kafka topic written to.
topic = 'confluent-kafka-topic'

# NOTE(review): the Kafka consumer example connects to 192.168.3.10:9092 while
# this producer uses 192.168.3.102:9092 -- confirm which address is correct.
conf = {'bootstrap.servers': '192.168.3.102:9092'}

# A single producer is created once and reused for every forwarded message,
# instead of constructing (and tearing down) a new one per MQTT message.
producer = confluent_kafka.Producer(**conf)


def on_connect(client, userdata, flags, rc):
    """MQTT connect callback: report the result code and subscribe to ``topic``."""
    print("Connected with result code"+str(rc))
    client.subscribe(topic)


def on_message(client, userdata, msg):
    """MQTT message callback: print the payload and forward it to Kafka."""
    print(msg.payload)
    confluent_kafka_producer_performance(msg.payload)


def _produce_with_retry(payload, attempts=3):
    """Enqueue ``payload`` on ``topic``, retrying when the local queue is full.

    ``produce`` raises ``BufferError`` when the producer's local buffer is
    full. Between attempts the producer is polled briefly so queued messages
    get a chance to be sent. The final attempt is not guarded, so a
    persistent ``BufferError`` propagates to the caller.
    """
    for _ in range(attempts - 1):
        try:
            producer.produce(topic, value=payload)
            return
        except BufferError:
            # Short blocking poll: gives the full queue time to drain before
            # retrying, which a zero-timeout poll would not.
            producer.poll(0.1)
    producer.produce(topic, value=payload)


def confluent_kafka_producer_performance(msg_payload, copies=10):
    """Send ``msg_payload`` to Kafka ``copies`` times and time the operation.

    Args:
        msg_payload: The value to produce to ``topic``.
        copies: How many times the payload is produced. Defaults to 10.
            NOTE(review): every MQTT message is therefore written to Kafka
            10 times by default -- confirm this is intended for a gateway;
            pass ``copies=1`` for one-to-one forwarding.

    Returns:
        Elapsed wall-clock seconds, measured from before the first produce
        call until ``flush`` has returned.

    Raises:
        BufferError: If the local producer queue is still full after the
            retries.
    """
    producer_start = time.time()
    for _ in range(copies):
        _produce_with_retry(msg_payload)
    # Wait for all enqueued messages to be handed off before returning.
    producer.flush()
    return time.time() - producer_start


client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect('192.168.3.10', 1883, 60)
client.loop_forever()
kafka接收端:
from confluent_kafka import Consumer, KafkaError

# Consumer configuration.
settings = {
    'bootstrap.servers': '192.168.3.10:9092',
    'group.id': 'mygroup',
    'client.id': 'client-1',
    'enable.auto.commit': True,
    'session.timeout.ms': 6000,
    'default.topic.config': {'auto.offset.reset': 'smallest'},
}

c = Consumer(settings)
c.subscribe(['confluent-kafka-topic'])

try:
    while True:
        msg = c.poll(0.1)
        # Nothing arrived within the poll timeout; try again.
        if msg is None:
            continue

        err = msg.error()
        if not err:
            print('Received message: {0}'.format(msg.value()))
            continue

        # Reaching the end of a partition is reported as an error object
        # but is only informational here.
        if err.code() == KafkaError._PARTITION_EOF:
            print('End of partition reached {0}/{1}'
                  .format(msg.topic(), msg.partition()))
            continue

        print('Error occured: {0}'.format(err.str()))
except KeyboardInterrupt:
    # Ctrl-C ends the loop quietly.
    pass
finally:
    # Always close the consumer, whether the loop ended by interrupt or error.
    c.close()