zoukankan      html  css  js  c++  java
  • Python管道进行数据的吞吐处理

    import multiprocessing
    import random
    import time
    import datetime
    import struct
    import os
    
    import getFile
    
    # 76(28) + (2048 + 16) * 512 + 4
    
    frame_flag_0 = 0x0000000000000001 # 8 byte
    frame_flag_1 = 0x0000000000000002 # 8 byte
    frame_flag_2 = 0x0000000000000003 # 8 byte
    frame_flag_9 = 0x00000001 # 4 byte
    
    dir_name = 'Z:/'
    file_out = 'Z:/312.dat'
    
    def producer(pipe):
        print('start time: ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
        linestr = getFile.getFileList(dir_name, '_312_')
        
        for i in range(0, len(linestr)):
            file_in = linestr[i]
            f_in = open(file_in, 'rb')
            f_in.seek(109)
            while True:
                buff = f_in.read(9)
                buff = f_in.read(1792)
                if buff:
                    pipe.send_bytes(buff)
                else:
                    f_in.close()
                    break
                    
        pipe.close()
        
        
    def consumer(pipe):
        f_out = open(file_out, 'xb')
        mm = 0
        while True:
            try:
            
                item = pipe.recv_bytes(1792)
                if item:
                    if mm % 512 == 0: # 开始的首帧
                        f_out.write(struct.pack('>3Q', frame_flag_0, frame_flag_1, frame_flag_2))
                        f_out.write(struct.pack('>I', frame_flag_9))
                        f_out.write(item)
                        f_out.write(struct.pack('>I', 0x00000000) * 68) # 272 byte : 2048 - (896*2) + 16
                        mm = 0
                    elif mm % 512 == 511: # 结尾的结束帧
                        f_out.write(item)
                        f_out.write(struct.pack('>I', 0x00000000) * 68) # 272 byte
                        f_out.write(struct.pack('>I', 0x00000000)) # 4 byte 包尾
                        f_out.flush()
                    else:
                        f_out.write(item)
                        f_out.write(struct.pack('>I', 0x00000000) * 68) # 272 byte
                    mm += 1
                    
            except EOFError:
                f_out.write(struct.pack('>I', 0x00000000) * 516 * (512 - mm)) # (2048 + 16)/4 = 516
                f_out.write(struct.pack('>I', 0x00000000)) # 4 byte 包尾
                f_out.close()
                print('end time: ' + datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'))
                break
                
                
    if __name__ == "__main__":
        pipe = multiprocessing.Pipe()
        process_producer = multiprocessing.Process(target = producer, args = (pipe[0],))
        process_consumer = multiprocessing.Process(target = consumer, args = (pipe[1],))
        process_producer.start()
        process_consumer.start()
        pipe[0].close()
        process_producer.join()
        process_consumer.join()
        
        print('OK')
  • 相关阅读:
    命令模式
    连接mysql数据库,创建用户模型
    管理信息系统的开发与管理
    加载静态文件,父模板的继承和扩展
    夜间模式的开启与关闭,父模板的制作
    完成登录与注册页面的前端
    JavaScript 基础,登录前端验证
    CSS实例:图片导航块
    导航,头部,CSS基础
    web基础,用html元素制作web页面
  • 原文地址:https://www.cnblogs.com/mikew/p/12111032.html
Copyright © 2011-2022 走看看