zoukankan      html  css  js  c++  java
  • Python管道进行数据的吞吐处理

    import multiprocessing
    import random
    import time
    import datetime
    import struct
    import os
    
    import getFile
    
    # 76(28) + (2048 + 16) * 512 + 4
    
    frame_flag_0 = 0x0000000000000001 # 8 byte
    frame_flag_1 = 0x0000000000000002 # 8 byte
    frame_flag_2 = 0x0000000000000003 # 8 byte
    frame_flag_9 = 0x00000001 # 4 byte
    
    dir_name = 'Z:/'
    file_out = 'Z:/312.dat'
    
    def producer(pipe):
        print('start time: ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        linestr = getFile.getFileList(dir_name, '_312_')
        
        for i in range(0, len(linestr)):
            file_in = linestr[i]
            f_in = open(file_in, 'rb')
            f_in.seek(109)
            while True:
                buff = f_in.read(9)
                buff = f_in.read(1792)
                if buff:
                    pipe.send_bytes(buff)
                else:
                    f_in.close()
                    break
                    
        pipe.close()
        
        
    def consumer(pipe):
        f_out = open(file_out, 'xb')
        mm = 0
        while True:
            try:
            
                item = pipe.recv_bytes(1792)
                if item:
                    if mm % 512 == 0: # 开始的首帧
                        f_out.write(struct.pack('>3Q', frame_flag_0, frame_flag_1, frame_flag_2))
                        f_out.write(struct.pack('>I', frame_flag_9))
                        f_out.write(item)
                        f_out.write(struct.pack('>I', 0x00000000) * 68) # 272 byte : 2048 - (896*2) + 16
                        mm = 0
                    elif mm % 512 == 511: # 结尾的结束帧
                        f_out.write(item)
                        f_out.write(struct.pack('>I', 0x00000000) * 68) # 272 byte
                        f_out.write(struct.pack('>I', 0x00000000)) # 4 byte 包尾
                        f_out.flush()
                    else:
                        f_out.write(item)
                        f_out.write(struct.pack('>I', 0x00000000) * 68) # 272 byte
                    mm += 1
                    
            except EOFError:
                f_out.write(struct.pack('>I', 0x00000000) * 516 * (512 - mm)) # (2048 + 16)/4 = 516
                f_out.write(struct.pack('>I', 0x00000000)) # 4 byte 包尾
                f_out.close()
                print('end time: ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                break
                
                
    if __name__ == "__main__":
        pipe = multiprocessing.Pipe()
        process_producer = multiprocessing.Process(target = producer, args = (pipe[0],))
        process_consumer = multiprocessing.Process(target = consumer, args = (pipe[1],))
        process_producer.start()
        process_consumer.start()
        pipe[0].close()
        process_producer.join()
        process_consumer.join()
        
        print('OK')
  • 相关阅读:
    Hadoop运行环境搭建
    大数据技术快速扫盲篇
    基于Cloudera Manager Server的WebUI添加Hive服务实战案例
    基于Cloudera Manager Server的WebUI添加Spark服务实战案例
    zabbix的聚合图形配置实战案例
    zabbix配置短信报警概述
    zabbix配置微信报警实战案例
    基于Zabbix WebUI的API实现自动化添加主机实战案例
    zabbix监控 Nginx web页面实战案例
    zabbix agent批量安装并自动监控TCP的11种状态实战案例
  • 原文地址:https://www.cnblogs.com/mikew/p/12111032.html
Copyright © 2011-2022 走看看