zoukankan      html  css  js  c++  java
  • python 多进程队列数据处理

    # -*- coding:utf8 -*-
    import paho.mqtt.client as mqtt
    from multiprocessing import Process, Queue
    import time, random, os
    import camera_person_num
    
    MQTTHOST = "172.19.4.4"
    MQTTPORT = 1883
    mqttClient = mqtt.Client()
    q = Queue() 
    
     
    # 连接MQTT服务器
    def on_mqtt_connect():
        mqttClient.connect(MQTTHOST, MQTTPORT, 60)
        mqttClient.loop_start()
    
      
    # 消息处理函数
    def on_message_come(lient, userdata, msg):
        # print(msg.topic + ":" + str(msg.payload.decode("utf-8")))
        
        q.put(msg.payload.decode("utf-8"))  # 放入队列
        print("产生消息", msg.payload.decode("utf-8"))
        # 消息处理开启多进程
        # p = Process(target=talk, args=("/camera/person/num/result", msg.payload.decode("utf-8")))
        # p.start()
    
    
    def consumer(q, pid):
        print("开启消费序列进程", pid)
        while True:
            msg = q.get()
            # p = Process(target=talk, args=("/camera/person/num/result", msg, pid))
            # p.start()
            talk("/camera/person/num/result", msg, pid)  
    
    
    # subscribe 消息订阅
    def on_subscribe():
        mqttClient.subscribe("test123", 1)  # 主题为"test"
        mqttClient.on_message = on_message_come  # 消息到来处理函数
    
    
    # publish 消息发布
    def on_publish(topic, msg, qos):
        mqttClient.publish(topic, msg, qos);
    
    
    # 多进程中发布消息需要重新初始化mqttClient
    def talk(topic, msg, pid):
        cameraPsersonNum = camera_person_num.CameraPsersonNum(msg)
        t_max, t_mean, t_min = cameraPsersonNum.personNum()
        # time.sleep(20)
        print("消费消息", pid, msg) 
        mqttClient2 = mqtt.Client()
        mqttClient2.connect(MQTTHOST, MQTTPORT, 60)
        mqttClient2.loop_start()
        mqttClient2.publish(topic, '{"max":' + str(t_max) + ',"mean":' + str(t_mean) + ',"min:"' + t_min + '}', 1)
        mqttClient2.disconnect()
    
     
    def main():
        
        on_mqtt_connect()
        on_subscribe()
        for i in range(1, 3):
            c1 = Process(target=consumer, args=(q, i))
            c1.start()
        while True:
            pass
     
     
    if __name__ == '__main__':
        main()
    
  • 相关阅读:
    内容收缩伸展
    分页浏览的导航栏Bootstrap和js两种方法
    Bootstrap-缩略图
    Bootstrap-进度条
    Bootstrap-点击“×”,可以关闭页面
    Bootstrap页头
    Bootstrap分页
    Bootstrap--面包屑路径导航
    Bootstrap--标签和徽章<新闻后面的new hot等>
    NSLog 去除上线版本
  • 原文地址:https://www.cnblogs.com/gmhappy/p/9472370.html
Copyright © 2011-2022 走看看